本篇文章为你整理了Kafka消费消息丢失排查,原因竟是groupId重复(kafka数据丢失和重复消费)的详细内容,包含有kafka消费丢失数据 kafka数据丢失和重复消费 kafka消息消费失败 kafka消息被消费后存活多久? Kafka消费消息丢失排查,原因竟是groupId重复,希望能帮助你了解 Kafka消费消息丢失排查,原因竟是groupId重复。
BI的同事发现某指标数据展示有问题,发现最近入库的数据缺失,然后反馈到DBA. 经DBA排查后发现原始数据缺少.
之前笔者在休假,同事初步排查怀疑是消息阻塞导致.经过代码调整发版之后发现还是有情况发生.
笔者接手之后,在本地打印指定点位的消息,发现没有丢失消息的情况.(15分钟一条消息)
于是在线上系统中添加了打印指定点位的日志.(发版下班)
第二天,查看日志发现有缺失情况,本地打印继续开启 发现没有复现
于是查询消费组,收集到一组 host(ip)
@Test
public void showGroupInfo() throws ExecutionException, InterruptedException {
String id = "kafka消费组id";
DescribeConsumerGroupsResult describeConsumerGroupsResult = admin.describeConsumerGroups(Collections.singleton(id));
final KafkaFuture Map String, ConsumerGroupDescription all = describeConsumerGroupsResult.all();
final Map String, ConsumerGroupDescription stringConsumerGroupDescriptionMap = all.get();
final Set Map.Entry String, ConsumerGroupDescription entries1 = stringConsumerGroupDescriptionMap.entrySet();
for (Map.Entry String, ConsumerGroupDescription stringConsumerGroupDescriptionEntry : entries1) {
final ConsumerGroupDescription value = stringConsumerGroupDescriptionEntry.getValue();
final Collection MemberDescription members = value.members();
for (MemberDescription member : members) {
String s1 = member.consumerId();
String host = member.host();
String s = member.clientId();
String s2 = member.assignment().toString();
System.out.printf("clientId[%s],memberId[%s],host[%s],assignment[%s]%n", s, s1, host, s2);
找运维同事查看ip之后发现了另一个项目,拉取jar反编译之后发现配置文件中kafka 消费者 groupId 配置相同
反馈相关项目负责人
如何统一管理/监控 kafka group 划分 避免此类问题发生?
kafka groupId 是否要按照微服务项目来划分?
以上就是Kafka消费消息丢失排查,原因竟是groupId重复(kafka数据丢失和重复消费)的详细内容,想要了解更多 Kafka消费消息丢失排查,原因竟是groupId重复的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。