我们在前面说了消息端处理消息时,消息如何不丢失,但是如何保证当 RabbitMQ
服务器停掉之后,生产者发送的消息不丢失呢?
默认情况下,RabbitMQ
退出或者由于某种原因崩溃时,会忽视队列和消息,除非告知他不要这么做
RabbitMQ
的持久化分为三个部分:
交换机的持久化,是通过在声明交换机时将 durable
参数置为 true
实现的
MQ
的服务器发生意外或关闭之后,重启 RabbitMQ
时不需要重新去建立交换机,交换机会自动建立,相当于一直存在RabbitMQ
服务器重启之后,相关的交换机元数据会丢失,对一个长期使用的交换机来说,建议将其置为持久化的ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
队列的持久化是通过在声明队列时将 durable
参数置为 true
实现的
RabbitMQ
服务器重启之后,该队列就会被删除掉,此时数据也会丢失(队列没有了,消息也无处可存了)咱们前面用的创建队列的方式都是持久化的
QueueBuilder.durable(COnstant.ACK_QUEUE).build();
点进去看源码会发现,该方法默认 durable
是 true
public static QueueBuilder durable() {
return durable(namingStrategy.generateName());
}
private QueueBuilder setDurable() {
this.durable = true;
return this;
}
通过下面代码,可以创建非持久化的队列
QueueBuilder.noDurable(Constant.ACK_QUEUE).build();
消息实现持久化,需要把消息的投递模式 (MessageProperties
中的 deliveryMode
)设置为 2,也就是 MessageDeliveryMode.PERSISTENT
public enum MessageDeliveryMode {
NON_PERSISTENT,
PERSISTENT;
设置了队列和消息的持久化,当 RabbitMQ
服务器重启之后,消息依旧存在。
// 非持久化信息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 持久化信息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAN, msg.getBytes());
MessageProperties.PERSISTENT_TEXT_PLAIN
实际就是封装了这个属性
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
}
如果使用 RabbitTemplate
发送持久化消息,代码如下:
// 要发送的消息内容
String message = "This is a persistent message";
// 创建一个 Message 对象,设置为持久化
Message messagePbject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使用 RabbitTemplate 发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
RabbitMQ
默认情况下会将消息视为持久化,除非队列被声明为非持久化,或者消息在发送时被标记为非持久化Message
这个对象,来观察消息是否持久化(Body:'consumer ack test...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=ack_exchange, receivedRoutingKey=ack, deliveryTag=2, consumerTag=amq.ctag-mtd-2Mec9zH2fXizRqVAqg, consumerQueue=ack_queue])
RabbitMQ
的性能(随机)将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?答案是否定的
autoAck
参数设置为 true
,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将 autoAck
参数设置为 false
,并进行手动确认,详细可以参考(消息确认章节)
RabbitMQ
之后,还需要有一段时间(虽然很短,但是不可忽视)才能存入磁盘中。RabbitMQ
并不会为每条信息都进行同步存盘(调用内核的 fsync
方法)的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内 RabbitMQ
服务器节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失
这个问题如何解决呢?
RabbitMQ
的仲裁队列(后面会说),如果主节点(master
)在此特殊时间内挂掉,可以自动切换到从节点(slave
),这样有效地保证了高可用性,除非整个集群都挂掉
100%
可靠,但是配置了仲裁队列要比没有配置的可靠性要高很多,实际生产环境中的关键业务队列一般都会设置仲裁队列RabbitMQ
中(详情参考后面发送方确认)
在消息传递过程中,可能会遇到各种问题,如网络故障,服务不可用,资源不足等,这些问题可能导致消息处理失败
为了解决这些问题,RabbitMQ
提供了重试机制,允许消息在处理失败后重新发送
但如果是程序逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数
spring:
rabbitmq:
addresses: amqp://guest:guest@127.0.0.1:5672/coding
listener:
simple:
acknowledge-mode: manual # 消息接收确认
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时长为 5s max-attempts: 5 # 最大重试次数(包括自身消费的一次)
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";
public static final String RETRY_QUEUE = "retry_queue";
// 1. 交换机
@Bean("retryExchange")
public FanoutExchange retryExchange() {
return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
}
// 2. 队列
@Bean("retryQueue")
public Queue retryQueue() {
return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
// 3. 队列和交换机绑定 Binding@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange exchange, @Qualifier("retryQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
@RequestMapping("/retry")
public String retry() {
rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry test...");
return "发送成功";
}
@Component
public class RetryQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void Listener(Message message) throws Exception {
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 模拟处理失败
int num = 3 / 0;
System.out.println("处理完成");
}
运行程序,调用接口,发送消息: http://127.0.0.1:8080/producer/retry
[外链图片转存中…(img-7OSM9aK6-1752140853006)]
如果对异常进行捕获,那么就不会进行重试
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 模拟处理失败
try {
int num = 3 / 0;
System.out.println("处理完成");
} catch (Exception e) {
System.out.println("处理失败");
}
重新运行程序,结果如下:[外链图片转存中…(img-7hevkq3m-1752140853007)]
改为手动确认
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 模拟处理失败
int num = 3 / 0;
System.out.println("处理完成");
// 3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 4. 异常了就拒绝签收
Thread.sleep(1000);
// 第三个参数 requeue,是否重新发送。若为 true,则重发;若为 false,则直接丢弃
channel.basicNack(deliveryTag, true, true);
}
}
运行结果:[外链图片转存中…(img-ne4pJWMZ-1752140853008)]
自动确认模式下,RabbitMQ
会在消息被投递给消费者后自动确认消息。
RabbitMQ
根据配置的重试参数自动将消息重新入队,从而实现重试RabbitMQ
的配置中设定,并且 RabbitMQ
会负责执行这些重试策略手动确认模式下,消费者需要显式地对消息进行确认。如果消费者在处理消息时遇到异常,可以选择不确认消息,使消息重新入队
RabbitMQ
的内部机制RabbitMQ
的高级特性来实现有效的重试策略使用重试机制时需要注意:
unacked
的状态,导致消息积压