序
前面一篇文章讨论了ConsumerFetcherManager的MaxLag与ConsumerOffsetChecker的lag值的区别。但是关于MaxLag的值还没有讲的太透彻,这里再深入一下,如何让ConsumerFetcherManager的MaxLag有值。
AbstractFetcherThread#processFetchRequest
kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherThread.scala
值得注意,这里构建了fetchRequest
这里的partitionMap,key是TopicAndPartition,value就是本地最大的offset
每次拉取的时候,以本地已经拉取的最大值,还有拉取大小构造fetchRequest
FetchRequest
kafka_2.10-0.8.2.2-sources.jar!/kafka/api/FetchRequest.scala
可以看到这里的offset与fetchSize决定了这个fetcher从broker拉取数据的开始位置和拉取数据的条数。
ConsumerFetcherThread
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
这里使用的fetchSize来自config.fetchMessageMaxBytes
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala
这个fetchSize默认是1024 * 1024,也就是1048576,即每次fetch的时候拉取1048576这么多条。
AbstractFetcherThread#processFetchRequest
ConsumerFetcherThread#processPartitionData
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
PartitionTopicInfo#enqueue
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/PartitionTopicInfo.scala
如果数据为空,则不放进队列
chunkQueue大小
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ZookeeperConsumerConnector.scala
queue在这里创建了,大小为config.queuedMaxMessages
默认队列最大只能有2个FetchedDataChunk
而每个FetchedDataChunk里头最大的消息数目就是fetchSize大小也就是10241024
也就是说每个消费线程的chunkQueue里头默认最大的消息数目为21024*1024
当超过这个数目的时候,enquue就会阻塞,这样就形成了对整个fetch的拉取速度的控制。
ConsumerFetcherManager的MaxLag
那么每次只拉10条消息,假设目前的lag如下
拉取一次之后
这里的nextOffset = offset + 1,也就是拉取回来的最大offset+1 = 259,hw的话是8702,那么lag值就是8702-259=8443
这里为了复现,让消费线程拉取一条之后抛异常退出
小结
生产环境注意根据消息大小以及环境内存等对如下参数进行配置,否则很容易引发OOM
另外关于ConsumerFetcherManager的MaxLag,只有在上面两个参数合理设置的情况下,才能对监控有点点帮助()。从实际场景来看,还是一般比较少改动参数的话,那么还是得以ConsumerOffsetChecker的lag值做消费者消费滞后的监控才准确。
doc
ConsumerFetcherManager MaxLag
apache kafka系列之jmx监控指标参数
Kafka源码分析 Consumer(2) Fetcher
领取专属 10元无门槛券
私享最新 技术干货