
Notes on Handling Some Kafka Issues


When reposting, please credit the original source: http://www.cnblogs.com/dongxiao-yang/p/7600561.html

1. ReplicaFetcherThread OOM after broker startup

Version: 0.8.2.2

Symptom (server startup log):

WARN [ReplicaFetcherThread-1-21], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: ReplicaFetcherThread-1-21; ReplicaId: 20; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic_a,28] -> PartitionFetchInfo(13372825,400000000),[topic_b-live,19] -> PartitionFetchInfo(2382784277,400000000),[topic_b-user,2] -> PartitionFetchInfo(2230117428,400000000),[topic_b_like,20] -> PartitionFetchInfo(0,400000000),[topic_b-group,32] -> PartitionFetchInfo(0,400000000),[topic_b_live,1] -> PartitionFetchInfo(1352237,400000000),[topic_b-log,1] -> PartitionFetchInfo(12598395616,400000000),[topic_b_microvideo,18] -> PartitionFetchInfo(0,400000000),[topic_b-group,92] -> PartitionFetchInfo(0,400000000),[bl_generic_event_detail,10] -> PartitionFetchInfo(27302121,400000000),[__consumer_offsets,14] -> PartitionFetchInfo(0,400000000),[__consumer_offsets,18] -> PartitionFetchInfo(0,400000000),[topic_b_group,12] -> PartitionFetchInfo(7323,400000000),[topic_b_feed,19] -> PartitionFetchInfo(232378,400000000),[topic_b_nearby,26] -> PartitionFetchInfo(0,400000000),[topic_b_user,20] -> PartitionFetchInfo(265,400000000),[topic_b_banners,2] -> PartitionFetchInfo(0,400000000). Possible cause: java.lang.OutOfMemoryError: Java heap space (kafka.server.ReplicaFetcherThread)
[2017-08-10 11:09:55,310] WARN [ReplicaFetcherThread-3-23], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 5; ClientId: ReplicaFetcherThread-3-23; ReplicaId: 20; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic_b_microvideo,8] -> PartitionFetchInfo(37116,400000000),[topic_b-common,12] -> PartitionFetchInfo(5378147776,400000000),[topic_b_user,2] -> PartitionFetchInfo(76267,400000000),[topic_b-group,82] -> PartitionFetchInfo(0,400000000),[bl_generic_event_detail,24] -> PartitionFetchInfo(27457189,400000000),[topic_b-group,22] -> PartitionFetchInfo(0,400000000),[topic_b_like,10] -> PartitionFetchInfo(0,400000000),[topic_b-nearby,28] -> PartitionFetchInfo(471861516,400000000),[topic_b_nearby,16] -> PartitionFetchInfo(28,400000000),[topic_b_feed,9] -> PartitionFetchInfo(79605,400000000),[topic_a,10] -> PartitionFetchInfo(13365304,400000000),[topic_b-live,1] -> PartitionFetchInfo(2381944301,400000000),[topic_b_log,13] -> PartitionFetchInfo(366613,400000000). Possible cause: java.nio.channels.ClosedSelectorException (kafka.server.ReplicaFetcherThread)
[2017-08-10 11:09:55,310] INFO Reconnect due to socket error: java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer)
[2017-08-10 11:09:55,671] ERROR OOME with size 491278751 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space

Cause: replica.fetch.max.bytes was set too large. While the cluster is healthy this is harmless, but once a broker crashes and restarts it has many partitions to catch up on, and the replica fetcher allocates a buffer of replica.fetch.max.bytes for every lagging partition, which causes the OOM.

Solution: reduce the value of replica.fetch.max.bytes.
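The logs above show a fetch size of 400000000 bytes (roughly 400 MB) per partition. A minimal sketch of the change in server.properties; the 10 MB figure below is only an illustration, so size it against your heap:

# server.properties -- illustrative value, not a recommendation
# Rough worst-case fetch memory on a freshly restarted broker:
#   num.replica.fetchers x lagging-partitions-per-request x replica.fetch.max.bytes
replica.fetch.max.bytes=10485760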

2. Partition leaders never rebalance after a restart

Version: 0.8.2.2

Symptoms: 1. After the crashed broker is restarted, it starts normally and catches up on data normally, but even once all of its replicas have rejoined the ISR, the controller never reassigns it as leader. All server-side logs look normal, and the controller logs nothing at all.

2. A jstack thread dump of the controller shows delete-topics-thread permanently blocked, and a Controller-xx-to-broker-yy-send-thread blocked in deleteTopicStopReplicaCallback, as shown in the dumps below.
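For reference, dumps like the ones below can be captured with something along these lines (locating the PID via jps is just one option; any method of finding the broker process works):

jps -l | grep -i kafka                                    # locate the broker JVM's PID
jstack <pid> > controller-threads.txt                     # dump all thread stacks
grep -A 20 "delete-topics-thread" controller-threads.txt  # inspect the blocked thread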

"Controller-29-to-broker-26-send-thread" prio=10 tid=0x00007f6520041000 nid=0x4d9d waiting on condition [0x00007f649e90e000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000005d3831d38> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:871) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1201) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) at kafka.utils.Utils$.inLock(Utils.scala:533) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:371) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:338) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:229) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:229) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:160) - locked <0x000000065d279300> (a java.lang.Object) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"delete-topics-thread-29" prio=10 tid=0x00007f652006c000 nid=0x4da3 waiting on condition [0x00007f649dbdd000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000000065d2800e0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:282) at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57) - locked <0x000000065d279858> (a java.lang.Object) at kafka.controller.KafkaController.sendRequest(KafkaController.scala:668) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$9.apply(ControllerChannelManager.scala:312) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3$$anonfun$apply$9.apply(ControllerChannelManager.scala:309) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:309) at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$3.apply(ControllerChannelManager.scala:302) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:302) at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:115) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:337) at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:327) at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) at kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:327) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:360) at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:306) at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:305) at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:305) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:424) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:396) at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:396) at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at 
kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:390) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:390) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Cause: a bug in this Kafka version; see https://issues.apache.org/jira/browse/KAFKA-2046 and https://issues.apache.org/jira/browse/KAFKA-2122. The controller's internal event handling shares a queue whose capacity is governed by controller.message.queue.size; if that parameter is set too small, the controller's internal threads end up blocking each other, as in the dumps above.

Solution: increase controller.message.queue.size, or comment it out entirely (the default is Int.MaxValue), then rolling-restart every broker whose controller state is abnormal, until the controller lands on a broker with a correct configuration.
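A sketch of the fix in server.properties, assuming the queue size had been overridden to a small value:

# server.properties
# Either raise the limit well beyond the number of in-flight controller requests ...
controller.message.queue.size=10000000
# ... or delete/comment out the override so the default (Int.MaxValue,
# effectively unbounded) applies; then rolling-restart the brokers.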

3. WARN [Controller-60-to-broker-60-send-thread], Controller 60's connection to broker kafka60:9092 (id: 60 rack: null) was unsuccessful (kafka.controller.RequestSendThread)

Versions: 0.10.2.0, 1.0.0

Symptom:

After Kerberos was configured on the Kafka brokers, the controller kept reporting that it could not connect to brokers (including its own instance). The error looks roughly like this:

[2018-01-25 17:48:41,864] WARN [Controller-60-to-broker-60-send-thread], Controller 60's connection to broker kafka60:9092 (id: 60 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to kafka60:9092 (id: 60 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2018-01-25 17:48:41,970] WARN [Controller-60-to-broker-60-send-thread], Controller 60's connection to broker kafka60:9092 (id: 60 rack: null) was unsuccessful (kafka.controller.RequestSendThread)

......

Cause: enabling DEBUG logging for kafka-authorizer.log reveals the specific error shown below. It is caused by the server's JRE lacking the crypto algorithms required for Kerberos authentication; updating the relevant JRE libraries fixes it.
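In the log4j.properties shipped with Kafka, kafka-authorizer.log is driven by the kafka.authorizer.logger logger, so enabling the DEBUG output looks roughly like this (names may differ if your logging config is customized):

# config/log4j.properties -- raise the authorizer logger's level to DEBUG
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender

The resulting entry looks like this: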

[2018-01-25 17:55:31,155] DEBUG Connection with /host disconnected (org.apache.kafka.common.network.Selector)
java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)]
    at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:250)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:71)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at kafka.network.Processor.poll(SocketServer.scala:494)
    at kafka.network.Processor.run(SocketServer.scala:432)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)]
    at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:199)
    at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:235)
    ... 6 more
Caused by: GSSException: Failure unspecified at GSS-API level (Mechanism level: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled)
    at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:856)
    at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:342)
    at sun.security.jgss.GSSContextImpl.acceptSecContext(GSSContextImpl.java:285)
    at com.sun.security.sasl.gsskerb.GssKrb5Server.evaluateResponse(GssKrb5Server.java:167)
    ... 7 more
Caused by: KrbException: Encryption type AES256 CTS mode with HMAC SHA1-96 is not supported/enabled
    at sun.security.krb5.EncryptionKey.findKey(EncryptionKey.java:522)
    at sun.security.krb5.KrbApReq.authenticate(KrbApReq.java:273)
    at sun.security.krb5.KrbApReq.<init>(KrbApReq.java:149)
    at sun.security.jgss.krb5.InitSecContextToken.<init>(InitSecContextToken.java:108)
    at sun.security.jgss.krb5.Krb5Context.acceptSecContext(Krb5Context.java:829)
    ... 10 more
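One common remedy, assuming a stock Oracle JDK 8 (the post does not name the exact libraries that were updated): AES256-CTS requires unlimited-strength crypto, which older JDK 8 builds disable by default.

# JDK 8u151 and later: enable unlimited-strength crypto via a security property
# in $JAVA_HOME/jre/lib/security/java.security
crypto.policy=unlimited

# Older JDK 8 builds: install the JCE Unlimited Strength Jurisdiction Policy
# files (local_policy.jar, US_export_policy.jar) into $JAVA_HOME/jre/lib/security/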

4. WARN Map failed (kafka.utils.CoreUtils$) java.io.IOException: Map failed

WARN Map failed (kafka.utils.CoreUtils$)
java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
    at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:114)
    at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:104)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
    at kafka.log.AbstractIndex.resize(AbstractIndex.scala:104)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:163)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:163)
    at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:163)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
    at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:162)
    at kafka.log.AbstractIndex.close(AbstractIndex.scala:174)
    at kafka.log.LogSegment$$anonfun$close$2.apply$mcV$sp(LogSegment.scala:471)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:82)
    at kafka.utils.Logging$class.swallowWarn(Logging.scala:96)
    at kafka.utils.CoreUtils$.swallowWarn(CoreUtils.scala:46)
    at kafka.utils.Logging$class.swallow(Logging.scala:98)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:46)
    at kafka.log.LogSegment.close(LogSegment.scala:471)
    at kafka.log.Log$$anonfun$close$1$$anonfun$apply$mcV$sp$1.apply(Log.scala:573)
    at kafka.log.Log$$anonfun$close$1$$anonfun$apply$mcV$sp$1.apply(Log.scala:573)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.log.Log$$anonfun$close$1.apply$mcV$sp(Log.scala:573)
    at kafka.log.Log$$anonfun$close$1.apply(Log.scala:568)
    at kafka.log.Log$$anonfun$close$1.apply(Log.scala:568)
    at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
    at kafka.log.Log.close(Log.scala:568)
    at kafka.log.LogManager$$anonfun$shutdown$4$$anonfun$10$$anonfun$apply$3.apply$mcV$sp(LogManager.scala:403)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Map failed
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)
    ... 35 more

Solution:

After confirming the JVM is 64-bit, raise the kernel's memory-map limit:

echo "vm.max_map_count=262144" >> /etc/sysctl.conf sysctl -p

Reference: https://jira.apache.org/jira/browse/KAFKA-5962

Related Kafka patches

Source version: 1.0

(1) KAFKA-6185 Selector memory leak with high likelihood of OOM in case of down conversion

patch 4193

(2) KAFKA-6362 auto commit not work since coordinatorUnknown() is always true.

patch 4349, 4326, 4641
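A sketch of one way to backport these, assuming the patch numbers above are apache/kafka GitHub pull requests and your tree tracks the 1.0 branch:

# Fetch a PR head into a local branch, then cherry-pick its commits
git fetch https://github.com/apache/kafka.git pull/4193/head:pr-4193
git log pr-4193 --oneline          # identify the commit(s) to take
git cherry-pick <commit-sha>       # repeat per PR; resolve conflicts by hand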
