前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >12-RabbitMQ高级特性-Consumer ACK

12-RabbitMQ高级特性-Consumer ACK

作者头像
Devops海洋的渔夫
发布于 2022-11-22 01:56:25
发布于 2022-11-22 01:56:25
39800
代码可运行
举报
文章被收录于专栏:Devops专栏Devops专栏
运行总次数:0
代码可运行

12-RabbitMQ高级特性-Consumer ACK

Consumer Ack

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

“其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。 ”

案例

1. 创建工程

创建一个空的 maven 工程 rabbitmq-consumer-spring:

2. 添加依赖

修改pom.xml文件内容为如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lijw</groupId>
    <artifactId>rabbitmq-consumer-spring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.7.RELEASE</version>
        </dependency>
    </dependencies>
</project>

3. 配置整合

1.创建rabbitmq.properties连接参数等配置文件;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=libai
rabbitmq.password=libai
rabbitmq.virtual-host=/test

2.创建 spring-rabbitmq-consumer.xml 整合配置文件;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <!--加载配置文件-->
    <context:property-placeholder location="classpath:rabbitmq.properties"/>

    <!-- 定义rabbitmq connectionFactory -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                               port="${rabbitmq.port}"
                               username="${rabbitmq.username}"
                               password="${rabbitmq.password}"
                               virtual-host="${rabbitmq.virtual-host}"/>


</beans>
  1. 配置扫描监听器的Bean

后续我们写的消费者都是监听接口的实现类,需要将其生成 Bean,所以我们可以将其写到一个 包下,配置 spring 扫描:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!-- 定义扫描监听器类   -->
<context:component-scan base-package="com.lijw.listener" />

4. 编写监听器

实现 MessageListener 类,则可以接收到消息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.lijw.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * @author Aron.li
 * @date 2022/3/4 23:36
 */
@Component
public class AckListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        String msg = new String(message.getBody());
        System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s \n",
                message.getMessageProperties().getReceivedExchange(),
                message.getMessageProperties().getReceivedRoutingKey(),
                message.getMessageProperties().getConsumerQueue(),
                msg);
    }
}

5.配置 监听器类 与 队列 绑定

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!--  定义监听器与队列的绑定  -->
<rabbit:listener-container connection-factory="connectionFactory" >
    <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
</rabbit:listener-container>

这里的队列是上一篇中的生产者发送消息的队列 test_queue_confirm

6.测试 接收消息

配置好之后,下面我们来启动服务,验证一下能否接收到消息。

需要我们编写一个测试类,启动加载 spring 的相关配置:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.lijw;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @author Aron.li
 * @date 2022/3/4 23:41
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test01() {
        while (true) {

        }
    }

}

执行测试方法,开启加载 spring 框架,接收消息如下:

说明现在监听器已经正常工作了,那么下一步我们就要开始来写 Consumer Ack 的功能了。

7. 配置消息接收 手动确认:acknowledge="manual"

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<!--  定义监听器与队列的绑定  -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"/>
</rabbit:listener-container>

8.改写监听器,实现 ChannelAwareMessageListener 接口

在消息手动确认中,我们需要使用 channel.basicAck() channel.basicNack() 进行确认,那么就需要有 channel 提供我们调用。

而原来的 MessageListener 接口没有提供 channel ,我们可以实现 MessageListener 的子接口 ChannelAwareMessageListener

改写监听器如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.lijw.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


/**
 * Consumer ACK机制:
 * 1. 设置手动签收。acknowledge="manual"
 * 2. 让监听器类实现ChannelAwareMessageListener接口
 * 3. 如果消息成功处理,则调用channel的 basicAck()签收
 * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
 *
 * @author Aron.li
 * @date 2022/3/4 23:36
 */
@Component
public class AckListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //1. 获取传递的标签,用于消息确认
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //2. 接收消息
            System.out.println(new String(message.getBody()));

            //3. 处理业务逻辑
            System.out.println("处理业务逻辑...");
//            int i = 3 / 0; // 产生异常
            //4. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //e.printStackTrace();

            //5. 拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag, true, true); // 一次性可以拒绝多条消息
            //channel.basicReject(deliveryTag,true); // 一次性只能拒绝一条消息,了解即可
        }
    }

}

9.测试 拒绝重发消息

首先我们正常启动监听器,并且生产者发送消息:

下面我们在处理业务逻辑的位置 编写一个异常代码,如下:

可以看到只要没有签收成功,就可以让消息不断重发,直到我们解决了异常,正常签收消息为止。

Consumer Ack 小结

  • 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
  • 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
  • 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

消息可靠性总结

  • 持久化
    • exchange要持久化
    • queue要持久化
    • message要持久化
  • 生产方确认Confirm
  • 消费方确认Ack
  • Broker高可用
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 海洋的渔夫 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
13-RabbitMQ高级特性-消费端限流
当用户的请求突然增多,MQ可以配置消费端限流,让消息按照限制的数量进行消费,达到限流的效果。
Devops海洋的渔夫
2022/11/22
4470
13-RabbitMQ高级特性-消费端限流
15-RabbitMQ高级特性-死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
Devops海洋的渔夫
2023/02/10
1K0
15-RabbitMQ高级特性-死信队列
16-RabbitMQ高级特性-延迟队列
16-RabbitMQ高级特性-延迟队列 延迟队列 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。 需求: 下单后,30分钟未支付,取消订单,回滚库存。 新用户注册成功7天后,发送短信问候。 实现方式: 对于上面两种需求,一般有两种实现方式: 定时器 延迟队列 定时器:设置一个轮询时间,间隔一段时间对数据库进行扫描对比,当符合定时的数据则进行处理; 缺点: 不优雅,因为不管设置多少间隔时间,都会对数据库产生多次扫描的执行,影响性能; 而且间隔的时间范围对具体时间点存在一定的误差
Devops海洋的渔夫
2023/02/10
4560
16-RabbitMQ高级特性-延迟队列
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
Java廖志伟
2022/03/07
7660
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)
RabbitMQ高级特性:死信队列
死信队列,英文缩写DLX,Dead Letter Exchange(死信交换机),当消息成为Dead message(消息过期)后,可以被重新发送到另一个交换机,这个交换机就算是DLX,其实死信交换机(队列)和正常交换机(队列)没有什么区别
科技新语
2023/04/07
2.7K0
RabbitMQ实战代码
RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。
码客说
2019/10/21
4870
近九万字的RabbitMQ图文详解
RabbitMQ 简介:RabbitMQ 基于 AMQP 标准,采用 Erlang 语言开发的消息中间件。
Java廖志伟
2022/03/07
1.1K0
近九万字的RabbitMQ图文详解
java消息队列基础和RabbitMQ相关概念
在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式。
终有救赎
2023/10/16
2570
java消息队列基础和RabbitMQ相关概念
9-Spring 整合 RabbitMQ
创建一个空的 maven 工程 spring-rabbitmq-producer:
Devops海洋的渔夫
2022/11/22
3310
9-Spring 整合 RabbitMQ
【Rabbitmq篇】RabbitMQ⾼级特性----消息确认
前期讲了RabbitMQ的概念和应⽤,RabbitMQ实现了AMQP0-9-1规范的许多扩展,在RabbitMQ官⽹上,也给⼤家介绍了RabbitMQ的⼀些特性,我们挑⼀些重要的且常⽤的给⼤家讲⼀下
用户11369558
2024/11/20
5100
【Rabbitmq篇】RabbitMQ⾼级特性----消息确认
SpringACK对RabbitMQ消息的确认(消费)
之前已经简单介绍了基本是从发送方去确认的,我们需要在配置文件当中开启发送方确认模式,共育两种,一种是相对于交换机一个是相对于队列。
兰舟千帆
2022/08/11
6760
SpringACK对RabbitMQ消息的确认(消费)
11-RabbitMQ高级特性-消息可靠性投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
Devops海洋的渔夫
2022/11/22
3330
11-RabbitMQ高级特性-消息可靠性投递
RabbitMQ高级特性之消费端限流
假设我们现在有这么一个场景,我们的消费端由于某些原因导致全部宕机等不可用,导致RabbitMQ服务器队列中存储了大量消息未被消费掉,那么等恢复消费端服务器后,就会有巨大量的消息全部推送过来,但是我们单个客户端无法同事处理这么多消息,就是导致消费端一些不可预测错误,甚至又会重复发生宕机,所以在实际业务场景中,限流保护还是非常重要的。
黎明大大
2021/03/08
7800
springboot整合rabbitmq,动态创建queue和监听queue
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/146685.html原文链接:https://javaforall.cn
全栈程序员站长
2022/08/27
2.3K0
RabbitMQ使用教程(超详细)
下载地址:http://www.rabbitmq.com/download.html
全栈程序员站长
2022/07/21
2.6K0
RabbitMQ使用教程(超详细)
20-SpringBoot整合RabbitMQ
你已经是一个长大的IDEA了, 要学会自己新建工程, 然后IDEA自己创建了rabbitmq-consumer和rabbitmq-producer工程
彼岸舞
2022/10/06
3590
20-SpringBoot整合RabbitMQ
rabbitmq系统学习(二)
Rabbitmq高级整合应用 RabbitMq整合Spring AMQP实战 RabbitAdmin 使用RabbitTemplate的execute方法执行对应操作 rabbitAdmin.declareExchange()//声明 rabbitAdmin.declareQueue() rabbitAdmin.declareBinding() rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.topic.queue",false
老梁
2019/09/10
7750
SpringBoot整合RabbitMQ消息队列-学习笔记 原
本篇文章主要用于记录个人学习RabbitMQ的过程,记录下来方便日后查看,如有错误的地方,还望指正。
拓荒者
2019/09/09
9180
SpringBoot整合RabbitMQ消息队列-学习笔记
                                                                                                    原
14. Springboot集成RabbitMQ
消息队列(Message Queue,简称 MQ)是一种异步的消息传递中间件,它解耦了应用程序之间的通信。应用程序可以将消息发送到队列,而无需知道谁会接收这些消息。接收应用程序可以从队列中检索消息,而无需知道谁发送了这些消息。消息队列是一种重要的中间件,它可以帮助应用程序之间进行异步、可靠、可扩展的通信。常见的消息队列中间件有ActiveMQ,RabbitMQ,Kafka......今天我们就来介绍RabbitMQ。
有一只柴犬
2024/03/28
2160
14. Springboot集成RabbitMQ
消息中间件之Rabbitmq
1、https://www.kancloud.cn/yunxifd/rabbitmq/96997
爱撒谎的男孩
2019/12/31
9480
相关推荐
13-RabbitMQ高级特性-消费端限流
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验