Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >kafka消费组信息采集异常(hang住)排查

kafka消费组信息采集异常(hang住)排查

原创
作者头像
皮皮熊
修改于 2022-02-28 12:19:48
修改于 2022-02-28 12:19:48
2.9K0
举报

一、问题描述

小组同学在使用kafka官方工具kafka-consumer-groups.sh批量导集群消费组详情时,发现某一个集群基于broker的某些消费组会出现异常,主要表现:

  • 结果不全: 只有部分分区的信息
  • 进程会阻塞: 不会像导他消费组时那样,执行完自动退出
image.png
image.png

二、问题分析

1、信息梳理

兵马未动,粮草先行。在分析问题前我们需要具体梳理一下潜在的线索:

  • 集群的版本是0.9.0.1的bug?(古老的版本,最近半年会裁撤掉)但目前0.9.0.1版本集群中只有这个出现这样的问题
  • 集群部署不规范,broker和zk端口不统一的问题?但之前也有类似部署不规范的集群,没有出现过这样的问题
  • 集群机器有异常?
  • broker消费特殊场景下的bug?此次异常的消费组大多同时消费2个topic: 一个是日常三副本的topic,一个是离线补录的二副本的topic,确实存在bug的可能性。
  • kafka-consumer-groups.sh特殊场景下的Bug?

2、机器问题排查

1) strace相关进程,发现进程确实阻塞住了

2) 查看/proc/pid/stack看则有下面的堆栈输出

代码语言:txt
AI代码解释
复制
$ cat /proc/12097/stack

[<ffffffff81097b6b>] futex_wait_queue_me+0xdb/0x140
[<ffffffff81097e46>] futex_wait+0x166/0x250
[<ffffffff81099f1e>] do_futex+0xde/0x570
[<ffffffff8109a421>] SyS_futex+0x71/0x150
[<ffffffff81b2a202>] system_call_fastpath+0x16/0x1b
[<ffffffffffffffff>] 0xffffffffffffffff

3) 查看机器版本

与现网其他机器相比,没有太大的差异,机器层面相关性可能不是很大。

3、__consumer_offsetstopic排查

基于broker消费的消费组,其偏移量的元数据信息是存储在__consumer_offsets这个topic下的。笔者之前在《kafka部分group无法正常消费数据排查》一文中曾介绍过因__consumer_offsets问题导致group异常的情况,所以便查看了一下__consumer_offsets的情况,发现一切正常:

image.png
image.png

4、进程阻塞原因排查

几经周折,没有发现什么进展,还是决定回到kafka-consumer-groups.sh本身,从查看进程堵塞原因出发。此时就需要我们jstack查看一下进程内诸线程的情况,我们发现:

image.png
image.png

进程阻塞在获取某个分区的HW(HighWatermark)上(注意:LEO对消费者是不可见的,所以这里虽然调用的方法是getLogEndOffset,但实际上是获取HW),这时我们就要从源码中进行深入的分析。

三、源码分析

kafka-consumer-groups.sh获取基于broker消费組信息,即调用kafka.admin.ConsumerGroupCommandKafkaConsumerGroupService.describeGroup。相关实现如下:

1、KafkaConsumerGroupService

代码语言:txt
AI代码解释
复制
class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {

  private val adminClient = createAdminClient()

  // `consumer` is only needed for `describe`, so we instantiate it lazily
  private var consumer: KafkaConsumer[String, String] = null

  def list() {
    adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
  }

  protected def describeGroup(group: String) {
    val consumerSummaries = adminClient.describeConsumerGroup(group)
    if (consumerSummaries.isEmpty)
      println(s"Consumer group `${group}` does not exist or is rebalancing.")
    else {
      val consumer = getConsumer()
	  // 打印描述头
      printDescribeHeader()
      consumerSummaries.foreach { consumerSummary =>
        val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
        val partitionOffsets = topicPartitions.flatMap { topicPartition =>
          Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
            topicPartition -> offsetAndMetadata.offset
          }
        }.toMap
        describeTopicPartition(group, topicPartitions, partitionOffsets.get,
          _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
      }
    }
  }

  // 获取HW的值
  protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
    val consumer = getConsumer()
    val topicPartition = new TopicPartition(topic, partition)
    consumer.assign(List(topicPartition).asJava)
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)
    LogEndOffsetResult.LogEndOffset(logEndOffset)
  }

 //省略中间一部分不重要的代码 
 
  private def createNewConsumer(): KafkaConsumer[String, String] = {
    val properties = new Properties()
    val deserializer = (new StringDeserializer).getClass.getName
    val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
   // 不自动提交offset很重要,否则会影响消费组正常的消费(丢数据) 
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))

    new KafkaConsumer(properties)
  }
}

2、CURRENT OFFSETLOG END OFFSET计算规则

其中关键部分:

  • CURRENT OFFSET计算规则:
代码语言:txt
AI代码解释
复制
consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
  topicPartition -> offsetAndMetadata.offset
}
  • HW(脚本显示的LOG END OFFSET)计算规则
代码语言:txt
AI代码解释
复制
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)

从jstack分析来看,是这一步卡住了。

3、验证分析

参考HW(脚本显示的LOG END OFFSET)计算规则,实现了一个简单的HW采集程序,分别采集异常消费组下2个topic的情况,来看看具体是哪一步卡住?卡住前后是否有相关日志或输出?

我们发现:

  • 该消费组下日常的topic是正常获取HW
image.png
image.png
  • 而离线补录的topic无法正常获取HW值,可能异常
image.png
image.png

进而发现补录的topic存在leader为-1的情况。

image.png
image.png

推测:因为离线补录的topic大部分是不会在线上生产数据,只会在某些特点场景下由平台侧往里面的一次性导入数据,所有这个古老的集群当时下掉若干个节点时并没有迁移这些一次性的topic,从而在使用kafka-consumer-groups.sh获取消费组产生异常。

四、总结

1、 这次问题分析走了一些弯路,但还是加强了对kafka-consumer-groups.sh实现原理的理解

2、topic leader为-1会造成各种各样奇怪的问题,哪怕是一些不重要的topic。

目前所有高版本的集群针对这类场景有完善的监控,而0.9.0.1这种古老集群还相对不完善,等最近裁撤迁移到新集群后会有很大改善。

更多内容可以关注我的公众号~

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Kafka快速入门(Kafka消费者)
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
鱼找水需要时间
2023/02/16
1.8K0
Kafka快速入门(Kafka消费者)
Kafka的消费者提交方式手动同步提交、和异步提交
  1)、自动提交,这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔五秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll方法来驱动的,在调用poll方法的时候,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。需要注意的是,这种方式可能会导致消息重复消费,假如,某个消费者poll消息后,应用正在处理消息,在3秒后kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。
别先生
2021/01/13
8K0
Kafka快速入门
LEO:Log End Offset,待写入消息的offset,即最后一条消息的offset+1
兜兜转转
2023/03/06
3670
「kafka」kafka-clients,java编写消费者客户端及原理剖析
每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。如下图所示:
源码之路
2020/09/04
2.2K0
「kafka」kafka-clients,java编写消费者客户端及原理剖析
进击消息中间件系列(六):Kafka 消费者Consumer
pull模式不足之处是如果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。
民工哥
2023/08/22
1.4K0
进击消息中间件系列(六):Kafka 消费者Consumer
【Kafka】核心API
虚拟化软件推荐 VM https://www.cnblogs.com/PrayzzZ/p/11330937.html VirtualBOX
瑞新
2020/12/07
1.3K0
【Kafka】核心API
Kafka详解
  Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
忧愁的chafry
2022/10/30
1.4K0
Kafka详解
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
操作步骤 Maven依赖 核心依赖 kafka-clients <dependency> <groupId>org.apache.kafkagroupId>
小小工匠
2021/08/17
5780
kafka系列--消费
     public String title;     public  ConsumerRecords<byte[], byte[]> records;     public KafkaConsumerSimple(String title, ConsumerRecords<byte[], byte[]> records) {         this.title = title;         this.records = records;     }     @Override     public void run() {         System.out.println("开始运行 " + title);         for (ConsumerRecord<byte[], byte[]> record : records) {             if(record!=null){                 String topic = record.topic();                 int partition = record.partition();                 long offset = record.offset();                 String msg = new String(record.value());                 String key=new String(record.key());                 //System.out.println(String.format(                         "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s],key:[%s]",                         title, topic, partition, offset, msg,key));             }         }         //System.out.println(String.format("Consumer: [%s] exiting ...", title));     }     public static void main(String[] args) {         Properties properties = new Properties();
Dlimeng
2023/06/29
1430
你说通过Kafka AdminClient获取Lag会有性能问题?尊嘟假嘟0.o
前阵子团队里出了个大故障,本质是因为其他语言实现的client有问题,非常频繁的请求大量元数据,而Kafka服务端这边也没有做什么限制,导致Kafka Broker宕了。
泊浮目
2024/09/25
1560
消息队列——Kafka基本使用及原理分析
Kafka也是一款消息队列中间件,与ActiveMQ和RabbitMQ不同的是,它不是基于JMS和AMQP规范开发的,而是提供了类似JMS的特性,同时Kafka比较重量级,天然支持集群分布式搭建以及数据分片备份,由Scala和Java编写,因其高性能和高吞吐量的特点被广泛用于大数据的传输场景。简单而言,Kafka就是一款适用于大数据场景下的消息队列。
夜勿语
2020/09/07
1.7K0
Kafka - 3.x offset位移不完全指北
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
小小工匠
2023/11/01
4410
Kafka - 3.x offset位移不完全指北
浅析Kafka的消费者和消费进度的案例研究
本文主要讨论Kafka组件中的消费者和其消费进度。我们将通过一个使用Scala语言实现的原型系统来学习。本文假设你知道Kafka的基本术语。
you
2018/05/30
2.5K0
Kafka源码阅读最最最简单的入门方法
从上面代码中可以看出,拉取的offset是从offsetsCache中获取。而在提交offset以及初始化group是会将对应的offset信息加入到该缓存中。
王知无-import_bigdata
2019/12/26
8680
kafka消费者组分区分配实战
问题引入:一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据
九转成圣
2024/11/24
2090
【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程
Kafka 里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。
江中散人_Jun
2024/02/18
2590
【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程
Kafka核心API——Consumer消费者
在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了。因此,本文将介绍Consumer API的使用,使用API从Kafka中消费消息,让应用成为一个消费者角色。
端碗吹水
2020/09/23
1.4K0
Kafka核心API——Consumer消费者
第二天:Kafka API操作
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
sowhat1412
2020/11/05
8530
第二天:Kafka  API操作
Kafka入门与实战
今天我们来聊一聊现在MQ中最火爆的Kafka吧。关于Kafka的内容还是比较多的。本篇大概15000左右字,大家根据自己的需求来看吧。本文的大纲如下图所示: 一、消息队列的作用是什么?
爪哇缪斯
2023/05/10
7730
Kafka入门与实战
消息中间件 Kafka
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。适用于需要可靠的数据传送的分布式环境。
京剧猫
2023/03/04
9190
消息中间件 Kafka
相关推荐
Kafka快速入门(Kafka消费者)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档