前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RabbitMQ发布确认详解

RabbitMQ发布确认详解

作者头像
小熊学Java
发布于 2023-07-16 06:24:17
发布于 2023-07-16 06:24:17
24900
代码可运行
举报
文章被收录于专栏:全栈学习之路全栈学习之路
运行总次数:0
代码可运行

1、原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm 模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

2、发布确认策略

1、开启发布确认

发布者确认是 AMQP 0.9.1协议的RabbitMQ扩展,因此默认情况下不启用它们。使用confirmSelect方法在通道级别启用发布者确认

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Channel channel = connection.createChannel();
channel.confirmSelect();

2、单独发布消息(Publishing Messages Individually)

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。 这种确认方式有一个最大的缺点就是:**发布速度特别的慢,**因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class PublishingMessagesIndividually {

    public static void main(String[] args) {
        try(Channel channel = RabbitMQConfig.getChannel()) {
            //随机生成队列
            String queryName = UUID.randomUUID().toString();
            //生成队列 不持久化 不共享 不删除 参数为空
            channel.queueDeclare(queryName, false, false, false, null);
            //开启发布确认
            channel.confirmSelect();
            long startTime = System.nanoTime();
            //消息数量为10
            for (int i = 0; i < 10; i++) {
                String message = i + "";
                channel.basicPublish("", queryName, null, message.getBytes());
                if (channel.waitForConfirms()) {
                    System.out.println("消息发送成功");

                }
            }
            long endTime = System.nanoTime();
            System.out.println("发布10" + "个单独确认消息,耗时" + (endTime - startTime) +
                    "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
消息发送成功
消息发送成功
消息发送成功
消息发送成功
消息发送成功
消息发送成功
消息发送成功
消息发送成功
消息发送成功
消息发送成功
发布10个单独确认消息,耗时18288200ms

3、批量确认发布(Publishing Messages in Batches)

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class BatchesPublishingMessages {

    public static void main(String[] args) {
        try(Channel channel = RabbitMQConfig.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //开启发布确认
            channel.confirmSelect();
            //批量确认消息大小
            int msgSize = 100;
            //初始化未确认消息个数
            int outstandingMessageCount = 0;
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount == msgSize) {
                    channel.waitForConfirms();
                    outstandingMessageCount = 0;
                }
                //为了确保还有剩余没有确认消息 再次确认
                if (outstandingMessageCount > 0) {
                    channel.waitForConfirms();
                }

            }
            long end = System.currentTimeMillis();
            System.out.println("发布10"  + "个批量确认消息,耗时" + (end - start) +
                    "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
发布10个批量确认消息,耗时6ms

4、异步发布确认(Handling Publisher Confirms Asynchronously)

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的?

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class AsyncHandlingPublisherConfirms {

    public static void main(String[] args) throws Exception {
        asyncPublishMessage();
    }

    public static void asyncPublishMessage() throws Exception{
        try(Channel channel = RabbitMQConfig.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //开启发布确认
            channel.confirmSelect();
            /**
             * 线程安全有序的一个哈希表,适用于高并发的情况
             * 1.轻松的将序号与消息进行关联
             * 2.轻松批量删除条目 只要给到序列号
             * 3.支持并发访问
             */
            ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
            ConfirmCallback callback = (sequenceNumber, multiple) ->{
                if (multiple){
                    //返回的是小于等于当前序列号的未确认消息 是一个 map
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
                            sequenceNumber, true
                    );
                    //清除该部分未确认消息
                    confirmed.clear();
                }else {
                    //只清除当前序列号的消息
                    outstandingConfirms.remove(sequenceNumber);
                }
            };
            ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                String message = outstandingConfirms.get(sequenceNumber);
                System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
            };
            //接收消息回调 未接收消息回调
            channel.addConfirmListener(callback, null);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                String message = "消息" + i;
                /**
                 * channel.getNextPublishSeqNo()获取下一个消息的序列号
                 * 通过序列号与消息体进行一个关联
                 * 全部都是未确认的消息体
                 */
                outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", queueName, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("发布10" + "个异步确认消息,耗时" + (end - begin) +
                    "ms");
        }
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
发布10个异步确认消息,耗时4ms

5、如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

6、对比

类型

优点

缺点

单独发布确认

同步等待确认,简单

吞吐量有限,100左右

批量发布确认

批量同步等待确认,简单,合理的吞吐量

一旦出现问题但很难推断出是那条消息出现了问题

异步发布确认

最佳性能和资源使用,在出现错误的情况下可以很好地控制

实现起来稍微难些

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1000个消息的执行时间对比
发布1000个单独确认消息,耗时561ms
发布1000个批量确认消息,耗时558ms
发布1000个异步确认消息,耗时42ms

合集列表:

  1. RabbitMQ 入门教程Hello World
  2. RabbitMQ的工作队列
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-09-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小熊学Java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RibbitMQ学习笔记之MQ发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,**所有在该信道上面发布的消息都将会被指派一个唯一的 ID( 从 1 开始),**一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
默 语
2024/11/20
790
RibbitMQ学习笔记之MQ发布确认
RabbitMQ 学习(七)----发布确认
  保证生产者将信息成功的发送到 RabbitMQ的 server端了,那么broker就会回一个确认,如果没有收到或者收到拒绝信息,那么说明可能网络不好没有发送成功,server端宕机了,broker拒绝接收等情况,如果不进行后续处理,那么信息就会丢失,生产者收到失败的消息使用回调函数在进行处理。
RAIN7
2022/10/07
5150
RabbitMQ 学习(七)----发布确认
rabbitmq如何保证消息可靠性不丢失
因为MQ整个消息周期设计到上述的三个角色,所以我们从这个三个角色开始讨论丢失数据的情况。并如何解决
啵啵肠
2023/11/28
2210
爆肝3万字,为你吃透RabbitMQ,最详细的RabbitMQ讲解(VIP典藏版)
早在之前就了解到了消息中间件,但是一直没有系统的学习,最近花了一段时间系统学习了当下最为主流的 RabbitMQ 消息队列,学习过程中也随时记录,刚开始学习的时候懵懵懂懂,做的笔记都比较杂乱,系统学习完后我将笔记内容不断反复修改,对章节进行设计调整,最终整合出了以下好理解、案例多、超详细的 RabbitMQ 学习笔记,希望能帮到大家~
SmallRoll小卷
2023/03/05
12.6K0
爆肝3万字,为你吃透RabbitMQ,最详细的RabbitMQ讲解(VIP典藏版)
【RabbitMQ】发布确认机制的具体实现
发送确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息
椰椰椰耶
2025/05/15
850
【RabbitMQ】发布确认机制的具体实现
RabbitMQ事务和Confirm发送方消息确认——深入解读
根据前面的知识(深入了解RabbitMQ工作原理及简单使用、Rabbit的几种工作模式介绍与实践)我们知道,如果要保证消息的可靠性,需要对消息进行持久化处理,然而消息持久化除了需要代码的设置之外,还有一个重要步骤是至关重要的,那就是保证你的消息顺利进入Broker(代理服务器),如图所示:
磊哥
2018/08/02
1.4K0
RabbitMQ事务和Confirm发送方消息确认——深入解读
MQ发布确认
  生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
一个风轻云淡
2022/11/13
1.3K0
MQ发布确认
RabbitMQ发布确认
  生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。   confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
别团等shy哥发育
2023/02/25
7270
RabbitMQ发布确认
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是指生产者发送消息后,等待消费者确认消息已经被正确接收和处理的一种机制。消息确认机制的主要目的是确保消息的可靠传递和处理,以避免消息丢失或重复处理的情况发生。
GeekLiHua
2025/01/21
2020
RabbitMQ异步发布确认
RabbitMQ的异步发布确认(Asynchronous Publish Confirm)是一种机制,用于在消息发送过程中异步地接收确认回调,以提高生产者的吞吐量和性能。通过使用异步发布确认,生产者可以在消息发送的同时继续执行其他操作,而不需要等待每条消息的确认回调。
堕落飞鸟
2023/05/16
7590
RabbitMQ之消息应答与发布确认
生产者发布消息到 RabbitMQ 后,需要 RabbitMQ 返回「ACK(已收到)」给生产者,这样生产者才知道自己生产的消息成功发布出去。
shaoshaossm
2022/12/27
5810
RabbitMQ之消息应答与发布确认
RabbitMQ 消息确认机制
消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!
海向
2019/09/23
1.2K0
RabbitMQ 消息确认机制
【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战
上篇文章介绍了rabbitmq的基本知识、交换机类型实战《【消息队列之rabbitmq】学习RabbitMQ必备品之一》 这篇文章主要围绕着消息确认机制为中心,展开实战;接触过消息中间件的伙伴都知道,消息会存在以下问题: 1、消息丢失问题和可靠性投递问题; 2、消息如何保证顺序消费; 3、消息如何保证幂等性问题,即重复消费问题等等… 本文主要以Rabbitmq消息中间件解决问题一的实践,其他问题小编会重新写文章总结;
沁溪源
2022/01/13
1.3K0
【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战
RabbitMq可靠性分析
最近了解并简单实用了下Rabbitmq,整个使用也大致了解了,但是要作做到真正的可靠,仅仅依赖于应用提供的方式是否在业务环境中真的能够达到可靠的目的。当然我们所谓的可靠性主要指的以下几方面(个人认为):
sucl
2019/08/07
4090
RabbitMQ学习
接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。
晓果冻
2022/09/08
3110
RabbitMQ学习
RabbitMQ生产者Confirm消息(三)
RabbitMQ的特性是保障数据的一致性,稳定性和可靠性。但是如何来保障这些了?这就有了很多的保障机制。在前面的文章体系中也是介绍到RabbitMQ中的生产者负责把消息发送到Exchange,并不需要关心Queue是什么,那么问题就出现了,如果生产者发送的MQ消息消费者没有收到了?这如何可以做到前面说的数据的一致性以及可靠性了。我们可以结合现实的例子来看这部分,比如我向别人借了100元,然后我要了对方的银行卡号,把钱还给了对方,但是我给对方没有说,那么其实对方是不知道的,所以在对方的心理我始终还是欠他100元的,其实这样的案例在我实际的生活就出现过,当然是很多年前的事了,总是这过程确认反馈的机制。技术也是需要符合人性的,那么RabbitMQ为了做到数据的一致性的保障,在生产者端就有Confirm的确认机制。
无涯WuYa
2022/03/29
9210
RabbitMQ生产者Confirm消息(三)
RabbitMQ
​ MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
OY
2022/03/21
1.8K0
RabbitMQ
消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
消息队列RabbitMQ提供了六种工作模式:简单模式、work queues、发布订阅模式、路由模式、主题模式、发布确认模式。本文将介绍前三种工作模式。所有的案例代码都是使用Java语言实现。
百思不得小赵
2022/12/07
5880
消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
RabbitMQ---消息队列---上半部分
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
大忽悠爱学习
2021/12/07
1.1K0
RabbitMQ---消息队列---上半部分
RabbitMQ学习之消息可靠性及特性
转载自 https://blog.csdn.net/zhu_tianwei/article/details/53971296
allsmallpig
2021/02/25
5970
相关推荐
RibbitMQ学习笔记之MQ发布确认
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验