All of the implementations above live in the file KafkaStateActor.scala.
[Figure: kafka-manager.png]
props.put("group.id", "KafkaManagerOffsetCache")
props.put("bootstrap.servers", bootstrapBrokerList.list.map(bi => s"${bi.host}:${bi.port}").mkString(","))
props.put("exclude.internal.topics", "false")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("auto.offset.reset", "latest")
props.put("group.id", "KafkaManagerOffsetCache")
This line means that no matter how many kafka-manager instances are started, they all consume "__consumer_offsets" with the same consumer group.
Solution: read group.id from the configuration file, so that each kafka-manager instance uses a different group id.
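A minimal sketch of this fix, building the consumer properties shown above with a configurable group.id. The `configuredGroupId` parameter and the unique fallback name are assumptions for illustration; the real fix would read the value from kafka-manager's own configuration file:

```scala
import java.util.{Properties, UUID}

// Sketch: take group.id from configuration, falling back to a per-instance
// unique id. configuredGroupId is a hypothetical stand-in for a config key.
def offsetCacheProps(bootstrapServers: String, configuredGroupId: Option[String]): Properties = {
  val props = new Properties()
  // A unique fallback guarantees two kafka-manager instances never share a group
  val groupId = configuredGroupId.getOrElse(s"KafkaManagerOffsetCache-${UUID.randomUUID()}")
  props.put("group.id", groupId)
  props.put("bootstrap.servers", bootstrapServers)
  props.put("exclude.internal.topics", "false")
  props.put("enable.auto.commit", "false")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
  props.put("auto.offset.reset", "latest")
  props
}
```

With this in place, each deployment can set its own group id, and unconfigured instances still get a distinct one.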
case GroupMetadataKey(version, key) =>
  val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()))
  value.allMemberMetadata.foreach {
    mm =>
      mm.assignment.foreach {
        case (topic, part) =>
          groupTopicPartitionMemberMap += (key, topic, part) -> mm
      }
  }
}
The problem is that record.value() here may be null; in that case we should perform cleanup instead:
if (null != record &&
    null != record.value()) {
  val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()))
  value.allMemberMetadata.foreach {
    mm =>
      mm.assignment.foreach {
        case (topic, part) =>
          groupTopicPartitionMemberMap += (key, topic, part) -> mm
      }
  }
} else {
  groupTopicPartitionMemberMap.foreach {
    case ((group, topic, part), mmd) =>
      if (group == key) {
        var tmp = mmd
        tmp.memberId = ""
        tmp.clientHost = ""
        groupTopicPartitionMemberMap += (key, topic, part) -> tmp
      }
  }
}
Here is one workaround. The timeout occurs because an Actor blocks on an asynchronous request whose result never arrives; specifically, the Futures described earlier that fetch each partition's last offset for a topic never complete. Their results are obtained by blocking with Await.ready, but kafka-manager calls Await.ready without a timeout, so it waits forever. The workaround is simply to supply a timeout. The relevant code is in ActorModel.scala, which calls Await.ready in several places.
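A minimal sketch of the bounded-wait idea; the helper name and the 5-second default are illustrative choices, not the actual kafka-manager code:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Sketch: wrap Await.ready with a bounded timeout instead of waiting forever.
// The 5-second default is an illustrative value, not kafka-manager's.
def readyWithTimeout[T](f: Future[T], timeout: FiniteDuration = 5.seconds): Try[Future[T]] =
  Try(Await.ready(f, timeout))

// A Future that never completes now fails fast (with a TimeoutException
// captured in the Try) instead of hanging the Actor indefinitely:
val hung = readyWithTimeout(Promise[Int]().future, 100.millis)
```

Returning a Try lets the caller decide whether to retry, log, or serve stale data when the offset fetch is slow.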
Finding the root cause: with this fix there is no need for scheduled restarts any more. I submitted a pull request upstream: "Use a separate thread to get the topic offsets to fixed bug 'Yikes! Ask timed out...'". The core change is to stop using Future[PartitionOffsetCapture] to fetch topic offsets, because doing so creates a large number of Futures, and therefore a large number of tasks submitted to the ThreadExecutor. A single dedicated thread doing this work is all that is needed.
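The idea behind the pull request can be sketched as follows. TopicOffsetUpdater and fetchAllTopicOffsets are hypothetical names standing in for kafka-manager's real offset-query code, and the 30-second interval is an assumption:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch: instead of creating one Future per offset request (flooding the
// executor with tasks), a single dedicated thread refreshes all topic
// offsets on a fixed schedule and caches the latest snapshot.
class TopicOffsetUpdater(fetchAllTopicOffsets: () => Map[String, Long]) {
  @volatile private var latest: Map[String, Long] = Map.empty
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(intervalSeconds: Long = 30): Unit =
    scheduler.scheduleWithFixedDelay(
      () => latest = fetchAllTopicOffsets(),
      0, intervalSeconds, TimeUnit.SECONDS)

  // Readers get the most recent snapshot without blocking on any Future
  def currentOffsets: Map[String, Long] = latest

  def stop(): Unit = scheduler.shutdownNow()
}
```

Because readers only consult the cached snapshot, the Actor's ask no longer depends on a Future completing, which removes the "Yikes! Ask timed out" failure mode.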
For completeness, here is the refined version of the GroupMetadataKey handling: on a non-null value it resets only the entries for topics that are no longer in the member's assignment, and on a null value it clears all of the group's entries as before.

case GroupMetadataKey(version, key) =>
  if (null != record &&
      null != record.value()) {
    val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()))
    var topicSet: Set[String] = Set()
    value.allMemberMetadata.foreach {
      mm =>
        mm.assignment.foreach {
          case (topic, part) =>
            topicSet += topic
            groupTopicPartitionMemberMap += (key, topic, part) -> mm
        }
    }
    groupTopicPartitionMemberMap.foreach {
      case ((group, topic, part), mmd) =>
        if (group == key &&
            !topicSet.contains(topic)) {
          var tmp = mmd
          tmp.memberId = ""
          tmp.clientHost = ""
          groupTopicPartitionMemberMap += (key, topic, part) -> tmp
        }
    }
  } else {
    groupTopicPartitionMemberMap.foreach {
      case ((group, topic, part), mmd) =>
        if (group == key) {
          var tmp = mmd
          tmp.memberId = ""
          tmp.clientHost = ""
          groupTopicPartitionMemberMap += (key, topic, part) -> tmp
        }
    }
  }
}