首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Kafka技术内幕之Producer RecordAccumulator

kafka发送消息时,并不是直接将消息从客户端通过网络发送给服务器端,而是先将消息存储在客户端的记录收集器中,当队列满了或者发送时间已到的时候才会去发送,这个记录收集器就是RecordAccumulator。

我们首先通过类图及流程图来看看涉及到的类以及它们之间的关系,然后再一步步深入每个类进行分析。

一. 类图

从上图中我们可以看出,kafka消息最终是存储在字节缓冲区ByteBuffer中的。

二. 添加记录收集器流程图:

三. 代码分析

RecorAccumulator

记录收集器

compression:压缩类型,有NONE,Snappy,gzip,lz4等等。

batchSize:批记录大小,对应配置batch.size,表示队列的大小,单位是字节

lingerMs:消息发送的延迟时间,默认是0,即立即发送,这个参数和batch.size要配合使用

BufferPool:缓冲池,由于消息的底层是ByteBuffer,在消息发送过程中,频繁操作ByteBuffer对内存消耗很大,BufferPool是ByteBuffer的池,创建之前如果有,则直接使用,否则才创建

batches:这个参数是最重要的参数,是一个ConcurrentMap,key是TopicPartition对象,值是一个双端队列,队列元素是ProducerBatch对象

incomplement:未发送完成的ProducerBatch结合,底层通过Set

集合实现

drainIndex:使用drain方法批量导出ProducerBatch时,为了防止饥饿,使用drainIndex记录上次发送停止时的位置,下次继续从此位置开始发送。

//根据TopicPartition对象获取队列,如果不存在,则创建

// 添加ProducerBatch对象到队列中

ProducerBatch

消息批记录,双端队列的元素

topicPartition:主题分区对象

thunks:Thunk对象保存了callBack和FutureRecordMetadata,这里面包含了消息的元数据,偏移量等,这个值可以保证当消息被切分了好几段,发送了好几次或者发送失败时,客户端处理重发操作

recordsBuilder:MemoryRecordBuilder对象,关键参数,将消息添加头部信息,魔数信息,偏移量,压缩等信息后转换成ByteBuffer

// 添加到队列

MemoryRecordBuilder

内存记录构建器

compressionType:压缩类型

bufferStream:消息的ByteBufferOutputStream的展现方式

appendStream:消息添加了压缩方式后的DataOuputStream的方式,本质上是对bufferStream的包装,只是添加了压缩方式

builtRecords:MemoryRecords对象,消息的最终存储,ByteBuffer,字节缓冲区

// MemoryRecordBuilder的appedn的最终方法调用的是该方法,计算了偏移量

// 添加

MemoryRecords

内存记录集

buffer:字节缓冲区,消息的最终存放形式

以上就是Producer RecordAccumulator部分的介绍,下一篇我们将介绍Kafka Sender(发送线程)。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180928G1BNPK00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券