【引言】
最近遇到了一个和kafka相关的问题,具体是在spark任务在一定并行度的情况下, 偶现个别executor因kafka消息发送超时导致失败的情况。正所谓磨刀不误砍柴工,为了能较好的定位问题,因此先对kafka客户端消息发送相关逻辑的代码进行了走读,本文就是对相关原理的一些总结。
【相关概念(数据结构)】
在客户端里,一些重要的概念或对应的数据结构包括:
1. ProducerRecord
生产者发送的每条消息,都对应一个ProduceRecord类实例对象,记录了包括消息的key,value,时间戳,header,topic,partition信息。
客户端发送消息时,并不是调用send
接口发送一条消息,就实际将该消息通过网络发送出去,而是攒够一批进行发送。在具体实现中,ProducerBatch就对应这个批的概念。ProducerBatch本质上是一批消息的集合,也就是上面ProduceRecord中的key、value、header经过序列化后的字节数据存储在ProducerBatch中。
RecordAccumlator是上面ProducerBatch的集合。由于消息只能发往topic的某一个分区,发往同一个topic的一个或多个消息组成ProducerBatch,多个ProduceBatch组成一个链表,在RecordAccumlator内部,则以topic的分区为key,ProduceBatch链表为value,缓存所有待发送的消息。
一个简单的示例如下图所示:
4. BufferPool
一块大的内存池,存储消息记录序列化后的字节数据,即ProduceBatch中用于存放具体消息内容的内存就是从BufferPool中申请的。
在BufferPool内部,分为两种类型的内存,一种是固定大小的内存,这些内存先从系统申请,使用完(消息发送完毕并确认收到)后,回收后放到内存池中,以便后续使用;另一种是不固定大小的内存,通常是大于一个批大小的内存,这些内存也是直接从系统申请,但使用完成后,不会放到内存池,而是等触发垃圾回收时,被系统回收掉。
专门的消息发送线程,定时从RecordAccumlator取出一批消息并发送给服务端。
负责与所有broker通信,包括与broker建立连接,协议上的交互(将消息按指定协议格式发送,定时更新元数据等等),以及处理broker的响应消息。
如果从全局的视角来看,kafka客户端的架构可能是这样的一个分层:
【消息发送流程】
从上面的介绍中,以及可以猜出大概的消息处理流程。简单概括客户端消息发送的逻辑就是:业务线程(调用producer.send()
的线程)将消息序列化,并存放到ProduceBatch中,然后按需唤醒sender发送线程;发送线程从RecordAccumlator挑选出待发送的ProduceBatch列表,并按照指定协议格式构造请求,然后发送给topic分区leader对应的broker,接着接收服务端的响应,并进行处理以及回调通知。
展开来说的话,流程如下图所示:
还是分为两部分,在业务线程中:
在发送线程中:
【有关的配置】
一些常用的,并且和上面流程或概念有关的参数包括:
1. buffer.memory
bufferPool的总大小,默认大小为32MB,每次分配后可用空间减少,当使用完回收后,可用空间又对应增加。如果单次申请的内存大于这个值,会直接抛异常;而如果BufferPool中剩余可用空间的值不满足条件时,则会阻塞线程,直到已有消息发送完成被释放后,会通知该线程解除阻塞,重新分配。
一个ProducerBatch的消息,也是BufferPool中内存池里每个内存块的大小。默认大小为16KB。如果单条消息的大小大于这个值,则按实际大小从BufferPool中申请;如果单条消息的值小于这个值,则以该值为单位从BufferPool中申请。另外,当有新的消息写入时,如果一个ProduceBatch还未写满,并且剩余空间足够存储该消息,那么则会追加写到该ProduceBatch中。这也就意味着,一个ProduceBatch里包含一条大于该值的消息,或包含多条小于该值的消息。
ProducerBatch的超时时间。每次往ProducerBatch追加写时,会更新追加时间,如果Producebatch的最后更新时间距离当前时间超过了发送超时时间,那么则认为是发送超时。并提示“ xxx ms has passed since last append”
。
4. linger.ms
前面消息发送流程里提到了,单条消息并不是立即发送的,而是攒够一批发送,那么如果后续一直没有消息了,那是不是也就一直不发送了呢?显然不是这样,一个ProduceBatch最长等待时间就是由linger.ms来决定的,sender线程在从ProduceBatch的表头取出ProduceBatch时,会根据当前时间与ProduceBatch的最近一次发送时间(如果没有发送则是ProduceBatch的创建时间)进行比较,如果小于linger.ms指定的时间,则不进入本次真正待发送的列表中,同时计算出剩余时间,这其实就是后续poll轮询与broker的连接,等待IO事件的时间。
另外,如果当前时间减去ProduceBatch的创建时间,大于发送超时时间与linger.ms时间之和,那么也会导致ProduceBatch的发送超时。
【总结】
总结一下,通过本文的介绍,应该对kafka客户端内部的整体设计、消息存储发送流程有了个简单的认识,遇到一些报错时,也能从流程上进行初步的分析定位,至于深层次的问题,那就还需要再对源码深入分析,而本文开头提到的问题,由于问题未复现,这里也就没有近一步分析说明,后续再结合问题对内部原理展开说明。