滴滴滴,就在本周遇见一个kafka下游消费失败,但是下游持久化失败,兜底任务不起作用。笔者对RabbitMQ了解和实战比较多。如果是RabbitMQ的话,我们一般会这样处理:
1.如若处理过程中出现异常,而没有回复ack 应答。通过后台就会看到有 unacked 的数据。如果积压的多会导致程序无法继续消费数据(数量和消费者的线程数有关)。
2.解决办法针对异常做处理,捕捉到后也回复ack/requeue/reject应答。
3.程序断开于rabbitmq的链接后,unacked的消息状态会重新变为ready 等待消费。重新发版,server应用连接rabbitmq 就会重新消费掉消息。
4.为什么会出现unack 消息, 还是手动模式代码没有处理好。
如果针对kafka怎么去做处理呢?
先说一下具体的场景,ofc通知vpos的时候,vpos远程接口调用失败,异常消息未落库,兜底没有起作用。
想到的解决办法有:
1.重新往Topic里面发消息,需要确认下游的所有Consumer Group都需要处理好幂等性。
2.人工智能提工单修数据。
3.失败的消息体,插入到持久化表,兜底任务重新保证一致性。
4.重设消费者组位移。
今天我们只说重设消费者组位移,像RabbitMQ这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker删除。
反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。
该怎么选择传统消息中间件和Kafka?如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。
怎么重设消费者位移?代码和脚本详细看下面的引用:
https://time.geekbang.org/column/article/116070 《Kaka核心技术与实战》
精进自省:不要陷入纠缠中,提升自己的价值。认清真相后,依然热爱它。