Springboot集成kafka高级应用实战()

  本篇文章为你整理了Springboot集成kafka高级应用实战()的详细内容,包含有 Springboot集成kafka高级应用实战,希望能帮助你了解 Springboot集成kafka高级应用实战。

   retries: 0 # 重试次数

   acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

   batch-size: 16384 # 一次最多发送数据量

   buffer-memory: 33554432 # 生产端缓冲区大小

   key-serializer: org.apache.kafka.common.serialization.StringSerializer

   value-serializer: org.apache.kafka.common.serialization.StringSerializer

   consumer: # consumer消费者

   group-id: javagroup # 默认的消费组ID

   enable-auto-commit: true # 是否自动提交offset

   auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)

   auto-offset-reset: latest #earliest,latest

   key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

   value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  

 

 

  2)启动信息

  4.2 消息发送

  4.2.1 发送类型

  KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法

  详细代码参考:AsyncProducer.java

  消费者使用:KafkaConsumer.java

  1)同步发送

  

 ListenableFuture SendResult String, Object future = kafkaTemplate.send("test", JSON.toJSONString(message));

 

   //注意,可以设置等待时间,超出后,不再等候结果

   SendResult String, Object result = future.get(3,TimeUnit.SECONDS);

   logger.info("send result:{}",result.getProducerRecord().value());

  

 

  通过swagger发送,控制台可以正常打印send result

  2)阻断

  在服务器上,将kafka暂停服务

  

docker-compose pause kafka-1 kafka-2

 

  

 

  在swagger发送消息

  调同步发送:请求被阻断,一直等待,超时后返回错误

  而调异步发送的(默认发送接口),请求立刻返回。

  那么,异步发送的消息怎么确认发送情况呢???往下看!

  3)注册监听

  代码参考: KafkaListener.java

  可以给kafkaTemplate设置Listener来监听消息发送情况,实现内部的对应方法

  

 kafkaTemplate.setProducerListener(new ProducerListener String, Object () {});

 

  

 

  查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener

  

com.itheima.demo.config.KafkaListener:error!message={"message":"1","sendTime":1609920296374}

 

  

 

  启动kafka

  

docker-compose unpause kafka-1 kafka-2

 

  

 

  再次发送消息时,同步异步均可以正常收发,并且监听进入success回调

  

com.itheima.demo.config.KafkaListener$1:ok,message={"message":"1","sendTime":1610089315395}

 

  com.itheima.demo.controller.PartitionConsumer:patition=1,message:[{"message":"1","sendTime":1610089315395}]

  

 

  可以看到,在内部类 KafkaListener$1 中,即注册的Listener的消息。

  4.2.2 序列化

  消费者使用:KafkaConsumer.java

  1)序列化详解

  前面用到的是Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)

  除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等

  这些序列化器都实现了接口 (org.apache.kafka.common.serialization.Serializer)

  基本上,可以满足绝大多数场景

  2)自定义序列化

  自己实现,实现对应的接口即可,有以下方法:

  

public interface Serializer T extends Closeable {

 

