前面对kafka的学习中已经了解到KafkaProducer通过设定参数,如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数。
本片文章主要分析几个问题:
哪些异常可以重试
如何实现重试
接下来通过分析一一解开这些问题的答案。
1.哪些异常可以重试
通过方法名可知,其作用是判断是否能重试,由方法体内的实现可知,允许重试需要满足两个条件:
重试次数少于参数指定的值;
异常是RetriableException类型或者TransactionManager允许重试;
transactionManager.canRetry()后面会分析;先看看哪些异常是RetriableException类型异常。
RetriableException类型异常
kafka对RetriableException异常注释是:短暂性的通过重试可以成功的异常;通过RetriableException类关系图可知,可重试异常有图中RetriableException的子类那些异常(可以通过异常是否继承自RetriableException判断是否可重试异常):
retries exception list
TransactionManager允许重试
如果异常不属于RetriableException类型,但是只要满足就允许重试,所以,首先需要满足transactionManager不为null。transactionManager是在KafkaProducer中构造Sender传入的。构造TransactionManager的核心源码如下:
根据上面源码分析可知,只要用户配置了transactional.id,且没有显示配置enable.idempotence为false,那么TransactionManager就不会为null;
接下来还要满足才允许重试,主要包括下面几种情况:
碰到OutOfOrderSequenceException异常
broker的响应报文中没有logStartOffset(正常的响应信息:"T0-0" -> "")
2.如何实现重试
上面说明了什么情况下允许重试,接下来分析kafka是如何实现重试的。
2.1原理图
本打算把原理图放在最后,但是最后还是决定放在前面。对重试机制有一定的了解后,再看后面的分析就容易很多。kafka发送&重试机制如下图所示:
消息发送&重试机制.png
说明:
new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;
调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
如果发送成功,那么返回成功;
如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;
初步了解整个发送&重试过程后,再根据源码进行更深入的分析。
2.2后台线程
分析kafka如何实现重试之前,先看一下发送消息到broker前做的主要事情:
构造KafkaProducer时,构造Send并启动一个异步线程:
且从这段代码可知,每个KafkaProducer会启动一个线程处理消息,这个线程命名为:kafka-producer-network-thread | $。
笔者某个实例查看KafkaProducer启动的线程结果如下:
调用KafkaProducer的send()方法时,先把发送的消息存储在accumulator中:
2.3RecordAccumulator
RecordAccumulator是保存需要发送的消息或者重试消息的核心。发送消息之前先把消息存放在这里,异步线程KafkaThread启动后从这里取消息然后发送到broker。当发送出错且允许重试时,又会把这些需要重试的消息保存到这里再进行重试。
当调用KafkaProducer的send()方法发送消息时,会调用append()方法将消息暂时存放,核心源码如下:
当发送出错且允许重试时,会调用reenqueue()方法将消息暂时存放,核心源码如下:
RecordAccumulator简单总结:通过这两段代码的分析可知,保存需要发送的(重试)消息的核心数据结构是Deque。且创建队列时是,没有指定初始容量。这里不打算深入分析Deque,只是简单介绍一下,Deque是Double ended queue (双端队列) 的缩写。首尾都可写入可读取。
2.3发送&重试
下面分析kafka是如何发送并如何重试的。(TransactionManager相关代码被省略,其的作用后面有机会单独一篇文章分析);发送消息核心代码在Sender.java中, Sender.java实现了Runnable接口, 所以是后台线程异步发送消息到kafka集群:
KafkaProducer关闭有方式有两种:和,第一种是友好的关闭且设置timeout为,第二种如果设置timeout为0,就是强制关闭,即forceClose=true。
备注:drained: 流干,耗尽,undrained则表示未耗尽。
准备发送消息前需要尝试去accumulator中获取消息:
accumulator.drain()本质就是:,即根据分区信息得到Deque,然后不断获取ProducerBatch,即封装后的要发送的消息。
run(long)方法中往broker发送消息的部分核心代码(位于Sender.java中)如下:
handleProduceResponse()中收到的响应,如何是网络断开,那么构造响应:。如果有版本不匹配问题,那么构造响应:。还有一种特殊情况,如果指定了,那么构造响应,因为这种情况下只需要发送即可,不需要响应结果。接下来调用下面的方法--完成或者重试请求:
如果需要重试,重新入队列的源码如下:
领取专属 10元无门槛券
私享最新 技术干货