可直接复制到 CSDN、掘金、知乎、公众号等支持 Markdown 的平台发布
版本:Kafka 0.11+ 通用 学习目标:
幂等性(Idempotence)是一个数学与计算机学概念,指的是对同一个操作执行多次,其结果与执行一次完全一致。在 Kafka 中,这意味着生产(Producer)向 Broker 发送消息时,即使因网络波动等原因导致消息重复发送,Broker 最终也只会持久化一条相同的消息。
举个例子:当用户在电商平台点击 “支付” 按钮后,前端因响应延迟重复提交了两次请求,若支付系统基于 Kafka 实现了幂等性,最终也只会生成一笔有效订单,避免用户被重复扣款。
非幂等(重复扣款) | 幂等(多次转账也只扣一次) |
---|---|
在分布式环境中,消息重复几乎是不可避免的,主要原因包括:
如果没有幂等性保障,这些重复消息可能导致业务逻辑异常(如重复下单、数据不一致等),因此 Kafka 在设计时专门引入了幂等性机制来解决这一问题。
Kafka 的幂等性机制基于生产者 ID(Producer ID) 和序列号(Sequence Number) 两大核心要素,配合 Broker 端的状态管理实现。
维度 | 说明 |
---|---|
作用范围 | 单 Producer Session + Partition |
去重位置 | Broker 端(服务端去重,解放消费者) |
依赖标识 | ① PID(Producer ID)② Sequence Number(分区级单调递增) |
开关配置 | enable.idempotence=true(一行即可) |
# 仅需这一行
enable.idempotence=true
场景 | 是否幂等 | 原因 |
---|---|---|
单分区 | 是 | seq 顺序递增 |
多分区 | 否 | seq 仅分区级 |
重启 Producer(新回话) | 否 | PID 重新生成 |
因此 跨分区原子性 需要 事务机制。
步骤 | API | 说明 |
---|---|---|
1 | initTransactions() | 向 Txn Coordinator 注册 transactional.id |
2 | beginTransaction() | 开启事务 |
3 | send() | 发送多条消息到多分区 |
4 | commitTransaction() / abortTransaction() | 提交或回滚 |
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("transactional.id", "pay-order-001"); // 全局唯一
props.put("enable.idempotence", "true"); // 自动开启
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("order", "123", "100$"));
producer.send(new ProducerRecord<>("audit", "123", "100$"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
Txn Coordinator 利用 两阶段提交(2PC) + __transaction_state 日志,确保跨分区原子性。
acks 值 | 语义 | 延迟 | 丢数据风险 |
---|---|---|---|
0 | 不等待 ACK | 最低 | 高 |
1 | Leader 写成功即返回 | 中 | Leader 挂 → 丢 |
all/-1 | ISR 全部落盘 | 最高 | 不丢,但可能重复(需幂等) |
当
enable.idempotence=true
时,acks 会被强制设为 all,无需手动设置。
# 幂等 Producer
bootstrap.servers=broker1:9092
enable.idempotence=true
acks=all
retries=Integer.MAX_VALUE
max.in.flight.requests.per.connection=5 # 保证顺序
compression.type=lz4
# 事务 Producer(在幂等基础上再加两行)
transactional.id=pay-order-001
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_ENABLE_KRAFT=yes \
confluentinc/cp-kafka:7.5.0
DUPLICATE_SEQUENCE_NUMBER
,但业务只收到 1 条。
commitTransaction()
前制造异常,下游 Consumer 收不到任何消息。
问题 | 原因 | 解决办法 |
---|---|---|
重启 Producer 后重复 | PID 变化 | 使用事务 API 或业务去重 |
多分区重复 | seq 维度是分区级 | 事务消息 |
消费端重复消费 | Rebalance | 消费端幂等表(MySQL/Redis) |