首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

11.kafka重试机制解读

前面对kafka的学习中已经了解到KafkaProducer通过设定参数,如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数。

本片文章主要分析几个问题:

哪些异常可以重试

如何实现重试

接下来通过分析一一解开这些问题的答案。

1.哪些异常可以重试

通过方法名可知,其作用是判断是否能重试,由方法体内的实现可知,允许重试需要满足两个条件:

重试次数少于参数指定的值;

异常是RetriableException类型或者TransactionManager允许重试;

transactionManager.canRetry()后面会分析;先看看哪些异常是RetriableException类型异常。

RetriableException类型异常

kafka对RetriableException异常注释是:短暂性的通过重试可以成功的异常;通过RetriableException类关系图可知,可重试异常有图中RetriableException的子类那些异常(可以通过异常是否继承自RetriableException判断是否可重试异常):

retries exception list

TransactionManager允许重试

如果异常不属于RetriableException类型,但是只要满足就允许重试,所以,首先需要满足transactionManager不为null。transactionManager是在KafkaProducer中构造Sender传入的。构造TransactionManager的核心源码如下:

根据上面源码分析可知,只要用户配置了transactional.id,且没有显示配置enable.idempotence为false,那么TransactionManager就不会为null;

接下来还要满足才允许重试,主要包括下面几种情况:

碰到OutOfOrderSequenceException异常

broker的响应报文中没有logStartOffset(正常的响应信息:"T0-0" -> "")

2.如何实现重试

上面说明了什么情况下允许重试,接下来分析kafka是如何实现重试的。

2.1原理图

本打算把原理图放在最后,但是最后还是决定放在前面。对重试机制有一定的了解后,再看后面的分析就容易很多。kafka发送&重试机制如下图所示:

消息发送&重试机制.png

说明:

new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;

调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;

后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;

如果发送成功,那么返回成功;

如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;

初步了解整个发送&重试过程后,再根据源码进行更深入的分析。

2.2后台线程

分析kafka如何实现重试之前,先看一下发送消息到broker前做的主要事情:

构造KafkaProducer时,构造Send并启动一个异步线程:

且从这段代码可知,每个KafkaProducer会启动一个线程处理消息,这个线程命名为:kafka-producer-network-thread | $。

笔者某个实例查看KafkaProducer启动的线程结果如下:

调用KafkaProducer的send()方法时,先把发送的消息存储在accumulator中:

2.3RecordAccumulator

RecordAccumulator是保存需要发送的消息或者重试消息的核心。发送消息之前先把消息存放在这里,异步线程KafkaThread启动后从这里取消息然后发送到broker。当发送出错且允许重试时,又会把这些需要重试的消息保存到这里再进行重试。

当调用KafkaProducer的send()方法发送消息时,会调用append()方法将消息暂时存放,核心源码如下:

当发送出错且允许重试时,会调用reenqueue()方法将消息暂时存放,核心源码如下:

RecordAccumulator简单总结:通过这两段代码的分析可知,保存需要发送的(重试)消息的核心数据结构是Deque。且创建队列时是,没有指定初始容量。这里不打算深入分析Deque,只是简单介绍一下,Deque是Double ended queue (双端队列) 的缩写。首尾都可写入可读取。

2.3发送&重试

下面分析kafka是如何发送并如何重试的。(TransactionManager相关代码被省略,其的作用后面有机会单独一篇文章分析);发送消息核心代码在Sender.java中, Sender.java实现了Runnable接口, 所以是后台线程异步发送消息到kafka集群:

KafkaProducer关闭有方式有两种:和,第一种是友好的关闭且设置timeout为,第二种如果设置timeout为0,就是强制关闭,即forceClose=true。

备注:drained: 流干,耗尽,undrained则表示未耗尽。

准备发送消息前需要尝试去accumulator中获取消息:

accumulator.drain()本质就是:,即根据分区信息得到Deque,然后不断获取ProducerBatch,即封装后的要发送的消息。

run(long)方法中往broker发送消息的部分核心代码(位于Sender.java中)如下:

handleProduceResponse()中收到的响应,如何是网络断开,那么构造响应:。如果有版本不匹配问题,那么构造响应:。还有一种特殊情况,如果指定了,那么构造响应,因为这种情况下只需要发送即可,不需要响应结果。接下来调用下面的方法--完成或者重试请求:

如果需要重试,重新入队列的源码如下:

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180812G07CN100?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券