前言
五一假期某局点高速龙门架计费业务瘫痪,ISV定位之后反馈业务系统模块存在Flink写Kafka失败的问题。具体的异常日志如下:
Got error produce response with correlation id xx on topic-partition xx-x, retrying(x attempts left). Error: NOT_LEADER_FOR_PARTITION
本文从业务端producer日志和服务端Kafka日志入手,梳理对应Kafka源码,抽丝剥茧发现原生Bug,然后实验室环境复现验证,最终修改源码进行修复。具体问题就是Kafka多数据盘场景存在部分坏盘(部分扇区故障,磁盘只读)时,坏盘上的分区leader并不会自动切换到正常盘ISR副本上。排查发现截止目前最新Kafka4.x版本仍然存在该问题,强烈建议自建Kafka集群及时自行修复,确保Kafka集群的高可用性。笔者文章只发布在微信公众号:大数据从业者,其他均为转载,欢迎关注转发!
业务端生产者出现NOT_LEADER_FOR_PARTITION异常,首先服务端通过describe查看对应Topic分区leader情况未见异常,如图所示:
然后通过查看leader所在1005节点Kafka日志发现文件系统只读异常:java.io.IOException: Read-only file system,如图所示:
java.io.IOException: Read-only file system
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.createNewFile(File.java:1012)
at kafka.log.AbstractIndex.(AbstractIndex.scala:113)
at kafka.log.OffsetIndex.(OffsetIndex.scala:54)
at kafka.log.LazyOffsetIndex.get(OffsetIndex.scala:238)
at kafka.log.LogSegment.offsetIndex(LogSegment.scala:64)
at kafka.log.LogSegment$$anonfun$closeHandlers$1.apply$mcV$sp(LogSegment.scala:612)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:88)
at kafka.log.LogSegment.closeHandlers(LogSegment.scala:612)
at kafka.log.Log$$anonfun$closeHandlers$2.apply(Log.scala:803)
at kafka.log.Log$$anonfun$closeHandlers$2.apply(Log.scala:803)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.Log.closeHandlers(Log.scala:803)
at kafka.log.LogManager$$anonfun$handleLogDirFailure$5.apply(LogManager.scala:210)
at kafka.log.LogManager$$anonfun$handleLogDirFailure$5.apply(LogManager.scala:207)
at scala.collection.immutable.List.foreach(List.scala:392)
at kafka.log.LogManager.handleLogDirFailure(LogManager.scala:207)
at kafka.server.ReplicaManager.handleLogDirFailure(ReplicaManager.scala:1489)
at kafka.server.ReplicaManager$LogDirFailureHandler.doWork(ReplicaManager.scala:210)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
查看系统内核日志,确实发现分区所在磁盘提示I/O error故障,起始时间与Kafka日志提示IOException时间基本吻合,如图所示:
至此,sdc磁盘故障确定无疑。但是,你不会以为让客户换个磁盘、然后重启对应Kafka节点就算结束了吧?(PS:这么操作确实能够恢复Kafka服务,但是只是规避,并未解决问题)深入思考下:Kafka属于分布式集群且Topic设置多副本机制,那么为什么单个Kafka节点单个磁盘故障时明明设置了多副本的Topic就不能用了呢?进一步描述就是:损坏磁盘上的分区的leader为什么没有failover切换到正常磁盘上的对应分区的ISR副本上?
继续回到上面Kafka日志,可以看出ReplicaManager模块已经识别出数据目录异常,也有动作,就是通过调用logManager.handleLogDirFailure(dir)方法将dir相关资源释放、设置为离线不再使用等等。但是,该方法底层有通过file.createNewFile()方法创建文件的动作,磁盘都只读故障,你还创建文件肯定会抛出IOException:
public boolean createNewFile() throws IOException
遗憾的是该异常没有在上层合适的位置捕获,导致后续需要通知Kafka Controller当前dir故障的事件没有传递。换句话说,正常流程中得通知Controller磁盘故障了、该切换坏盘上分区的leader了。但是,IOException没正确捕获直接中断了正常流程。从源码来看,故障盘的识别处理只会执行一次,一旦中断永远不会再次执行!详见源码路径:
kafka/core/src/main/scala/kafka/server/ReplicaManager.scala
Kafka KRaft模式对应源码如下所示:
Kafka ZooKeeper模式对应源码如下所示:
至此,问题分析完成!欢迎关注微信公众号大数据从业者。
客户现场不允许测试,笔者实验室环境进行模拟复现。通过修改源码,故意显式注释掉上述分析所提到的磁盘故障时通知Controller事件的动作,这里故意重复打印两次Stopped serving replicas in dir日志,如图所示:
创建测试Topic:名称test、1分区、2副本
[root@felixzh2 bin]# ./kafka-topics.sh --zookeeper localhost:2181/test --topic test --create --partitions 1 --replication-factor 2
[root@felixzh2 bin]# ./kafka-topics.sh --zookeeper localhost:2181/test –describe
如上图所示,通过describe查看分区leader为BrokerId=0的节点。然后,通过删除该节点存储Partition 0的磁盘模拟现场磁盘故障,查看BrokerId=0节点日志(PS:由于Topic中没有数据,所以不涉及资源释放也就没触发问题分析中描述的创建文件的代码。虽然不完全一致,但是效果类似):
从上图可以看出,Kafka已经识别出磁盘故障并开始进行处理,重复打印两次笔者修改的日志。如下图所示,describe发现test分区状态正常,leader还在BrokerId=0节点,未发生切换,符合预期(PS:没通知Controller,没人给切换啊):
笔者人为停止BrokerId=0的Kafka,describe查看Topic状态,如下所示:
leader发生切换,Isr副本缺BrokerId=0节点,也符合预期。这里也验证了上文提到的更换盘重启对应Kafka节点同样能恢复Topic可用的效果。这是为什么呢?以Kafka ZooKeeper模式为例简单说明,就是Kafka节点停止或者重启,其对应在zk上/brokers/ids的znode节点删除触发了通知Controller事件的动作,其与笔者注释掉的ReplicaManager感知磁盘故障触发通知Controller事件的逻辑并不是一回事。
问题分析清楚,解决方法很简单,通过try{}catch{}finally增加异常捕获,确保磁盘故障时候一定能够触发通知Controller事件的动作,以便于Controller及时触发leader切换。
Kafka ZooKeeper模式源码修改内容详见github:
https://github.com/felixzh2020/kafka/commit/e84aa9b0ae5d023427014b365a636ac0618657e7
Kafka KRaft模式源码修改内容详见github:
https://github.com/felixzh2020/kafka/commit/64e878734fc869ae77f7a6dc16c4144a7be95cec
至此,源码修改完成!欢迎关注微信公众号大数据从业者。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有