所谓的消息幂等性就是如何保证消息只消费一次不重复消费。这需要从Kafka的多个角度去回答该问题一是要包含Kafka自身的机制,还需要考虑客户端自己的重复处理。
Kafka的Consumer(消费者)offset还没来得及提交导致重复消费。所以,消费者可以手动提交offet来控制消息的消费情况。通过手动提交offset,消费者可以跟踪自己已经消费的消息,确保不会重复消费。
另外,消费者如何保证不重复消费消息的关键在于消费者做控制,因为MQ有可能无法保证不重复发送消息,所以在消费者端也应该控制:即使MQ重复发送了消息,消费者拿到消息之后,也要判断是否已经消费过该条消息。所以根据实际业务场景,有以下几种实现方式:
★同时,在Kafka中每个消费者都必须加入至少一个消费者组(Consumer Group),同一个消费者组内的消费者可以共享消费者的负载。因此,如果一个消息被消费者组内的其中一个消费者消费了,那么其它消费者就不用在接收到这个消息了。
另外,客户端还可以自己做一些**幂等机制**,防止消息的重复消费。
在Kafka中内部可以为每条消息生成一个全局唯一、与业务无关的消息ID,当MQ接收到消息时,会先根据ID判断消息是否重复发送,Kafka再决定是否接收该消息。
Kafka内部提供了Exactly-once消费语义。简单理解其实就是引入事务,消费者使用事务来保证消息的消费和offset提交是原子的,而生产者可以使用事务来保证消息的生产和offset提交是原子的。Exactly-once消费语义则解决了重复问题。但是需要更复杂的设置和配置
在Kafka中,有三种比较常见的消息传递语义:
★如何配置:
consumer.commitSync()
提交offset这种语义有可能会对数据重复处理,因为该消费语义要保证消费者至少消费一次。在at-least-once语义中,在消费数据之后,手动调用函数consumer.commitSync()异步提交offset,有可能处理多次的场景是消费者的消息处理完并输出到结果库,但是offset还没提交,这个时候消费者挂掉了,再重启的时候会重新消费并处理消息,所以至少会处理一次
这种语义比较适用于实时数据处理或消费者不能容忍数据丢失的场景,比如金融交易或者电信指令。
★如何配置:
enable.auto.commit
为trueauto.commit.interval.ms
设置一个较低的时间范围由于上面的配置,此时的Kafka会有一个独立线程负责按照指定间隔提交offset。
消费者的offset已经提交,但是消息还在处理中(没有处理完),这个时候程序挂了,导致数据没有被成功处理,再重启的时候会从上次提交的offset处处理,导致上次没有被成功处理的消息丢失了。
★如何配置:
enable.auto.commit
设置为false,禁用自动提交consumer.seek(topicParttion, offset)
来指定offset这种语义可以保证数据只被消费处理一次。同时保证消息的顺序性。是以原子事务的方式保存offset和处理的消息结果。数据真正处理成功的时候才会保存offset信息。
在Kafka 0.11 版本之前,实现exactly-once语义需要一些特殊的配置和设置。但是在Kafka 0.11 版本之后,Kafka提供了原生的exactly-once支持,使得实现exactly-once语义变得更加简单和可靠
我们都知道Kafka的消息是存储在指定的Topic中的某个Partition中的,并且一个Topic是可以有多个Partition的,同一个Partition的消息是有序的,但是如果是不同的Partition或者不同的Topic的消息那就是无序的了。
假设需要做一个MySQL binlog的同步系统,在MySQL中有一个针对某条数据增删改的操作,对应出来的增删改三条binlog,接着这三条binlog需要发送到Kafka(MQ)中,消费者消费出来一次执行,此时就需要保证消息的一致性,否则数据就会出现问题。
因为当生产者向某个Partition发送消息时,消息会被追加到该Partition中的日志文件中(log),并且会被分配一个唯一的offset,文件的读写是有顺序的。而消费者在该Partition消费消息时,会从该Partition的最早offset开始逐个读取消息 ,从而保证了消息的顺序性。
那么想要实现消息的顺序性消费,可以从以下角度参考:
★比如:生产者可以在生产写入数据的时候可以指定一个Key,比如指定某个订单id作为Key,这个订单相关的操作就会被分发到一个Partition中去。
消费者内部利用了多个线程并发处理,则可能会出现顺序不一致的问题。
如图所示:
那么应该如何解决消费者端多线程并发处理消息导致消息顺序不一致的情况呢?
大致的思路可以按照hash
算法进行hash分发。因为相同的订单Key的数据会分发到一个内存queue
里面去。
如图:
在Kafka中,当我们向其发送消息的时候,如果Key为null,那么Kafka会采用默认的Round-robin
策略,也就是轮转。具体实现类是:DefaultPartitioner
。所以如果想要指定发送消息到某个Partition中,可以参考下面的方式:
★发送消息的时候执行Partition,具体 可以在ProducerRecord中指定Partition
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 指定要发送消息的主题
String topic = "Paidaxing_TOPIC";
// 要发送的消息内容
String message = "Hello Paidaxing!";
// 要发送消息的分区
int partition = 0;
// 创建包含分区信息的ProducerRecord
ProducerRecord<String, String> record = new ProducerRecord<>(topic, partition, null, message);
// 发送消息
producer.send(record);
// 关闭Kafka生产者
producer.close();
}
}
★在没有指定Partition(null)时,如果有Key,Kafka会根据Key做Hash计算出一个Partition编号来,如果Key相同,那么也是可以分到一个Partition中的。
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(props);
// 指定要发送消息的主题
String topic = "paidaxing_topic";
// 要发送的消息内容
String message = "Hello Paidaxing!";
// 要发送消息的key
String key = "Paidaxing_KEY";
// 创建ProducerRecord,指定主题、键和消息内容
ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, key, message);
// 发送消息
producer.send(record);
// 关闭Kafka生产者
producer.close();
}
}
除了指定Partition和Key以外,还可以自定义实现自己的Partitioner(分区器)来指定消息发送到指定的Partition(分区)
创建一个自定义类并实现Partitioner接口,重写partition()
方法
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
// 这里处理和获取分区器的配置参数
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null || !(key instanceof String)) {
throw new InvalidRecordException("键不能为空而且必须是字符串类型");
}`
// 根据自定义的逻辑,确定消息应该发送到哪个分区
String keyValue = (String) key;
int partition = Math.abs(keyValue.hashCode()) % numPartitions;
// 返回分区编号
return partition;
}
@Override
public void close() {
// 在这里进行一些清理操作
}
}
如上述代码所示,在partition()
方法中,利用了一简单的实现逻辑,根据键的Hash值将消息发送到相应的分区。为了在Kafka生产者中使用自定义的Partitioner(分区器),需要在生产者的配置中指定Partitioner类(分区器类)
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerExample {
public static void main(String[] args) {
// 设置Kafka生产者的配置属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 指定自定义分区器类
props.put("partitioner.class", "com.paidaxing.CustomPartitioner");
// 创建Kafka生产者
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
// 指定要发送消息的主题
String topic = "Paidaxing_TOPIC";
// 要发送的消息内容
String message = "Hello Paidaxing!";
// 要发送消息的key
String key = "Paidaxing_KEY";
// 创建ProducerRecord,指定主题、键和消息内容
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, message);
// 发送消息
producer.send(record);
// 关闭Kafka生产者
producer.close();
}
}
消息积压问题大多数由于消费者故障导致,或者消费能力不足导致
解决思路如下:
上面有提到过Kafka提供的Producer和Consumer之间的消息传递保证语义有三种。
目前,Kafka默认提供的交付可靠性保障是第二种,即at-least-once。但是其实Kafka如果仅靠自身是没办法保证消息是100%可靠的。
原因可以从以下角度考虑:
Kafka是允许生产者以异步方式发送消息,这意味着Producer在发送消息后不会等待确认。当然我们可以注册一个回调等待消息的成功回调。
但是,如果Producer在发送消息之后,Kafka的集群发生故障或崩溃,而消息尚未被完全写入Kafka的日志中,那么这些消息可能会丢失。虽然后续可能会有重试,但是如果重试也失败了呢?如果这个过程中刚好生产者也崩溃呢?那就可能会导致没人知道这条消息失败了。就会导致消息不再重试了。
消费者来说比较简单,只要保证在消息成功被消费时,再去提交offset,这样就不会导致消息丢失了。
Page Cache
中,然后再由操作系统自己决定什么时候同步到磁盘当中,而在这个过程中,如果还没来得及同步到磁盘,就直接宕机了。那么这个消息也是丢失了。log.flush.interval.message=1
,来实现类似于同步刷盘的功能,但是这样又回到了之前的情况,就是还没来得及持久化就宕机了。这几种情况是从Broker角度来分析,Broker自身是没办法保证消息不丢失的,但是如果配合Producer,在配置request.required.acks = -1
这种ACK策略,可以确保消息持久化成功之后,才会ACK给Producer,那么,如果我们的Producer在一定时间内,没有收到ACK是可以重新发送消息。
但是这种重新发送,就又回到了我们前面介绍Producer的时候的问题。生产者也有可能会挂掉,重新发送也有可能没有发送依据,导致消息最终丢失
归根到底,如果只靠Kafka自己,其实是没有办法保证极端情况下的消息100%不丢失的。
但是我们可以引入一些机制来解决保证这个问题,比如:引入分布式事务,或者引入本地消息表,保证Kafka Broker没有保存消息成功返回时,可以重新投递消息,这样才可以。
后续有机会讲讲分布式事务相关概念
好了,本章节到此告一段落。希望对你有所帮助,祝学习顺利。