
生产者发送消息之后,到达消费端之后,可能会有以下情况:

RabbitMQ 向消费者发送消息之后,就会把这条消息删掉,那么第两种情况,就会造成消息丢失。那么如何确保消费端已经成功接收了,并正确处理了呢?
为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制!
消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种:
autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。适合对于消息可靠性要求不高的场景。autoAck 等于 false 时,RabbitMQ 会等待消费者显式地调用 Basic.Ack 命令,回复确认信号后才从内存(或者磁盘)中移去消息。适合对消息可靠性要求比较高的场景。String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;代码示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);当 autoAck 参数置为 false,队列中的消息分成了两个部分:
Ready:等待投递给消费者的消息数Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

从管理平台上可以看到当前队列中 Ready 状态和 Unacked 状态的消息数:

消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ 也提供了不同的确认应答的方式,消费者客户端可以调用与其对应的 channel 的相关方法,共有以下三种:
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。
Channel.basicAck(long deliveryTag, boolean multiple);deliveryTag:消息的唯一标识,它是一个单调递增的 64 位的长整型值。deliveryTag 是每个通道(Channel)独立维护的,所以在每个通道上都是唯一的。当消费者确认(ack)一条消息时,必须使用对应的通道上进行确认。multiple:是否批量确认。在某些情况下,为了减少网络流量,可以对一系列连续的 deliveryTag 进行批量确认。true:表示一次性 ack 所有小于等于指定 deliveryTag 的消息false:表示确认当前指定 deliveryTag 的消息
deliveryTag是 RabbitMQ 中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性。
RabbitMQ 在 2.0.0 版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
Channel.basicReject(long deliveryTag, boolean requeue);deliveryTag:参考 channel.basicAckrequeue:表示拒绝后,这条消息如何处理。true:会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。false:会把消息从队列中移除,而不会把它发送给新的消费者。Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令,消费者客户端可以调用 channel.basicNack 方法来实现。
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);我们基于 SpringBoot 来演示消息的确认机制,使用方式和使用 RabbitMQ Java Client 库有一定差异。(主要体现在后者使用的是 channel.basicConsume 来接收消息以及做出回调处理,而前者用的是注解 @RabbitListener 来做回调处理)
Spring-AMQP 对消息确认机制提供了三种策略:
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}NONEAUTO(默认)basicAck()。MANUALchannel.basicAck() 或 basicNack()。下面以 AcknowledgeMode.NONE 为例,其它两种就是就是改一下配置文件中的 acknowledge-mode 即可!
spring:
rabbitmq:
addresses: amqp://liren:123123@127.0.0.1/lirendada
listener:
simple:
acknowledge-mode: none # 设置确认机制为立刻确认2. 编写常量类:
public class Constant {
public static final String ACK_EXCHANGE_NAME = "ack_exchange";
public static final String ACK_QUEUE = "ack_queue";
}3. 配置与绑定队列和交换机:
@Configuration
public class RabbitMQConfig {
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constants.ACK_QUEUE).build();
}
@Bean("ackExchange")
public DirectExchange ackExchange() {
return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE_NAME).durable(true).build();
}
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackQueue") Queue queue,
@Qualifier("ackExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("ack");
}
}4. 发送消息:
@RequestMapping("/producer")
@RestController
public class producerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");
return "发送成功!";
}
}5. 消费消息:
@Component
public class AckListener {
@RabbitListener(queues = Constants.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws UnsupportedEncodingException {
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
// 模拟处理失败,会抛异常
int a = 3 / 0;
System.out.println("处理完成");
}
}消费者处理消息时,消息如何不丢失呢?如何保证当 RabbitMQ 服务停掉以后,生产者发送的消息不丢失呢?
默认情况下,RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息,除非告知它不要这么做。
RabbitMQ 的持久化分为三个部分:
交换器的持久化是在声明交换机时设置 durable 参数为 true,相当于将交换机的属性在服务器内部保存。
设置持久化之后,当 RabbitMQ 的服务器发生意外或关闭之后,进行重启时不需要重新去建立交换机,交换机会自动建立,相当于一直存在。
如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对一个长期使用的交换器来说,建议将其置为持久化的。
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()队列的持久化是在声明队列时设置 durable 参数为 true。
如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,该队列就会被删掉,此时数据也会丢失。(因为队列不存在了,那么消息也无处可存了)
但是设置了队列的持久化,也只能保证该队列本身的元数据不会因异常情况而丢失,并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要设置消息持久化。
💥注意事项:创建队列的时候默认 durable 为 true,即 RabbitMQ 默认开启队列持久化。
消息实现持久化,需要在发送消息的时候,将消息的投递模式(MessageProperties 中的 deliveryMode)设置为 2,也就是 MessageDeliveryMode.PERSISTENT。
public enum MessageDeliveryMode {
NON_PERSISTENT, // 非持久化
PERSISTENT; // 持久化
}设置了队列以及消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在。
如果只设置队列持久化,重启之后消息会丢失;如果只设置消息持久化,重启之后队列消失,继而消息也丢失。所以单单设置消息持久化而不设置队列持久化显得毫无意义。
// 非持久化信息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 持久化信息
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());其中 MessageProperties.PERSISTENT_TEXT_PLAIN 实际就是封装了这个属性,其源码如下所示:
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2, // deliveryMode
0, null, null, null,
null, null, null, null,
null, null);如果是在 springboot 中使用 RabbitTemplate 发送持久化消息,操作如下所示:
// 要发送的消息内容
String message = "This is a persistent message";
// 创建一个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使用RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);💥注意事项:RabbitMQ 默认情况下会将消息视为持久化的,除非队列被声明为非持久化,或者消息在发送时被标记为非持久化。
将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能(随机)。写入磁盘的速度比写入内存的速度慢得不止一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗❓❓❓答案是否定的。
autoAck 参数设置为 true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将 autoAck 参数设置为 false,进行手动确认。fsync 方法),而是先通过 write() 写入操作系统的页缓存中,等待系统或批量策略触发再真正写入磁盘。因此,如果在这段缓存尚未同步到磁盘的时间窗口内 RabbitMQ 节点发生宕机或重启,尚未落盘的消息仍可能丢失。
这个问题怎么解决呢❓❓❓
仲裁队列,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效地保证了高可用性,除非整个集群都挂掉。(此方法同样不能保证 100% 可靠,但是配置了仲裁队列要比没有配置仲裁队列的可靠性要高很多,实际生产环境中的关键业务队列一般都会设置仲裁队列)事务机制 或者 发布确认机制 来保证消息已经正确地发送并存储至 RabbitMQ 中。在使用 RabbitMQ 的时候,可以通过消息持久化来解决因为服务器的异常崩溃而导致的消息丢失,但是还有一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果在消息到达服务器之前已经丢失(比如 RabbitMQ 重启,那么 RabbitMQ 重启期间生产者消息投递失败),持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ 为我们提供了两种解决方案:
RabbitMQ 为我们提供了两个方式来控制消息的可靠性投递
confirm 确认模式return 退回模式
生产者在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达交换机,这个监听都会被执行。如果 Exchange 成功收到,ACK 为 true;如果没收到消息,ACK 就为 false。
RabbitTemplate.ConfirmCallback和ConfirmListener区别💥💥💥 在 RabbitMQ 中,ConfirmListener和ConfirmCallback都是用来处理消息确认的机制,但它们属于不同的客户端库,并且使用的场景和方式有所不同。
ConfirmListener 是 RabbitMQ Java Client 库中的接口。这个库是 RabbitMQ 官方提供的一个直接与 RabbitMQ 服务器交互的客户端库。ConfirmListener 接口提供了两个方法:handleAck 和 handleNack,用于处理消息确认和否定确认的事件。ConfirmCallback 是 Spring AMQP 框架中的一个接口,专门为 Spring 环境设计,用于简化与 RabbitMQ 交互的过程。它只包含一个 confirm 方法,用于处理消息确认的回调。在 SpringBoot 应用中,通常会使用 ConfirmCallback,因为它与 Spring 框架的其他部分更加整合,可以利用 Spring 的配置和依赖注入功能。而在使用 RabbitMQ Java Client 库时,则可能会直接实现 ConfirmListener 接口,更直接的与 RabbitMQ 的 Channel 交互。
public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData: 发送消息时的附加信息, 通常用于在确认回调中识别特定的消息
* @param ack: 交换机是否收到消息, 收到为true, 未收到为false
* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以用于调试和错误处理
* 成功时, cause为null
*/
void confirm(@Nullable CorrelationData correlationData,
boolean ack,
@Nullable String cause);
}spring:
rabbitmq:
addresses: amqp://liren:123123@127.0.0.1/lirendada
listener:
simple:
acknowledge-mode: manual # 消息确认
publisher-confirm-type: correlated # 发布确认机制其中发布确认机制这里有三种可选方式:(对应前面学习发布确认机制时候的 无确认、单独确认/批量确认、异步确认)
模式 | 是否启用确认 | 方式 | 阻塞性 | 性能 | 典型场景 |
|---|---|---|---|---|---|
NONE | 否 | 无确认 | 非阻塞 | ⭐⭐⭐⭐ | 普通日志、监控消息 |
SIMPLE | 是 | 同步等待 | 阻塞 | ⭐ | 关键事务型消息 |
CORRELATED | 是 | 异步回调 | 非阻塞 | ⭐⭐⭐ | 高并发可靠投递 |
2. 常量类
// 发布确认机制
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE = "confirm_queue";3. 设置确认回调逻辑并发送消息
ConfirmCallback 的 confirm 方法。ack=trueack=false,并且由参数 cause 提供失败的原因public class RabbitTemplateConfig {
@Bean("ackRabbitTemplate")
public RabbitTemplate AckRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean("confirmRabbitTemplate")
public RabbitTemplate ConfirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack) {
System.out.printf("消息接收成功, id:%s", correlationData.getId());
} else {
System.out.printf("消息接收失败, id:%s cause:%s", correlationData.getId(), cause);
}
}
});
return rabbitTemplate;
}
}4. 发送消息
@RequestMapping("/producer")
@RestController
public class producerController {
@Resource(name = "ackRabbitTemplate")
private RabbitTemplate ackRabbitTemplate;
@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/ack")
public String ack() {
ackRabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");
return "发送成功!";
}
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE_NAME, "confirm", "consumer confirm test...", correlationData);
return "发送成功!";
}
}上面代码中有几处细节:
@Resource 注入即可。ConfirmCallback 导致报错的情况,通常将设置 ConfirmCallback 的操作放在配置类中完成。消息到达 Exchange 之后,会根据路由规则匹配,把消息放入 Queue 中。Exchange 到 Queue 的过程,如果一条消息无法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等),可以选择把消息退回给发送者。消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理,这就是所谓的退回模式。
spring:
rabbitmq:
addresses: amqp://liren:123123@127.0.0.1/lirendada
listener:
simple:
acknowledge-mode: manual # 消息确认
publisher-confirm-type: correlated # 发布确认机制2. 设置返回回调逻辑:当消息无法被路由到任何队列,它将返回给发送者,这时 setReturnCallback 设置的回调将被触发
@Bean("confirmRabbitTemplate")
public RabbitTemplate ConfirmRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置confirm回调(发送者 -> 交换机)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack) {
System.out.printf("消息接收成功, id:%s\n", correlationData.getId());
} else {
System.out.printf("消息接收失败, id:%s cause:%\n", correlationData.getId(), cause);
}
}
});
// 设置return回调(交换机 -> 队列)
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.printf("消息被退回: %s\n", returned);
}
});
return rabbitTemplate;
}3. 发送消息:
@RequestMapping("/returns")
public String returns() {
CorrelationData correlationData = new CorrelationData("5");
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE_NAME, "confirm", "consumer returns test...", correlationData);
confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE_NAME, "confirm11", "consumer returns test...", correlationData);
return "发送成功!";
}
使用 RabbitTemplate.setMandatory() 方法设置消息的 mandatory 属性为 true(默认为 false)。这个属性的作用是告诉 RabbitMQ,如果一条消息无法被任何队列消费,RabbitMQ 应该将消息返回给发送者,此时 ReturnCallback 会被触发。
其中该回调函数中有一个参数:ReturnedMessage,包含以下属性:
public class ReturnedMessage {
// 返回的消息对象,包含了消息体和消息属性
private final Message message;
// 由Broker提供的回复码, 表示消息无法路由的原因. 通常是一个数字代码,每个数字代表不同的含义.
private final int replyCode;
// 一个文本字符串, 提供了无法路由消息的额外信息或错误描述.
private final String replyText;
// 消息被发送到的交换机名称
private final String exchange;
// 消息的路由键,即发送消息时指定的键
private final String routingKey;
}
从这个图中可以看出,消息可能丢失的场景以及解决方案:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。