正如问题How to manually control the offset commit with camel-kafka?中所问的,我希望使用camel-kafka手动提交偏移量。我的路线: .from(kafka:topic1).to(kafka:topic2)
.process(newManualCommitProcessor()) ,其中ManualCommitProce
这是一个基于Armando Ballaci提供的答案的"Where do zookeeper store Kafka cluster and related information?“的后续问题。现在很明显,消费者偏移量存储在Kafka集群中的一个名为__consumer_offsets的特殊主题中。很好,我只是想知道这些偏移量的检索是如何工作的。例如,
我有一个java Kafka消费者,在其中我正在批量获取ConsumerRecords进行处理。external call to a system which can take random time for different requests or timeout in 5 seconds.我遇到的问题是,如果产生了较晚的记录,但前一条记录仍未超时,则如何提交或提交哪个偏移量。假设我在一批中获得了2条记录,第一条消息的外部调用仍在等待,而第二