springboot集成kafka集群,springboot集成kafka0.10.2 消费者代码
一生产者跳羚集成卡夫卡,无非就是生产者和消费者,但首先得实现跳羚集成,流程如下:
1:pom.xml引入依赖包
2:创建启动类
3:配置核心配置文件application.yml1.1 pom.xml在pom.xml中引入春天——卡夫卡,代码如下:
?可扩展标记语言版本=1.0 编码=UTF八号?
项目xmlns= http://maven。阿帕奇。org/POM/4。0 .0
xmlns:xsi= http://。w3。 org/2001/XML架构-实例
xsi:架构位置= http://maven。阿帕奇。org/POM/4。0 .0 http://maven.apache.org/xsd/maven-4.0.0.xsd
型号版本4 .0 .0/型号版本
groupId com.itmentu /groupId
卡夫卡-制作者-01 /artifactId
版本1.0-快照/版本
父母
groupId组织。spring框架。boot/groupId
工件id spring-boot-starter-parent/工件id
版本2.3.8 .发布/版本
/父母
属国
属国
groupId组织。spring框架。boot/groupId
工件id spring-boot-starter-web/工件id
/依赖关系
属国
groupId组织。spring框架。卡夫卡/groupId
artifactId spring-Kafka/artifactId
/依赖关系
属国
groupId com.alibaba /groupId
artifactId fastjson /artifactId
版本1 .2 .62/版本
/依赖关系
/依赖关系
/项目1.2启动类创建启动类com。it mentu。kafkaproducerapplication,代码如下:
@SpringBootApplication
公共类卡夫卡作品应用{
公共静态void main(String[] args){
春季申请。运行(kafkaproducerapplication。class,args);
}
}1.3 核心配置文件创建应用程序。阳明海运股份有限公司核心配置文件,代码如下:
服务器:
端口:18081
春天:
卡夫卡:
引导服务器:192.168.211.130:9092,192.168.211.130:9093,192.168.211.130:9094
制片人:#制片人生产者
重试次数:0 #重试次数
确认:1 #应答级别:多少个分区副本备份完成时向生产者发送确认字符(确认字符)确认(可选0、1、全部/-1)
批量:16384 #批量大小
缓冲存储器:33554432 #生产端缓冲区大小
密匙序列化器:org。阿帕奇。卡夫卡。常见。序列化。字符串序列化程序
值序列化程序:org。阿帕奇。卡夫卡。常见。序列化。字符串序列化程序参数说明:
引导服务器:卡夫卡服务地址
重试次数:发送失败后,重试次数,0表示不重试
密钥串行器:向卡夫卡发送数据,键采用的序列化方式
值序列化程序:向卡夫卡发送数据,数据采用的序列化方式1.4 生产者消息发送消息发送使用卡夫卡模板实现消息发送,创建com。it mentu。控制器。发送控制器,实现消息发送,代码如下:
@RestController
@请求映射(value=/producer )
公共类发送控制器{
@自动连线
私立卡夫卡模板
/***
* 发送消息
*主题:要发送的队列
*消息:发送的消息
*/
@ get mapping(value=/send/{ topic }/{ msg } )
公共字符串发送(@ path变量(value= topic )字符串主题,@ path变量(value= msg )字符串消息){
//消息发送
kafkaTemplate.send(话题,味精);
返回"成功";
}
}发送消息的时候,使用的是kafkaTemplate.send(话题,msg),第一个参数是队列名字或者叫主题名字,第2个参数是发送的消息。
我们可以请求http://localhost:18081/producer/send/itmentu/hello测试,老鹰打开,可以看到效果如下:
2消费者跳羚集成卡夫卡实现消费者消费消息,流程如下:
1:pom.xml引入依赖包
2:创建启动类
3:配置核心配置文件application.yml2.1 pom.xml和生产者一样,都需要引入春天-卡夫卡依赖,配置如下:
?可扩展标记语言版本=1.0 编码=UTF八号?
项目xmlns= http://maven。阿帕奇。org/POM/4。0 .0
xmlns:xsi= http://。w3。 org/2001/XML架构-实例
xsi:架构位置= http://maven。阿帕奇。org/POM/4。0 .0 http://maven.apache.org/xsd/maven-4.0.0.xsd
型号版本4 .0 .0/型号版本
groupId com.itmentu /groupId
卡夫卡的作品-消费者-01 /artifactId
版本1.0-快照/版本
父母
groupId组织。spring框架。boot/groupId
工件id spring-boot-starter-parent/工件id
版本2.3.8 .发布/版本
/父母
属国
属国
groupId组织。spring框架。boot/groupId
工件id spring-boot-starter-web/工件id
/依赖关系
属国
groupId组织。spring框架。卡夫卡/groupId
artifactId spring-Kafka/artifactId
/依赖关系
属国
groupId com.alibaba /groupId
artifactId fastjson /artifactId
版本1 .2 .62/版本
/依赖关系
/依赖关系
/项目2.2启动类创建启动类com。it mentu。kafkaconsumerapapplication,代码如下:
@SpringBootApplication
公共类KafkaConsumerApplication {
公共静态void main(String[] args){
春季申请。运行(kafkaconsumeraplication。class,args);
}
}2.3 核心配置文件创建应用程序. yml配置如下:
服务器:
端口:18082
春天:
卡夫卡:
引导服务器:192.168.211.130:9092,192.168.211.130:9093,192.168.211.130:9094
消费者:#消费者消费者
组id: mentugroup #默认的消费组身份
启用-自动提交:真#是否自动提交抵消
自动提交时间间隔:100 #提交抵消延时(接收到消息后多久提交偏移)
#最早:当各分区下有已提交的抵消时,从提交的抵消开始消费;无提交的抵消时,从头开始消费
#最新:当各分区下有已提交的抵消时,从提交的抵消开始消费;无提交的抵消时,消费新产生的该分区下的数据
#无:主题各分区都存在已提交的抵消时,从抵消后开始消费;只要有一个分区不存在已提交的偏移,则抛出异常
自动偏移复位:最新
key-deserializer:org。阿帕奇。卡夫卡。常见。序列化。字符串反串行化器
值-反序列化器:org。阿帕奇。卡夫卡。常见。序列化。字符串反串行化器参数说明:
引导服务器:卡夫卡服务地址
群组id:消费者组默认名字
启用-自动提交:每次消费数据后,需要提交数据的偏移量,这里真实的表示自动提交,假的表示手动提交
密钥解串器:读卡夫卡数据,键采用的反序列化方式
值反序列化程序:读卡夫卡数据,数据采用的反序列化方式2.4 消费消息卡夫卡消费消息采用@KafkaListener(topics={} ,groupId= )方式消费数据,我们可以创建一个消息监听类实现消息消费,创建com。伊门图。听众。消息监听器,代码如下:
@组件
公共类MessageListener {
@ KafkaListener(topics={ it mentu },groupId=itmentuGroup )
公共空的侦听器(消费者记录字符串,字符串记录){
//获取消息
字符串消息=记录。value();
//消息偏移量
长偏移量=记录。offset();
System.out.println(读取的消息:消息\n当前偏移量:偏移量);
}
}测试效果如下:
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。