记一次kafka消费异常问题的排查 https://github.com/pierre94/kafka-notes
部分消费组无法通过broker(new-consumer)正常消费数据,更改消费组名后恢复正常。
group名(可能涉及业务信息,group名非真实名):
group1-打马赛克
group2-打马赛克
kafka版本: 0.9.0.1
describe对应消费组时抛如下异常:
Error while executing consumer group command The group coordinator is not available.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
搜索到业界有类似问题,不过都没有解释清楚为什么出现这种问题,以及如何彻底解决(重启不算)!
日志是程序员的第一手分析资料。Kafka服务端因为现网有大量服务在运营,不适合开启debug日志,所以我们只能从客户端入手。
将客户端日志等级开成debug级别,发现持续循环地滚动如下日志:
19:52:41.785 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker 43
19:52:41.788 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1587642761788, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1b68ddbd, request=RequestSend(header={api_key=10,api_version=0,correlation_id=30,client_id=consumer-1}, body={group_id=30cab231-05ed-43ef-96aa-a3ca1564baa3}), createdTimeMs=1587642761785, sendTimeMs=1587642761785), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
19:52:41.875 TKD [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=31,client_id=consumer-1}, body={topics=[topic打马赛克]}), isInitiatedByNetworkClient, createdTimeMs=1587642761875, sendTimeMs=0) to node 43
我们大致可以看出循环在做着几件事情(先后不一定准确):
Issuing group metadata request
Group metadata
metadata request
我们聚焦到获取Group metadata
的error关键字responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}
,大致得出是kafka服务端没有给出coordinator的node结点信息。
首先我们需要查看api_key=10
请求对应的服务端源码:
需要从kafka.server.KafkaApis
中寻找对应的api接口函数
def handle(request: RequestChannel.Request) {
……
case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
……
}
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
val responseHeader = new ResponseHeader(request.header.correlationId)
if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
// 第一个可能存在的问题:offsetsTopicMetadata的errCode不为空
val responseBody = if (offsetsTopicMetadata.errorCode != Errors.NONE.code) {
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
} else {
val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
.find(_.partitionId == partition)
.flatMap {
partitionMetadata => partitionMetadata.leader
}
// 第二个可能存在的问题:coordinatorEndpoint为空
coordinatorEndpoint match {
case Some(endpoint) =>
new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
case _ =>
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
}
}
trace("Sending consumer metadata %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
}
其中error_code=15
对应的是Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
从源码不难看出,导致Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
可能点有二:
__consumer_offsets
元数据获取都有问题。但是现场只是部分group有问题,这里出问题的可能性不大。offsetsTopicMetadata
获取到的元数据,过滤出coordinator.partitionFor(groupCoordinatorRequest.groupId)
分区的leader。而coordinator.partitionFor(groupCoordinatorRequest.groupId)
正是与group名相关!这里出问题的可能性极大! def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
即取group名的正hashCode模groupMetadataTopicPartitionCount
(即__consumer_offsets
对应的分区数)。
注:可能涉及业务信息,group名非真实名。而结果是正式group名算出的结果。
scala> "group1-打马赛克".hashCode % 50
res2: Int = 43
scala> "group2-打马赛克".hashCode % 50
res3: Int = 43
我们发现2个异常的消费组,其partitionFor
后的值均为43,我们初步判断分区可能与__consumer_offsets
的43分区相关! 接下来我们就要看下offsetsTopicMetadata
相关的逻辑,来确认异常。
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
getOrCreateGroupMetadataTopic
-> metadataCache.getTopicMetadata
-> getPartitionMetadata
private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[PartitionMetadata]] = {
cache.get(topic).map { partitions =>
partitions.map { case (partitionId, partitionState) =>
val topicPartition = TopicAndPartition(topic, partitionId)
val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
val maybeLeader = aliveBrokers.get(leaderAndIsr.leader)
val replicas = partitionState.allReplicas
val replicaInfo = getAliveEndpoints(replicas, protocol)
maybeLeader match {
case None =>
debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
new PartitionMetadata(partitionId, None, replicaInfo, Seq.empty[BrokerEndPoint],
Errors.LEADER_NOT_AVAILABLE.code)
case Some(leader) =>
val isr = leaderAndIsr.isr
val isrInfo = getAliveEndpoints(isr, protocol)
if (replicaInfo.size < replicas.size) {
debug("Error while fetching metadata for %s: replica information not available for following brokers %s"
}
offsetsTopicMetadata即对于topic下所有leader、replicaInfo、isr正常分区的元数据信息,所以我们判断__consumer_offsets
43分区leader、replicaInfo、isr等可能存在异常,导致find(_.partitionId == partition)
时找不到根据hashCode取模后对应的分区。
__consumer_offsets
分区信息验证43分区果然存在leader异常的情况
我们使用UUID批量生成消费组名,使其hashCode取模后为异常分区的分区号,再使用其进行消费时均出现消费异常的问题。
__consumer_offsets
部分分区会产生leader、replicaInfo、isr异常? 与网络抖动和一些集群操作可能有关,需要具体问题具体分析__consumer_offsets
异常分区恢复正常?
这里不详细介绍可以参考http://blog.itpub.net/31543630/viewspace-2212467/ 。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有