我有一个java Kafka消费者,在其中我正在批量获取ConsumerRecords进行处理。示例代码如下:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
DoSomeProcessing (record.value());
}
consumer.commitAsync();
}
private void DoSomeProcessing(String record) {
//make an external call to a system which can take random time for different requests or timeout in 5 seconds.
}我遇到的问题是,如果产生了较晚的记录,但前一条记录仍未超时,则如何提交或提交哪个偏移量。
假设我在一批中获得了2条记录,第一条消息的外部调用仍在等待,而第二次调用已完成。如果我等待5秒等待外部响应,Kafka消息的消耗在某些情况下会变得超级慢。如果我不等待第一个请求完成后再做另一个投票,我应该向Kafka承诺多少偏移量?如果我提交2,如果消费者崩溃,第一条消息将丢失,因为下一次最新提交的偏移量将是2。
发布于 2018-01-05 12:45:47
我认为您对问题的分析是正确的,答案可能就是您所怀疑的:您不能提交偏移量,直到每个小于或等于该偏移量的偏移量都被处理完。这就是Kafka的工作方式:它非常面向强排序。
解决方案是增加分区和使用者的数量,以便获得所需的并行性。从某些角度来看,这并不是很好--您需要更多的线程和资源--但至少您可以编写同步代码。
发布于 2018-01-05 18:18:14
你能做的就是建立一个错误管道。对于失败的消息,您将提交该消息并将其推送到错误队列,稍后再对其进行处理。
https://stackoverflow.com/questions/48106766
复制相似问题