本文只聚焦于Kafka系统的消息丢失,如果是生产环境出现数据丢失,排查时要先从链路上分段定位,缩小问题范围。
如果对Kafka不了解的话,可以先看这篇博客《一文快速了解Kafka》。
生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。
解决方法:
不能认为在调用send方法发送消息之后消息消息发送成功了。为了确定消息是发送成功,需要判断消息发送的结果。但要注意的是Kafka生产者(Producer) 使用send方法发送消息实是异步的操作,虽然可以通过get()方法获取调用结果,但降低业务服务的吞吐量。优化的方式是改为回调函数的形式。
此外,对于一致性要求不高的业务场景,可以考虑Producer端设置retries
(重试次数)设置一个比较合理的值,一般是3。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议将重试间隔设置长一些,因为间隔时间太小,可能一次网络波动的时间重试全部结束了。
自动提交开启会存在这样的问题:当消费者poll到这个消息,还没进行真正消费的时候,offset被自动提交的同时消费者挂掉了。
解决办法:
关闭自动提交offset(即:enable.auto.commit
为false),每次在真正消费完消息之后,手动提交offset。
但这样还是会存在消费者刚消费完消息,还没提交offset,结果宕机了,那么这个消息理论上就会被消费两次,因此消费端幂等性是需要保证。可以查看博客《一文理解如何实现接口的幂等性》,有这种问题对应的解决方案
假如leader副本所在的broker突然挂掉,那么就要从follower副本重新选出一个leader,但是leader的数据还有一些没有被follower副本的同步的话,就会造成消息丢失。
解决方法:
为了减少Kafka系统内丢失消息的情况,Kafka需要配置如下几个参数:
acks
=all。acks
的默认值为1,代表消息被leader副本接收之后就算被成功发送。当配置acks
=all代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。(副本只是将消息存储在PageCache上的,定期flush到磁盘上的,如果出现断电或者机器故障等,PageCache上的数据就丢失了。但设置设置了acks
=all,出现多个副本同时挂掉的概率比Leader挂掉的概率就小很多)replication.factor
>=3。为了保证leader副本能有follower 副本能同步消息,一般会设置replication.factor
>=3。这样就可以保证每个分区(partition)至少有3个副本。虽然造成了数据冗余,但是带来了数据的安全性。min.insync.replicas
>1。一般情况下需要设置min.insync.replicas
>1,这样配置代表消息至少要被写入到2个副本才算是被成功发送(默认值为1)。在实际生产中应尽量避免min.insync.replicas
值为1,此外,为了保证整个Kafka服务的高可用性,你需要确保replication.factor
>min.insync.replicas
,否则有一个副本挂掉,整个分区就无法正常工作了。推荐设置成replication.factor
=min.insync.replicas
+1。unclean.leader.election.enable
=false。即不允许Unclean leader选举。retries
。配合acks
=all,这样可以保证leader挂掉之后,Producer会重新发送消息。Unclean leader选举:Kafka把不在ISR列表中的存活副本称为“非同步副本”,这些副本中的消息远远落后于leader,如果选举这种副本作为leader的话就可能造成数据丢失。Kafka broker端提供了一个参数unclean.leader.election.enable,用于控制是否允许非同步副本参与leader选举;如果开启,则当ISR为空时就会从这些副本中选举新的leader,这个过程称为Unclean leader选举。
单条数据的长度超过限制会丢失数据,报kafka.common.MessageSizeTooLargeException异常,导致生产者消息积压,内存上升。
解决方法:
修改Kafka Broker的配置,修改单条消息的最大长度、单条消息的最大长度等参数配置。