kafka发送消息时,并不是直接将消息从客户端通过网络发送给服务器端,而是先将消息存储在客户端的记录收集器中,当队列满了或者发送时间已到的时候才会去发送,这个记录收集器就是RecordAccumulator。
我们首先通过类图及流程图来看看涉及到的类以及它们之间的关系,然后再一步步深入每个类进行分析。
一. 类图
从上图中我们可以看出,kafka消息最终是存储在字节缓冲区ByteBuffer中的。
二. 添加记录收集器流程图:
三. 代码分析
RecorAccumulator
记录收集器
compression:压缩类型,有NONE,Snappy,gzip,lz4等等。
batchSize:批记录大小,对应配置batch.size,表示队列的大小,单位是字节
lingerMs:消息发送的延迟时间,默认是0,即立即发送,这个参数和batch.size要配合使用
BufferPool:缓冲池,由于消息的底层是ByteBuffer,在消息发送过程中,频繁操作ByteBuffer对内存消耗很大,BufferPool是ByteBuffer的池,创建之前如果有,则直接使用,否则才创建
batches:这个参数是最重要的参数,是一个ConcurrentMap,key是TopicPartition对象,值是一个双端队列,队列元素是ProducerBatch对象
incomplement:未发送完成的ProducerBatch结合,底层通过Set
集合实现
drainIndex:使用drain方法批量导出ProducerBatch时,为了防止饥饿,使用drainIndex记录上次发送停止时的位置,下次继续从此位置开始发送。
//根据TopicPartition对象获取队列,如果不存在,则创建
// 添加ProducerBatch对象到队列中
ProducerBatch
消息批记录,双端队列的元素
topicPartition:主题分区对象
thunks:Thunk对象保存了callBack和FutureRecordMetadata,这里面包含了消息的元数据,偏移量等,这个值可以保证当消息被切分了好几段,发送了好几次或者发送失败时,客户端处理重发操作
recordsBuilder:MemoryRecordBuilder对象,关键参数,将消息添加头部信息,魔数信息,偏移量,压缩等信息后转换成ByteBuffer
// 添加到队列
MemoryRecordBuilder
内存记录构建器
compressionType:压缩类型
bufferStream:消息的ByteBufferOutputStream的展现方式
appendStream:消息添加了压缩方式后的DataOuputStream的方式,本质上是对bufferStream的包装,只是添加了压缩方式
builtRecords:MemoryRecords对象,消息的最终存储,ByteBuffer,字节缓冲区
// MemoryRecordBuilder的appedn的最终方法调用的是该方法,计算了偏移量
// 添加
MemoryRecords
内存记录集
buffer:字节缓冲区,消息的最终存放形式
以上就是Producer RecordAccumulator部分的介绍,下一篇我们将介绍Kafka Sender(发送线程)。
领取专属 10元无门槛券
私享最新 技术干货