springboot集成kafka集群,springboot集成kafka0.10.2 消费者代码

  springboot集成kafka集群,springboot集成kafka0.10.2 消费者代码

  一生产者跳羚集成卡夫卡,无非就是生产者和消费者,但首先得实现跳羚集成,流程如下:

  1:pom.xml引入依赖包

  2:创建启动类

  3:配置核心配置文件application.yml1.1 pom.xml在pom.xml中引入春天——卡夫卡,代码如下:

  ?可扩展标记语言版本=1.0 编码=UTF八号?

  项目xmlns= http://maven。阿帕奇。org/POM/4。0 .0

  xmlns:xsi= http://。w3。 org/2001/XML架构-实例

  xsi:架构位置= http://maven。阿帕奇。org/POM/4。0 .0 http://maven.apache.org/xsd/maven-4.0.0.xsd

  型号版本4 .0 .0/型号版本

  groupId com.itmentu /groupId

  卡夫卡-制作者-01 /artifactId

  版本1.0-快照/版本

  父母

  groupId组织。spring框架。boot/groupId

  工件id spring-boot-starter-parent/工件id

  版本2.3.8 .发布/版本

  /父母

  属国

  属国

  groupId组织。spring框架。boot/groupId

  工件id spring-boot-starter-web/工件id

  /依赖关系

  属国

  groupId组织。spring框架。卡夫卡/groupId

  artifactId spring-Kafka/artifactId

  /依赖关系

  属国

  groupId com.alibaba /groupId

  artifactId fastjson /artifactId

  版本1 .2 .62/版本

  /依赖关系

  /依赖关系

  /项目1.2启动类创建启动类com。it mentu。kafkaproducerapplication,代码如下:

  @SpringBootApplication

  公共类卡夫卡作品应用{

  公共静态void main(String[] args){

  春季申请。运行(kafkaproducerapplication。class,args);

  }

  }1.3 核心配置文件创建应用程序。阳明海运股份有限公司核心配置文件,代码如下:

  服务器:

  端口:18081

  春天:

  卡夫卡:

  引导服务器:192.168.211.130:9092,192.168.211.130:9093,192.168.211.130:9094

  制片人:#制片人生产者

  重试次数:0 #重试次数

  确认:1 #应答级别:多少个分区副本备份完成时向生产者发送确认字符(确认字符)确认(可选0、1、全部/-1)

  批量:16384 #批量大小

  缓冲存储器:33554432 #生产端缓冲区大小

  密匙序列化器:org。阿帕奇。卡夫卡。常见。序列化。字符串序列化程序

  值序列化程序:org。阿帕奇。卡夫卡。常见。序列化。字符串序列化程序参数说明:

  引导服务器:卡夫卡服务地址

  重试次数:发送失败后,重试次数,0表示不重试

  密钥串行器:向卡夫卡发送数据,键采用的序列化方式

  值序列化程序:向卡夫卡发送数据,数据采用的序列化方式1.4 生产者消息发送消息发送使用卡夫卡模板实现消息发送,创建com。it mentu。控制器。发送控制器,实现消息发送,代码如下:

  @RestController

  @请求映射(value=/producer )

  公共类发送控制器{

  @自动连线

  私立卡夫卡模板

  /***

  * 发送消息

  *主题:要发送的队列

  *消息:发送的消息

  */

  @ get mapping(value=/send/{ topic }/{ msg } )

  公共字符串发送(@ path变量(value= topic )字符串主题,@ path变量(value= msg )字符串消息){

  //消息发送

  kafkaTemplate.send(话题,味精);

  返回"成功";

  }

  }发送消息的时候,使用的是kafkaTemplate.send(话题,msg),第一个参数是队列名字或者叫主题名字,第2个参数是发送的消息。

  我们可以请求http://localhost:18081/producer/send/itmentu/hello测试,老鹰打开,可以看到效果如下:

  2消费者跳羚集成卡夫卡实现消费者消费消息,流程如下:

  1:pom.xml引入依赖包

  2:创建启动类

  3:配置核心配置文件application.yml2.1 pom.xml和生产者一样,都需要引入春天-卡夫卡依赖,配置如下:

  ?可扩展标记语言版本=1.0 编码=UTF八号?

  项目xmlns= http://maven。阿帕奇。org/POM/4。0 .0

  xmlns:xsi= http://。w3。 org/2001/XML架构-实例

  xsi:架构位置= http://maven。阿帕奇。org/POM/4。0 .0 http://maven.apache.org/xsd/maven-4.0.0.xsd

  型号版本4 .0 .0/型号版本

  groupId com.itmentu /groupId

  卡夫卡的作品-消费者-01 /artifactId

  版本1.0-快照/版本

  父母

  groupId组织。spring框架。boot/groupId

  工件id spring-boot-starter-parent/工件id

  版本2.3.8 .发布/版本

  /父母

  属国

  属国

  groupId组织。spring框架。boot/groupId

  工件id spring-boot-starter-web/工件id

  /依赖关系

  属国

  groupId组织。spring框架。卡夫卡/groupId

  artifactId spring-Kafka/artifactId

  /依赖关系

  属国

  groupId com.alibaba /groupId

  artifactId fastjson /artifactId

  版本1 .2 .62/版本

  /依赖关系

  /依赖关系

  /项目2.2启动类创建启动类com。it mentu。kafkaconsumerapapplication,代码如下:

  @SpringBootApplication

  公共类KafkaConsumerApplication {

  公共静态void main(String[] args){

  春季申请。运行(kafkaconsumeraplication。class,args);

  }

  }2.3 核心配置文件创建应用程序. yml配置如下:

  服务器:

  端口:18082

  春天:

  卡夫卡:

  引导服务器:192.168.211.130:9092,192.168.211.130:9093,192.168.211.130:9094

  消费者:#消费者消费者

  组id: mentugroup #默认的消费组身份

  启用-自动提交:真#是否自动提交抵消

  自动提交时间间隔:100 #提交抵消延时(接收到消息后多久提交偏移)

  #最早:当各分区下有已提交的抵消时,从提交的抵消开始消费;无提交的抵消时,从头开始消费

  #最新:当各分区下有已提交的抵消时,从提交的抵消开始消费;无提交的抵消时,消费新产生的该分区下的数据

  #无:主题各分区都存在已提交的抵消时,从抵消后开始消费;只要有一个分区不存在已提交的偏移,则抛出异常

  自动偏移复位:最新

  key-deserializer:org。阿帕奇。卡夫卡。常见。序列化。字符串反串行化器

  值-反序列化器:org。阿帕奇。卡夫卡。常见。序列化。字符串反串行化器参数说明:

  引导服务器:卡夫卡服务地址

  群组id:消费者组默认名字

  启用-自动提交:每次消费数据后,需要提交数据的偏移量,这里真实的表示自动提交,假的表示手动提交

  密钥解串器:读卡夫卡数据,键采用的反序列化方式

  值反序列化程序:读卡夫卡数据,数据采用的反序列化方式2.4 消费消息卡夫卡消费消息采用@KafkaListener(topics={} ,groupId= )方式消费数据,我们可以创建一个消息监听类实现消息消费,创建com。伊门图。听众。消息监听器,代码如下:

  @组件

  公共类MessageListener {

  @ KafkaListener(topics={ it mentu },groupId=itmentuGroup )

  公共空的侦听器(消费者记录字符串,字符串记录){

  //获取消息

  字符串消息=记录。value();

  //消息偏移量

  长偏移量=记录。offset();

  System.out.println(读取的消息:消息\n当前偏移量:偏移量);

  }

  }测试效果如下:

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: