在使用 RabbitMQ
的时候,可以通过消息持久化来解决因为服务器的一次崩溃而导致的消息丢失
但是还有一个问题,当消息的生产者将消息发送初期之后,消息到底有没有正确地到达服务器呢?
RabbitMQ
重启,那么 RabbitMQ
重启期间生产者消息投递失败),持久化操作也解决不了这个问题,因为消息根被没有到达服务器,何谈持久化?RabbitMQ
为我们提供了两种解决方案:
publisher confirm
)机制实现事务机制比较消耗性能,在实际工作中使用也不多,我们主要极少 confirm
机制来实现发送方的确认。
RabbitMQ
为我们提供了两个方式来控制消息的可靠性投递
confirm
确认模式return
返回模式Producer
在发送消息的时候,对发送端设置一个 ConfirmCallback
的监听,无论消息是否到达 Exchange
,这个监听都会被执行
Exchange
成功收到,ACK
(Acknowledge character
,确认字符)为 true
ACK
就为 false
步骤如下:
RabbitMQ
接下来看实现步骤:
配置确认机制
spring:
rabbitmq:
addresses: amqp://guest:guest@127.0.0.1:5672/coding
listener:
simple:
acknowledge-mode: manual # 消息接收确认
publisher-confirm-type: correlated # 消息发送确认
无论消息确认成功还是失败,都会调用 ConfirmCallback
的 confirm
方法。
Broker
,ack
为 true
ack
为 false
,并且 cause
提供失败的原因public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("");
if (ack) {
System.out.printf("消息接收成功,id: %s \n", correlationData.getId());
} else {
System.out.printf("消息接收失败,id: %s \n", correlationData.getId());
}
}
});
return rabbitTemplate;
}
ConfirmCallback()
里面的 confirm(@Nullable CorrelationData, boolean ack, @Nullables String cause)
correlationDate
:发送消息时的附加信息,通常用于在确认回调中识别特定的消息ack
:交换机是否收到消息,收到为 true
,未收到为 false
cause
:当消息确认失败时,这个字符串参数将提供失败的原因,这个原因可以用于调试和错误处理。成功时,cause
为 null
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "confirm", "confirm test...", correlationData);
return "确认成功";
}
RabbitTemplate.ConfirmCallback
和ConfirmListener
区别 在RabbitMQ
中,ConfirmListener
和ConfirmCallback
都是用来处理消息确认的机制,但他们属于不同的客户端库,并且使用的场景和方式有所不同
ConfirmListener
是 RabbitMQ
Java Client
库中的接口。这个库是 RabbitMQ
官方提供的一个直接于 RabbitMQ
服务器交互的客户端库。ConfirmListener
接口提供了两个方法:handleAck
和 handleNack
,用于处理消息确认好否定确认的事件ConfirmCallback
是 Spring AMQP
框架中的一个接口。专门为 Spring
环境设计,用于简化与 RabbitMQ
交互的过程,它只包含一个 confirm
方法,用于处理消息确认的回调
在 SpringBoot
应用中,通常会使用 ConfirmCallback
,因为它与 Spring
框架的其他部分更加整合,可以利用 Spring
的配置和依赖注入功能。而在 RabbitMQ
Java Client
库时,则可能会直接实现 ConfirmListener
接口,更直接的与 RabbitMQ
的 Channel
交互运行程序,调用接口: http://127.0.0.1:8080/producer/confirm
观察控制台,消息确认成功:
接下来把交换机名称改一下,重新运行,会触发另一个结果
// 发送失败
confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData);
运行结果:
no exchange 'confirm_exchange1' in vhost 'bite'
“,也就是说,bite
这个虚拟机,没有名字为 confirm_exchange1
的交换机public class Constant {
public static final String ACK_EXCHANGE_NAME = "ack_exchange";
public static final String ACK_QUEUE = "ack_queue";
}
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitTemplateConfig {
@Bean("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.printf("");
if (ack){
System.out.printf("消息接收成功, id:%s \n", correlationData.getId());
} else {
System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause);
}
}
});
return rabbitTemplate;
}
}
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
//publish-confirm模式
//1. 交换机
@Bean("confirmExchange")
public Exchange confirmExchange() {
return ExchangeBuilder.topicExchange(Constant.CONFIRM_EXCHANGE_NAME).durable(true).build();
}
//2. 队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(Constant.CONFIRM_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("confirmBinding")
public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange, @Qualifier("confirmQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
}
}
package com.bite.rabbitmq.controller;
import com.bite.rabbitmq.constant.Constant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/product")
public class ProductController {
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() throws InterruptedException {
CorrelationData correlationData1 = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,
"confirm", "confirm test...", correlationData1);
//发送失败
// confirmRabbitTemplate.convertAndSend("confirm_exchange1", "confirm", "confirm test...", correlationData1);
Thread.sleep(2000);
return "确认成功";
}
}
消息到达 Exchange
之后,会根据路由规则匹配,把消息放入 Queue
中,Exchange
到 Queue
的过程:
步骤如下:
RabbitMQ
接下来看实现步骤
# 配置确认机制
spring:
rabbitmq:
addresses: amqp://guest:guest@127.0.0.1:5672/coding
listener:
simple:
acknowledge-mode: manual # 消息接收确认
publisher-confirm-type: correlated # 消息发送确认
setReturnCallback
设置的回调将被触发/**
* retrun 返回模式
*/
@Primary
@Bean("confirmRabbitTemplate2")
public RabbitTemplate confirmRabbitTemplate2(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.printf("消息被退回:%s", returnedMessage);
}
});
return rabbitTemplate;
}
@Resource(name = "confirmRabbitTemplate2")
private RabbitTemplate confirmRabbitTemplate2;
@RequestMapping("/msgReturn")
public String msgReturn() {
CorrelationData correlationData = new CorrelationData("2");
confirmRabbitTemplate2.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm1", "message return test...", correlationData);
return "消息发送成功";
}
RabbitTemplate
的 setMandatory
方法设置消息的 mandatory
属性为 true
(默认为 false
)RabbitMQ
,如果一条消息无法被任何队列消费,RabbitMQ
应该将消息返回给发送者,此时 ReturnCallback
就会被触发回调函数中,有一个参数:ReturnedMessage
,包含以下属性
public class ReturnedMessage {
// 返回的消息对象,包含了消息体和消息属性
private final Message message;
// 由 Broker 提供的回复码,表示消息无法路由的原因,通常是一个数字代码,每个数字代表不同的含义
private final int replyCode;
// 一个文本字符串,提供了无法路由消息的额外信息或错误描述
private final String replyText;
// 消息被发送到的交换机名称
private final String exchange;
// 消息的路由键,即发送消息时指定的键
private final String routingKey;
}
运行程序,调用接口: http://127.0.0.1:8080/producer/msgReturn
观察控制台,消息被退回:
#高频面试
先放一张 RabbitMQ
消息传递图
从这个图中,可以看出,消息可能丢失的场景以及解决方案:
RabbitMQ
失败
confirm
模式return
模式RabbitMQ
之后,RabbitMQ Server
宕机导致消息丢失RabbitMQ
持久化,就是消息写入之后会持久化到磁盘,如果 RabbitMQ
挂了,回复之后会自动读取之前存储的数据(极端情况下,RabbitMQ
还未持久化就挂了,可能导致少量的数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)RabbitMQ
提供了消费者应答机制来使 RabbitMQ
能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动答应的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制,当消息消费异常时,通过消息重试确保消息的可靠性