运维小路
一个不会开发的运维!一个要学开发的运维!一个学不会开发的运维!欢迎大家骚扰的运维!
356篇原创内容
公众号
中间件,我给它的定义就是为了实现某系业务功能依赖的软件,包括如下部分:
Web服务器
代理服务器
ZooKeeper
Kafka
RabbitMQ(本章节)
上个小节我们介绍了RabbitmqMQ的元数据的持久化是和节点的类型有关(内存节点和硬盘节点),但是消息如果写入到RabbitMQ以后,未及时消费,集群崩溃数据是否会被丢失呢,这个就是我们今天要讲的消息持久化。
一、消息持久化的核心概念与意义
消息持久化是指将 RabbitMQ 中的消息存储到物理介质(如磁盘)中,确保在服务器重启或崩溃等异常情况下,消息不会丢失,从而保证系统的可靠性和数据完整性。在分布式系统中,尤其是对数据一致性要求较高的场景(如金融交易、订单处理),消息持久化是必不可少的机制。
二、消息持久化的实现维度
RabbitMQ 的消息持久化通过三个关键要素实现,需分别配置:
1. 交换机(Exchange)持久化:确保交换机在 RabbitMQ 服务重启后能重新创建,避免因交换机丢失导致消息路由失败。配置方式:在声明交换机时设置 durable 参数为 true。
如果是代码创建则可以参考下面的代码
python
channel.exchange_declare(
exchange='my_durable_exchange',
exchange_type='direct',
durable=True # 交换机持久化
)
2. 队列(Queue)持久化:保证队列结构在服务重启后恢复,存储在该队列中的消息才有可能被持久化。配置方式:声明队列时设置 durable 参数为 true。
注意事项:若队列未声明为持久化,即使消息本身是持久化的,队列在重启后会被删除,消息也会丢失。
如果是代码创建则可以参考下面的代码
python
channel.queue_declare(
queue='my_durable_queue',
durable=True # 队列持久化
)
3. 消息(Message)持久化:将消息内容写入磁盘,是持久化的核心环节。配置方式:发送消息时设置 delivery_mode 参数为 2(1 表示非持久化)。
如果是代码创建则可以参考下面的代码
python
channel.basic_publish(
exchange='my_durable_exchange',
routing_key='my_routing_key',
body='Hello, Durable Message!',
properties=pika.BasicProperties(
delivery_mode=2 # 消息持久化
)
)
条件 | 说明 | 如何设置 |
---|---|---|
1. 消息标记为持久化 | 生产者发送消息时需显式指定 delivery_mode=2 (persistent) | 在 basic.publish 属性中设置:props=BasicProperties(delivery_mode=2) |
2. 队列持久化 | 队列必须声明为 durable=true,否则重启后队列消失,消息随之丢失 | 声明队列时设置:channel.queue_declare(queue='name', durable=True) |
3. 交换机持久化 | 绑定队列的交换机必须声明为 durable=true,否则重启后绑定关系丢失 | 声明交换机时设置:channel.exchange_declare(exchange='name', durable=True) |
✅ 只有同时满足以上三点,消息才能在 RabbitMQ 重启后恢复!
三、消息持久化的工作流程与机制
存储流程:当消息被标记为持久化并发送到 RabbitMQ 后,会先存入内存缓冲区。当缓冲区达到一定阈值或满足刷盘条件(如定期刷新、事务提交)时,消息会被写入磁盘的持久化日志文件(msg_store_persistent)。
队列和交换机的元数据(如结构、绑定关系)会存储在 mmap 文件中,确保重启后重建。
恢复机制:RabbitMQ 启动时,会读取磁盘上的持久化数据,重建交换机、队列和未消费的消息。
已消费但未确认(ack)的消息会重新放入队列,等待消费者处理。