   default void configure(Map String, ? configs, boolean isKey) {

   //理论上,只实现这个即可正常运行

   byte[] serialize(String var1, T var2);

   //默认调上面的方法

   default byte[] serialize(String topic, Headers headers, T data) {

   return this.serialize(topic, data);

   default void close() {

  

 

  案例,参考: MySerializer.java

  在yaml中配置自己的编码器

  

value-serializer: com.itheima.demo.config.MySerializer

 

  

 

  重新发送,发现:消息发送端编码回调一切正常。但是消费端消息内容不对!

  

com.itheima.demo.controller.KafkaListener$1:ok,message={"message":"1","sendTime":1609923570477}

 

  com.itheima.demo.controller.KafkaConsumer:message:"{\"message\":\"1\",\"sendTime\":1609923570477}"

  

 

  怎么办?

  3)解码

  发送端有编码并且我们自己定义了编码,那么接收端自然要配备对应的解码策略

  代码参考:MyDeserializer.java,实现方式与编码器几乎一样!

  在yaml中配置自己的解码器

  

value-deserializer: com.itheima.demo.config.MyDeserializer

 

  

 

  再次收发,消息正常

  

com.itheima.demo.controller.AsyncProducer$1:ok,message={"message":"1","sendTime":1609924855896}

 

  com.itheima.demo.controller.KafkaConsumer:message:{"message":"1","sendTime":1609924855896}

  

 

  4.2.3 分区策略

  分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。

  给定了分区号,直接将数据发送到指定的分区里面去

  没有给定分区号,给定数据的key值,通过key取上hashCode进行分区

  既没有给定分区号,也没有给定key值,直接轮循进行分区

  自定义分区,你想怎么做就怎么做

  1)验证默认分区规则

  发送者代码参考:PartitionProducer.java

  消费者代码使用:PartitionConsumer.java

  通过swagger访问setKey:

  
 

  看控制台:

  再访问setPartition来设置分区号0来发送

  
 

  看控制台:

  2)自定义分区

  你想自己定义规则,根据我的要求,把消息投放到对应的分区去? 可以!

  参考代码:MyPartitioner.java , MyPartitionTemplate.java ,

  发送使用:MyPartitionProducer.java

  使用swagger,发送0开头和非0开头两种key试一试!

  备注:

  自己定义config参数,比较麻烦,需要打破默认的KafkaTemplate设置

  可以将KafkaConfiguration.java中的getTemplate加上@Bean注解来覆盖系统默认bean

  这里为了避免混淆,采用@Autowire注入

  4.3 消息消费

  4.3.1 消息组别

  发送者使用:KafkaProducer.java

  1)代码参考:GroupConsumer.java,Listener拷贝3份,分别赋予两组group,验证分组消费:

  2)启动

  3)通过swagger发送2条消息

  同一group下的两个消费者,在group1均分消息

  group2下只有一个消费者,得到全部消息

  4)消费端闲置

  注意分区数与消费者数的搭配,如果 ( 消费者数 分区数量 ),将会出现消费者闲置,浪费资源!

  验证方式:

  停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。

  重新发送两条消息,试一试

  解析:

  group2可以消费到1、2两条消息

  group1下有两个消费者,但是只分配给了 -1 , -2这个进程被闲置

  4.3.2 位移提交

  1)自动提交

  前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

  

enable-auto-commit: true # 是否自动提交offset

 

  auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)

  

 

  2)手动提交

  有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。

  下面我们自己定义配置,覆盖上面的参数

  代码参考:MyOffsetConfig.java

  通过在消费端的Consumer来提交偏移量,有如下几种方式:

  代码参考:MyOffsetConsumer.java

  同步提交、异步提交:manualCommit() ,同步异步的差别,下面会详细讲到。

  指定偏移量提交:offset()

  3)重复消费问题

  如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!

  代码参考和对比:manualCommit() , noCommit()

  验证过程:

  用km将test主题删除,新建一个test空主题。方便观察消息偏移
 

  注释掉其他Consumer的Component注解,只保留当前MyOffsetConsumer.java
 

  启动项目,使用swagger的KafkaProducer发送连续几条消息
 

  留心控制台,都能消费,没问题:

  
 

  但是!重启试试:

  
 

  无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!

  再通过命令行查询偏移量试试:

  4)经验与总结

  

commitSync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会。

 

  这就造成一个陷阱:

  如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。

  但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进程。就会造成重复消费!

  因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。

  详细代码参考:MyOffsetConsumer.manualOffset()

  

 

  本文由传智教育博学谷 - 狂野架构师教研团队发布
 

  如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力
 

  转载请注明出处!

  以上就是Springboot集成kafka高级应用实战()的详细内容,想要了解更多 Springboot集成kafka高级应用实战的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: