Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【kafka】__consumer_offsets部分分区异常导致消费不到数据问题排查

【kafka】__consumer_offsets部分分区异常导致消费不到数据问题排查

原创
作者头像
皮皮熊
修改于 2020-06-02 12:10:34
修改于 2020-06-02 12:10:34
6.2K00
代码可运行
举报
运行总次数:0
代码可运行

记一次kafka消费异常问题的排查 https://github.com/pierre94/kafka-notes

一、问题描述

问题描述

部分消费组无法通过broker(new-consumer)正常消费数据,更改消费组名后恢复正常。

group名(可能涉及业务信息,group名非真实名):

  • group1-打马赛克
  • group2-打马赛克

kafka版本: 0.9.0.1

二、简单分析

1、describe对应消费组

describe对应消费组时抛如下异常:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Error while executing consumer group command The group coordinator is not available.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.

2、问题搜索

搜索到业界有类似问题,不过都没有解释清楚为什么出现这种问题,以及如何彻底解决(重启不算)!

  • http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html

三、深入分析

日志是程序员的第一手分析资料。Kafka服务端因为现网有大量服务在运营,不适合开启debug日志,所以我们只能从客户端入手。

1、开启客户端debug日志

将客户端日志等级开成debug级别,发现持续循环地滚动如下日志:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
19:52:41.785 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker 43
19:52:41.788 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1587642761788, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1b68ddbd, request=RequestSend(header={api_key=10,api_version=0,correlation_id=30,client_id=consumer-1}, body={group_id=30cab231-05ed-43ef-96aa-a3ca1564baa3}), createdTimeMs=1587642761785, sendTimeMs=1587642761785), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
19:52:41.875 TKD [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=31,client_id=consumer-1}, body={topics=[topic打马赛克]}), isInitiatedByNetworkClient, createdTimeMs=1587642761875, sendTimeMs=0) to node 43

我们大致可以看出循环在做着几件事情(先后不一定准确):

  • 从某个broker Issuing group metadata request
  • 获取Group metadata
  • 发起metadata request

我们聚焦到获取Group metadata的error关键字responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}},大致得出是kafka服务端没有给出coordinator的node结点信息。

2、服务端如何响应请求

请求对应的入口函数

首先我们需要查看api_key=10请求对应的服务端源码:

需要从kafka.server.KafkaApis中寻找对应的api接口函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  def handle(request: RequestChannel.Request) {
  ……
        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
  ……
    }
handleGroupCoordinatorRequest逻辑
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
    val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
    val responseHeader = new ResponseHeader(request.header.correlationId)

    if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
    } else {
      val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)

      // get metadata (and create the topic if necessary)
      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
        // 第一个可能存在的问题:offsetsTopicMetadata的errCode不为空
      val responseBody = if (offsetsTopicMetadata.errorCode != Errors.NONE.code) {
        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
      } else {
        val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
          .find(_.partitionId == partition)
          .flatMap {
            partitionMetadata => partitionMetadata.leader
          }
        // 第二个可能存在的问题:coordinatorEndpoint为空
        coordinatorEndpoint match {
          case Some(endpoint) =>
            new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
          case _ =>
            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
        }
      }

      trace("Sending consumer metadata %s for correlation id %d to client %s."
        .format(responseBody, request.header.correlationId, request.header.clientId))
      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
    }
  }

其中error_code=15对应的是Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code

从源码不难看出,导致Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code可能点有二:

  • 疑似问题点一:offsetsTopicMetadata的errCode不为空 offsetsTopicMetadata.errorCode != Errors.NONE.code offsetsTopicMetadata的errCode不为空,意味着整个__consumer_offsets元数据获取都有问题。但是现场只是部分group有问题,这里出问题的可能性不大。
  • 疑似问题点二:coordinatorEndpoint为空 val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata .find(_.partitionId == partition) .flatMap { partitionMetadata => partitionMetadata.leader } 从offsetsTopicMetadata获取到的元数据,过滤出coordinator.partitionFor(groupCoordinatorRequest.groupId)分区的leader。而coordinator.partitionFor(groupCoordinatorRequest.groupId)正是与group名相关!这里出问题的可能性极大!
partitionFor相关的逻辑:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

即取group名的正hashCode模groupMetadataTopicPartitionCount(即__consumer_offsets对应的分区数)。

注:可能涉及业务信息,group名非真实名。而结果是正式group名算出的结果。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> "group1-打马赛克".hashCode % 50
res2: Int = 43

scala> "group2-打马赛克".hashCode % 50
res3: Int = 43

我们发现2个异常的消费组,其partitionFor后的值均为43,我们初步判断分区可能与__consumer_offsets的43分区相关! 接下来我们就要看下offsetsTopicMetadata相关的逻辑,来确认异常。

offsetsTopicMetadata的逻辑
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)

getOrCreateGroupMetadataTopic -> metadataCache.getTopicMetadata -> getPartitionMetadata

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[PartitionMetadata]] = {
    cache.get(topic).map { partitions =>
      partitions.map { case (partitionId, partitionState) =>
        val topicPartition = TopicAndPartition(topic, partitionId)

        val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
        val maybeLeader = aliveBrokers.get(leaderAndIsr.leader)

        val replicas = partitionState.allReplicas
        val replicaInfo = getAliveEndpoints(replicas, protocol)

        maybeLeader match {
          case None =>
            debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
            new PartitionMetadata(partitionId, None, replicaInfo, Seq.empty[BrokerEndPoint],
              Errors.LEADER_NOT_AVAILABLE.code)

          case Some(leader) =>
            val isr = leaderAndIsr.isr
            val isrInfo = getAliveEndpoints(isr, protocol)

            if (replicaInfo.size < replicas.size) {
              debug("Error while fetching metadata for %s: replica information not available for following brokers %s"

  }

offsetsTopicMetadata即对于topic下所有leader、replicaInfo、isr正常分区的元数据信息,所以我们判断__consumer_offsets 43分区leader、replicaInfo、isr等可能存在异常,导致find(_.partitionId == partition)时找不到根据hashCode取模后对应的分区。

四、回到现网

1、__consumer_offsets分区信息验证

43分区果然存在leader异常的情况

2、问题复现

我们使用UUID批量生成消费组名,使其hashCode取模后为异常分区的分区号,再使用其进行消费时均出现消费异常的问题。

3、问题思考

  • 为什么__consumer_offsets部分分区会产生leader、replicaInfo、isr异常? 与网络抖动和一些集群操作可能有关,需要具体问题具体分析
  • 如何将__consumer_offsets异常分区恢复正常? 这里不详细介绍可以参考http://blog.itpub.net/31543630/viewspace-2212467/ 。

五、参考资料

  • Kafka new-consumer设计文档 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design
  • Kafka无法消费?!我的分布式消息服务Kafka却稳如泰山!http://blog.itpub.net/31543630/viewspace-2212467/
  • Problem with Kafka 0.9 Client http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html
  • ErrorMapping https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
寻找协调器FindCoordinatorRequest请求流程
我们在分析消费者的时候, 有看到调用FindCoordinatorRequest的请求
石臻臻的杂货铺[同名公众号]
2022/09/28
3230
寻找协调器FindCoordinatorRequest请求流程
Kafka 消费者之 findCoordinator源码解析
1、首先,我们会给每个consume设置groupId,对于相同groupId且订阅相同topic的consume,会组成consumeGroup,如图一所示
石臻臻的杂货铺[同名公众号]
2022/01/10
9822
Kafka 消费者之 findCoordinator源码解析
记一次kafka客户端NOT_COORDINATOR_FOR_GROUP处理过程
转发请注明原创地址:https://www.cnblogs.com/dongxiao-yang/p/10602799.html
sanmutongzi
2020/03/05
1.7K0
记一次kafka客户端NOT_COORDINATOR_FOR_GROUP处理过程
Kafka学习(三)-------- Kafka核心之Consumer
了解了什么是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以后
大数据流动
2019/08/08
2K0
Kafka源码阅读最最最简单的入门方法
从上面代码中可以看出,拉取的offset是从offsetsCache中获取。而在提交offset以及初始化group是会将对应的offset信息加入到该缓存中。
王知无-import_bigdata
2019/12/26
8710
Kafka的消息是如何被消费的?Kafka源码分析-汇总
Kafka的消息消费是以消费的group为单位; 同属一个group中的多个consumer分别消费topic的不同partition; 同组内consumer的变化, partition变化, coordinator的变化都会引发balance; 消费的offset的提交 Kafka wiki: Kafka Detailed Consumer Coordinator Design 和 Kafka Client-side Assignment Proposal ---- GroupMetadata类 所
扫帚的影子
2018/09/05
1.4K0
Kafka的消息是如何被消费的?Kafka源码分析-汇总
ReplicaManager源码解析2-LeaderAndIsr 请求响应
其中最主要的操作调用ReplicaManager.becomeLeaderOrFollower来初始化Partition
扫帚的影子
2018/09/05
8570
ReplicaManager源码解析2-LeaderAndIsr 请求响应
跟我学Kafka之Controller控制器详解(一)
Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行类似于重分配Partition之类的管理任务。如果当前的Controller失败,会从其他正常的Broker中重新选举Controller。
小程故事多
2018/08/22
8940
跟我学Kafka之Controller控制器详解(一)
Kafka的Request和Response
每个Request和Response都由RequestHeader(ResponseHeader) + 具体的消费体构成;
扫帚的影子
2018/09/05
1K0
Kafka的Request和Response
一文搞懂 Kafka consumer 与 broker 交互机制与原理
AutoMQ 致力于构建下一代云原生 Kafka 系统,解决过去 Kafka 的诸多痛点问题,引领 Kafka 走向云原生时代。作为国内 Kafka 生态的忠实拥护者,我们将持续为 Kafka 技术爱好者带来优质的 Kafka 技术内容分享,欢迎关注我们。今天给大家带来的是 Kafka Consumer 与 Kafka Broker 之间的交互机制解析,并简要介绍其背后的主要工作机制,参考的 Kafka 源码版本为 3.4。
用户10807116
2024/05/27
1.3K0
kafka知识点--offset管理和Consumer Rebalance
consumer group是kafka提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。 consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程 group.id是一个字符串,唯一标识一个consumer group consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
哲洛不闹
2020/03/11
4.9K0
首页 归档 分类 标签 作者 kafka原理总结
分区策略决定 producer 将消息怎么分发到 partition 中, 分区策略不合适可能导致数据倾斜, 有些时候我们需要实现顺序消息, 也需要将同一业务的消息都发送到同一个 partition 上。生产端将消息发送给 broker 之前主要经过拦截、序列化、分区(Partitioner)几个步骤。分区器主要读取 partition 配置(生产端配置partitioner.class, 默认值是 DefaultPartitioner)
leobhao
2023/03/08
4820
首页  归档  分类  标签  作者     kafka原理总结
Kafka 的稳定性
多分区原子写入: 事务能够保证Kafka topic下每个分区的原⼦写⼊。事务中所有的消息都将被成功写⼊或者丢弃。 ⾸先,我们来考虑⼀下原⼦读取-处理-写⼊周期是什么意思。简⽽⾔之,这意味着如果某个应⽤程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进⾏了⼀些处理(如B = F(A)),之后将消息B写⼊topic tp1,则只有当消息A和B被认为被成功地消费并⼀起发布,或者完全不发布时,整个读取过程写⼊操作是原⼦的。 现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写⼊⼀个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。 由于offset commit只是对Kafka topic的另⼀次写⼊,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原⼦写⼊也启⽤原⼦读取-处理-写⼊循环:提交偏移量X到offset topic和消息B到tp1的写⼊将是单个事务的⼀部分,所以整个步骤都是原⼦的。
用户7353950
2022/06/23
1.3K0
Kafka 的稳定性
你说通过Kafka AdminClient获取Lag会有性能问题?尊嘟假嘟0.o
前阵子团队里出了个大故障,本质是因为其他语言实现的client有问题,非常频繁的请求大量元数据,而Kafka服务端这边也没有做什么限制,导致Kafka Broker宕了。
泊浮目
2024/09/25
1580
记一次Kafka集群的故障恢复Kafka源码分析-汇总
Kafka 集群部署环境 kafka 集群所用版本 0.9.0.1 集群部署了实时监控: 通过实时写入数据来监控集群的可用性, 延迟等; ---- 集群故障发生 集群的实时监控发出一条写入数据失败的报警, 然后马上又收到了恢复的报警, 这个报警当时没有重要,没有去到对应的服务器上去看下log, 恶梦的开始啊~~~ 很快多个业务反馈Topic无法写入, 运维人员介入 故障解决 运维人员首先查看kafka broker日志, 发现大量如下的日志: [2017-10-12 16:52:38,141] ER
扫帚的影子
2018/09/05
1.9K0
Kafka 消费者旧版低级 API
Kafka 消费者总共有 3 种 API,新版 API、旧版高级 API、旧版低级 API,新版 API 是在 kafka 0.9 版本后增加的,推荐使用新版 API,但由于旧版低级 API 可以对消息进行更加灵活的控制,所有在实际开发中使用的也较多,本文讨论消费者旧版低级 API 的基本使用。
CoderJed
2018/09/13
1.6K0
Kafka-Broker的基本模块
1.SocketServer SocketServer作为Broker对外提供Socket服务的模块,主要用于接收socket连接的请求,然后产生相应为之服务的SocketChannel对象。
程序狗
2021/12/17
5740
Kafka全面认知
最早设计的目的是作为LinkedIn的活动流和运营数据的处理管道。这些数据主要是用来对用户做用户画像分析以及服务器性能数据的一些监控。
花落花相惜
2021/11/22
5150
Kafka集群Metadata管理Kafka源码分析-汇总
可以看到是调用了ReplicaManager.maybeUpdateMetadataCache方法, 里面又会调用到MetadataCache.updateCache方法
扫帚的影子
2018/09/05
1.3K0
Kafka Consumer 的 Rebalance 机制
如上图所示,Consumer 使用 Consumer Group 名称标记自己,并且发布到主题的每条记录都会传递到每个订阅消费者组中的一个 Consumer 实例。 Consumer 实例可以在单独的进程中或在单独的机器上。
haifeiWu
2020/02/10
2.5K0
相关推荐
寻找协调器FindCoordinatorRequest请求流程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验