kafka发送消息超时,发送kafka超时

  kafka发送消息超时,发送kafka超时

  00-1010前言定位异常点,分析异常抛出的逻辑真实原因——解决方案结论

  00-1010:简单介绍一下我们的使用场景。线上五个Broker节点的kafka已经收到了binlog订阅的所有数据,Flink组件使用这些数据接收数据作为数据中心的原始数据。昨天的开发反馈,网上binlog报告了大量错误,都是卡夫卡的例外,都是同一个题目抛出的错误。特征也很明显,传递的信息非常大。主观判断一定是写大消息导致超时。例外情况的详细信息如下:

  thread : Kafka-producer-network-thread producer-1 rowable : org . Apache . Kafka.common . errors . time out exception : BIN-LOG-DATA-center-dmz 2-TABLE-kk-DATA-center-ODS _ contract _ finance _ info-0的1条记录因56352毫秒而过期,自上次追加后已过

  00-1010应用程序抛出异常。一般操作都是先搜索百度或者谷歌。至于上面的超时异常,搜索引擎的结果都是producer无法连接Borker导致的问题,根本不在我们的场景中,所以其次我们需要从源代码中寻找答案。博主使用的开发工具是IDEA,可以很容易的定位异常抛出点。首先定位到TimeoutException异常类,然后按住ctrl键,点击这个类,抛出TimeoutException异常的点都会出现,如下图所示。然后根据异常消息的内容,寻找匹配的点击抛出异常,如图,红色箭头指的是代码位置:

  00-1010程序中的异常在抛出之前必须满足某些条件。要解决异常,只要让运行时环境不满足抛出异常的条件即可。下面是引发异常的代码:

  boolean maybe expire(int requestTimeoutMs,long retryBackoffMs,long now,long lingerMs,boolean isFull) { if(!this . in retry()is full requestTimeoutMs(now-this . lastpappendtime))expiryerror message=(now-this . lastpappendtime)自上次追加后已过毫秒;else if(!this . in retry()requestTimeoutMs(createdTimeMs(now)-lingerMs))expiryErrorMessage=(createdTimeMs(now)-lingerMs)自批处理创建加上逗留时间以来已过毫秒;else if(this . in retry()requestTimeoutMs(waite dtimems(now)-retryBackoffMs))expiryErrorMessage=(waite dtimems(now)-retryBackoffMs)自上次尝试加上回退时间以来已过去毫秒数;boolean expired=expiry error message!=nullif(过期)abortRecordAppends();退货过期;}如你所见,我们的异常在第一次逻辑判断时就得到满足,所以我们抛出异常。这里,有可能抛出三种不同的超时异常。中文语义翻译的条件是:

  未设置重试,发送批次(batch.size)已满,配置请求超时(request.timeout.ms)小于[当前时间减去最后一个附加批次时间]。未设置重试,并且配置请求超时(request.timeout.ms)小于[批处理创建时间减去配置的发送等待时间(linger.ms)]。并且配置请求超时(request.timeout.ms)小于[当前时间-上次重试时间-重试等待时间(retry.backoff.ms)]。上面括号中的参数是在kafka producer中配置的相关参数,这些参数没有被重置。batch.size默认为10kb。导致错误的消息大小都是36kb,request.timeout.ms的默认超时设置是30s,所以在这种判断可能过期的方法中,导致我们异常的主要原因是batch.size和request.timeout.ms的参数设置问题。

  00-1010从上面的代码看,表面原因是参数设置不够。其实博主用kafka-test启动了5个Borker集群做递归验证测试,测试都写到同一个36kb的消息,在默认保持所有配置的情况下完全没有压力。后来查阅相关的错误日志,我们发现所有的TimeoutExceptions几乎都集中在同一时间。发现业务批量导入数据到mysql,导致binlog消息骤增,向kafka写大消息的高并发导致Borker无法处理,导致超时异常超时。因此,我们确实可以从两个方面解决问题:

  服务器:添加Borker,设置多个TopicPartition,平均分担书写压力。这是解决问题的根本办法。客户端:增加request.timeout.ms和batch.size的参数,或者启动消息重试。这种解决方案可以治标不治本,但也可以减少这类场景很大概率导致的TimeoutException。

  00-1010异常不可怕。所有的异常都是人为抛出的,都有既定的触发条件。只要定位到触发条件,对症下药就能解决问题。但是近五年博主的经验发现,日志打印真的是一门艺术,在这方面,Spring框架、Dubbo、Apollo配置中心框架都是日志打印的典范。无论发生什么,日志都会输出详细的上下文、异常的原因和建议的解决方案。如果涉及到相关配置,也会打印如何配置最好。另一方面,kafka客户端的这个TimeoutException显示的信息有点太少。如果能指定相关配置信息和故障排除方向就更好了。安利的最后一波卡夫卡测试,可以轻松搭建一个有很多Borker的卡夫卡集群,一个评论就ok了。详见我的博文《深入研究springboot集成kafka之spring-kafka底层原理》。

  以上是kafka并发写大消息异常TimeoutException的故障排除记录详情。关于kafka并发异常TimeoutException故障排除的更多信息,请关注prevailing IT的其他相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: