你好,见字如面。我是《Redis 高手心法》作者码哥,腾讯云架构师同盟深圳区理事会成员、InfoQ 签约作者,是一个手持菜刀砍电线,一路火花带闪电的靓仔。
面试官在面试候选人时,如果发现候选人的简历中写了在项目中使用了 MQ 技术(如 Kafka、RabbitMQ、RocketMQ),基本都会抛出一个问题:在使用 MQ 的时候,怎么确保消息 100% 不丢失?重复消费如何保证幂等?
这两个问题在实际工作中也很常见,既能考察你对 MQ 的掌握程度又能很好的判断是否有对应的实战经验。
本文将深入剖析消息丢失的本质原因,揭示 MQ 核心实现原理,并提供一套完整的 Java 实战解决方案。
如下图所示,阿斗被邀请去休闲养生 SPA 享受,服务包含泡脚、按摩、吃水果、看电视,玩真人 CS。
此间乐不思蜀也……
消息的生命周期如下图所示。
你可以发现,从生产者发送消息,MQ 保存消息,消费者消费消息,每一个环节都有可能丢失消息。
环节 | 故障概率 | 平均恢复时间 |
---|---|---|
网络传输 | 0.1%-1% | 秒级 |
内存存储 | 0.01%-0.1% | 分钟级 |
磁盘故障 | 0.001%-0.01% | 小时级 |
程序异常 | 0.1%-5% | 分钟级 |
当生产者往 MQ 中写数据时,以下场景会导致消息丢失:
生产者发送消息,主流消息队列都支持同步发送和异步发送。
如果使用同步发送,生产者发送消息后,会同步等待 Broker 返回的 ACK,收到 ACK 消息,就认为消息发送成功。如果长时间没有收到,则会认为消息发送失败,需要进行重试。
消息发送的流程如下图所示,基于本地消息表 + 业务数据表构成本地事务。
通过消息一步发送并接受消息队列的 ACK 来更新消息表状态,若果未发送则继续重试发送,保证消息一定发送出去。
代码案例如下所示:
@Service
publicclass ReliableProducer {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void createOrder(Order order) {
// 1. 业务数据入库
jdbcTemplate.update(
"INSERT INTO orders(id, amount) VALUES(?, ?)",
order.getId(), order.getAmount());
// 2. 消息记录入库
String msgId = UUID.randomUUID().toString();
jdbcTemplate.update(
"INSERT INTO message_log(msg_id, topic, message, status) VALUES(?, ?, ?, ?)",
msgId, "orders", JsonUtil.toJson(order), 0); // 0-待发送
// 事务提交后触发异步发送
CompletableFuture.runAsync(() -> sendWithRetry(msgId));
}
// 这里其实可以使用 xxl-job 等分布式调度框架查询未发送成功的消息发送。
private void sendWithRetry(String msgId) {
MessageRecord msg = jdbcTemplate.queryForObject(
"SELECT * FROM message_log WHERE msg_id = ?",
new MessageRecordRowMapper(), msgId);
int attempt = 0;
while (attempt < MAX_RETRIES) {
try {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(msg.getTopic(), msg.getMessage());
future.addCallback(result -> {
// 更新发送状态
jdbcTemplate.update("UPDATE message_log SET status = 1 WHERE msg_id = ?", msgId);
}, ex -> {
scheduleRetry(msgId, attempt); // 失败重试
});
return;
} catch (Exception e) {
scheduleRetry(msgId, attempt);
attempt++;
}
}
}
private void scheduleRetry(String msgId, int attempt) {
long delay = (long) Math.pow(2, attempt) * 1000; // 指数退避
scheduler.schedule(() -> sendWithRetry(msgId), delay, TimeUnit.MILLISECONDS);
}
}
MQ 类型 | ACK 机制 | 可靠性 | 性能影响 |
---|---|---|---|
Kafka | acks=0 | 最低 | 无 |
Kafka | acks=1 | 中等 | 低 |
Kafka | acks=all | 最高 | 高 |
RabbitMQ | 无确认 | 低 | 无 |
RabbitMQ | 生产者确认 | 高 | 中等 |
RocketMQ | 同步刷盘 | 最高 | 高 |
生产者发送消息成功,也不能保证消息绝对不丢失。因为即使消息发送到 Broker,如果在消费者拉取到消息之前,Broker 宕机了,消息还没有落盘,也会导致消息丢失。
kafka 为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。
消息的刷盘过程,为了提高性能,减少刷盘次数,kafka 采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。
这种机制也是由于 linux 操作系统决定的。
将数据存储到 linux 操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从 page cache 到 file),或者通过 fsync 命令强制刷盘。
数据在 page cache 中时,如果系统挂掉,数据会丢失。
如图所示的 kafka 集群,一个 Broker 的 Topic 其中一个 partition 一共有三 副本(包含 Leader)。
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。
该场景的 Kafka Broker 黄金高可靠配置如下:
# Kafka配置示例
acks=all
min.insync.replicas=2 // 最小同步副本数
replication.factor=3 // 每个分区的 总副本数量(含 Leader)
unclean.leader.election.enable=false
log.flush.interval.messages=10000
log.flush.interval.ms=1000
replication.factor=3
且 min.insync.replicas=2
时:允许 1 个副本宕机(如 Broker 故障)、若 2 个副本不可用,则生产会被阻塞false
。消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移(offset)。
偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。
自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。
这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
开启手动提交的时候消费端需要去保证幂等性。
@Slf4j
@Component
publicclass ReliableConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private OrderService orderService;
@KafkaListener(topics = "orders")
public void consume(ConsumerRecord<String, String> record) {
String msgId = record.key();
Order order = JsonUtil.fromJson(record.value(), Order.class);
// 1. 幂等检查
if (isProcessed(msgId)) {
log.info("消息重复消费,已跳过: {}", msgId);
return;
}
// 2. 获取分布式锁
Lock lock = redisLockFactory.getLock("LOCK:" + msgId);
if (!lock.tryLock(3, TimeUnit.SECONDS)) {
thrownew ConcurrentAccessException("获取锁失败");
}
try {
// 3. 二次幂等检查(防并发)
if (isProcessed(msgId)) {
return;
}
// 4. 业务处理
orderService.processOrder(order);
// 5. 记录处理状态(设置24小时过期)
markProcessed(msgId);
} catch (BusinessException e) {
// 6. 业务异常处理
handleFailure(record, e);
} finally {
lock.unlock();
}
}
private boolean isProcessed(String msgId) {
return"PROCESSED".equals(
redisTemplate.opsForValue().get("MSG:" + msgId));
}
private void markProcessed(String msgId) {
redisTemplate.opsForValue().set(
"MSG:" + msgId, "PROCESSED", 24, TimeUnit.HOURS);
}
private void handleFailure(ConsumerRecord<?, ?> record, Exception e) {
// 失败计数
int failCount = incrementFailCounter(record.key());
if (failCount < 3) {
thrownew RetryableException(e); // 触发重试
} else {
sendToDlq(record); // 转移死信队列
}
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 配置批量ACK(性能与可靠性的平衡)
factory.getContainerProperties().setAckMode(
AckMode.BATCH);
// 消费并发控制
factory.setConcurrency(3);
return factory;
}
}
除了对生产者、MQ 中间件、消费端保证不丢失消息的处理手段,还可以对消息轨迹进行监控。
自动化对账系统实现代码案例。
@Service
@Slf4j
publicclass ReconciliationService {
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void dailyReconciliation() {
// 1. 生产端计数
long produced = countProducerMessages();
// 2. MQ端计数
long stored = countMQMessages();
// 3. 消费端计数
long consumed = countConsumerMessages();
// 4. 数据对比
if (produced != stored) {
handleLoss(produced - stored, "生产到MQ丢失");
}
if (stored != consumed) {
handleLoss(stored - consumed, "MQ到消费丢失");
}
log.info("对账完成: 生产={}, MQ存储={}, 消费={}",
produced, stored, consumed);
}
private void handleLoss(long lossCount, String stage) {
log.error("消息丢失告警: 阶段={}, 数量={}", stage, lossCount);
// 1. 通知运维团队
alertService.notifyStaff(stage, lossCount);
// 2. 自动恢复机制
if (lossCount < 1000) {
recoveryService.recoverFromBackup();
} else {
// 重大事故,启动紧急预案
emergencyService.handleDisaster();
}
}
}
消息零丢失的三位一体架构本质上是对不确定性的系统化防御:
在 Java 生态中,我们拥有强大的工具集实现这套防御: