首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【RabbitMQ】高级特性—消息确认详解

【RabbitMQ】高级特性—消息确认详解

作者头像
椰椰椰耶
发布2025-08-16 09:48:11
发布2025-08-16 09:48:11
19900
代码可运行
举报
文章被收录于专栏:学习学习
运行总次数:0
代码可运行

消息确认机制

生产者发送消息之后,到达消费端之后,可能会有以下情况:

  1. 消息处理成功
  2. 消息处理异常
image.png
image.png

RabbitMQ 向消费者发送消息之后,就会把这条消息删除掉

  • 但是如果消息发出去之后,处理异常,而生产者不知道,就把消息删除了,这种情况就会出问题

如何确保消费端已经成功接收,并正确处理了呢?

  • 为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制(messageacknowledgement

消费者在订阅队列的时候,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种:

  • 自动确认:当 autoAck=trueRabbitMQ 会自动把发出去的消息设置为确认,然后从内存(或磁盘) 中删除,而不管消费者是否真正地消费到了这些消息
    • 自动确认模式适合对于消息可靠性要求不高的场景
    • 钉钉的已读未读,只要点开了消息,不管读没读消息,一律为“已读”
  • 手动确认:当 autoAck=falseRabbitMQ 会等待消费者显式地调用 Basic.Ack 命令,恢复确认信号后才从内存(硬盘) 中移去消息
    • 这种模式适合对消息可靠性要求比较高的场景
    • 发送消息,只有对方回复了之后,才视为“已读”
手动确认原理

autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分

  1. 等待投递给消费者的消息
  2. 已经投递给消费者,但是还没有收到消费者确认信号的消息 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者(也有可能还是原来那个消费者)
image.png
image.png

RabbitMQWeb 管理平台上,也可以但看当前队列中 Ready 状态和 Unacked 状态的消息数

image.png
image.png
  • Ready:等待投递给消费者的消息数
  • Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数

手动确认方法

消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ 也提供了不同的确认应答的的方式,消费者客户端可以调用与其对应的 channel 的先关方法,共有三种

  1. 肯定确认
  2. 否定确认
  3. 否定确认
肯定确认——basicAck

肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)

  • RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • delivertTag
    • 消息的唯一标识,它是一个单调递增的 64 位的长整型值。
    • 是每个通道(Channel) 独立维护的,所以在每个通道上都是唯一的
    • 当消费者确认(ack) 一条消息时,必须使用对应的通道上进行确认
  • multiple
    • 是否批量确认
    • 在某些情况下,为了减少网络流量,可以对一系列连续的 deliveryTag 进行批量确认
    • 值为 true,则会一次性 ack 所有小于或等于指定 deliveryTag 的消息
    • 值为 false,则只确认当前指定 deliveryTag 的消息
    image.png
    image.png

deliveryTagRabbitMQ 中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性

否定确认——basicReject

否定确认:Channel.basicReject(long deliveryTag, boolean requeue)

  • RabbitMQ2.0.0 版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息
  • deliveryTag:参考 channel.basicAck
  • requeue
    • 表示拒绝后,这条消息如何处理(是否重新入队
    • 如果 requeue 参数设为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者
    • 如果 requeue 参数设为 false,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者
否定确认——basicNack

否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • Basic.Reject 一次只能拒绝一条消息,如果想要批量拒绝,则可以使用 Basic.Nack 这个命令
  • 消费者客户端可以调用 channel.basicNack 方法来实现
  • 参数介绍可以参考上面两个方法
  • multiple 参数设置为 true ,则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息

代码示例

我们基于 Spring Boot 来演示消息的确认机制,使用方式和使用 RabbitMQ Java Client 有一定差异

Spring-AMQP 对消息确认机制提供了三种策略

代码语言:javascript
代码运行次数:0
运行
复制
public enum AcknowledgeMode {
	NONE,
	MANUAL,
	AUTO;
}
  1. AcknowledgeMode.NONE
    • 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,从 RabbitMQ 队列中移除消息
    • 如果消费者处理消息失败,消息可能会丢失
  2. AcknowledgeMode.AUTO默认
    • 消费者在消息处理成功时会自动确认消息
    • 如果处理过程中抛出了异常,则不会确认消息
  3. AcknowledgeMode.MANUAL
    • 手动确认模式下,消费者必须在成功处理消息后显示调用 basicAck 方法来确认消息
    • 如果消息未被确认,RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息
    • 这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不回丢失,而是可以被重新处理

主要流程:

  1. 配置确认机制(自动确认/手动确认)
  2. 生产者发送消息
  3. 消费端逻辑
  4. 测试
AcknowledgeMode. NONE
  • 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,从 RabbitMQ 队列中移除消息
  • 如果消费者处理消息失败,消息可能会丢失
1. 配置确认机制
代码语言:javascript
代码运行次数:0
运行
复制
spring:  
  rabbitmq:  
    addresses: amqp://guest:guest@127.0.0.1:5672/coding  
    listener:  
      simple:  
        acknowledge-mode: none
2. 发送消息

队列,交换机配置

代码语言:javascript
代码运行次数:0
运行
复制
public class Constant {  
    public static final String ACK_EXCHANGE_NAME = "ack_exchange";  
    public static final String ACK_QUEUE = "ack_queue";  
}
代码语言:javascript
代码运行次数:0
运行
复制
import org.springframework.amqp.core.*;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.context.annotation.Bean;  
  
public class RabbitMQConfig {  
    // 1. 交换机  
    @Bean("ackExchange")  
    public Exchange ackExchange() {  
        return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();  
    }  
  
    // 2. 队列  
    @Bean("ackQueue")  
    public Queue ackQueue() {  
        return QueueBuilder.durable(Constant.ACK_QUEUE).build();  
    }  
  
    // 3. 队列和交换机绑定 Binding    
    @Bean("ackBinding")
    public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, @Qualifier("ackQueue") Queue queue) {  
        return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();  
    }  
}

通过接口发送消息:

代码语言:javascript
代码运行次数:0
运行
复制
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
@RequestMapping("/producer")  
public class ProductController {  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    @RequestMapping("/ack")  
    public String ack() {  
        rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");  
        return "发送成功!";  
    }  
}
3. 写消费端逻辑
代码语言:javascript
代码运行次数:0
运行
复制
import com.rabbitmq.client.Channel;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
@Component  
public class AckQueueListener {  
    // 指定监听队列的名称  
    @RabbitListener(queues = Constant.ACK_QUEUE)  
    public void ListenerQueue(Message message, Channel channel) throws Exception {  
        System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  
                message.getMessageProperties().getDeliveryTag());  
        // 模拟处理失败  
        int num = 3 / 0;  
        System.out.println("处理完成");  
    }  
}

这个代码运行结果是正常的,运行后消息会被签收:Ready 为 0,unacked 为 0

4. 运行程序

调用接口,发送消息: http://127.0.0.1:8080/producer/ack

开启消费者,控制台输出:

image.png
image.png

管理界面

image.png
image.png
  • 可以看到,消费者处理失败,但是消息已经从 RabbitMQ 中移除
AcknowledgeMode. AUTO
  • 消费者在消息处理成功时会自动确认消息
  • 如果处理过程中抛出了异常,则不会确认消息

1. 配置确认机制
代码语言:javascript
代码运行次数:0
运行
复制
配置确认机制
spring:  
  rabbitmq:  
    addresses: amqp://guest:guest@127.0.0.1:5672/coding  
    listener:  
      simple:  
        acknowledge-mode: auto
2. 重新运行程序

调用接口,发送消息: http://127.0.0.1:8080/producer/ack

可以看到队列中有一条消息,unacked 的为 0(需要先把消费者注掉,注掉相关注解即可)

image.png
image.png

开启消费者,控制台不断输出错误信息

image.png
image.png

管理界面

image.png
image.png
  • 从日志上可以看出,当消费者出现异常时,RabbitMQ 会不断重发
  • 由于异常,多次重试还是失败,消息没被确认,也无法 nack,就一直是 unacked 状态,导致消息积压
AcknowledgeMode. MANUAL
1. 配置确认机制
代码语言:javascript
代码运行次数:0
运行
复制
配置确认机制
spring:  
  rabbitmq:  
    addresses: amqp://guest:guest@127.0.0.1:5672/coding  
    listener:  
      simple:  
        acknowledge-mode: manual
2. 消费端手动确认逻辑
代码语言:javascript
代码运行次数:0
运行
复制
import com.example.rabbit_features.constant.Constant;  
import com.rabbitmq.client.Channel;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
@Component  
public class AckQueueListener {  
  
    // 指定监听队列的名称  
    @RabbitListener(queues = Constant.ACK_QUEUE)  
    public void ListenerQueue(Message message, Channel channel) throws Exception {  
        long deliveryTag = message.getMessageProperties().getDeliveryTag();  
        try {  
            // 1. 接收消息  
            System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  
                    message.getMessageProperties().getDeliveryTag());  
            // 2. 处理业务逻辑  
            System.out.println("处理业务逻辑");  
            // 手动设置一个异常,来测试异常拒绝机制  
            // int num = 3 / 0;  
            // 3. 手动签收  
            channel.basicAck(deliveryTag, true);  
        } catch (Exception e) {  
            // 4. 异常了就拒绝签收  
            // 第三个参数 requeue,是否重新发送,如果为 true,则会重发;若为 false,则直接丢弃  
            channel.basicNack(deliveryTag, true, true);  
        }  
    }  
}
  • 这个代码运行的结果是正常的,运行后消息会被签收:Read 为 0,unacked 为 0

控制台输出

image.png
image.png

管理界面

image.png
image.png
3. 异常时拒绝签收

主动设置异常

代码语言:javascript
代码运行次数:0
运行
复制
@Component  
public class AckQueueListener {  
  
    // 指定监听队列的名称  
    @RabbitListener(queues = Constant.ACK_QUEUE)  
    public void ListenerQueue(Message message, Channel channel) throws Exception {  
        long deliveryTag = message.getMessageProperties().getDeliveryTag();  
        try {  
            // 1. 接收消息  
            System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),  
                    message.getMessageProperties().getDeliveryTag());  
            // 2. 处理业务逻辑  
            System.out.println("处理业务逻辑");  
            // 手动设置一个异常,来测试异常拒绝机制  
            int num = 3 / 0;  
            // 3. 手动签收  
            channel.basicAck(deliveryTag, true);  
        } catch (Exception e) {  
            // 4. 异常了就拒绝签收  
            // 第三个参数 requeue,是否重新发送,如果为 true,则会重发;若为 false,则直接丢弃  
            channel.basicNack(deliveryTag, true, true);  
        }  
    }  
}

运行结果:消费异常时不断重试,deliveryTag 从 1 递增

控制台日志

image.png|201
image.png|201

管理界面unacked 也变成了 1

image.png
image.png
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息确认机制
    • 手动确认原理
  • 手动确认方法
    • 肯定确认——basicAck
    • 否定确认——basicReject
    • 否定确认——basicNack
  • 代码示例
    • AcknowledgeMode. NONE
      • 1. 配置确认机制
      • 2. 发送消息
      • 3. 写消费端逻辑
      • 4. 运行程序
    • AcknowledgeMode. AUTO
      • 1. 配置确认机制
      • 2. 重新运行程序
    • AcknowledgeMode. MANUAL
      • 1. 配置确认机制
      • 2. 消费端手动确认逻辑
      • 3. 异常时拒绝签收
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档