前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【消息队列】RabbitMQ如何保障消息可靠性投递

【消息队列】RabbitMQ如何保障消息可靠性投递

作者头像
千羽
发布2024-06-05 18:32:54
1140
发布2024-06-05 18:32:54
举报
文章被收录于专栏:程序员千羽程序员千羽

hello,大家好,我是千羽。

消息队列的八股文最喜欢问这个。消息可靠性投递,这个对于消息队列非常重要,很多时候,我们不是人为的去干预,但是仍然出现其他意外的事情。

导致消息在传递过程中可能会面临丢失、重复、损坏等问题,这就要求我们必须重视消息的可靠性投递。

什么是消息可靠性投递?

消息可靠性投递的目标是确保消息能够从生产者(Producer)可靠地传递到消费者(Consumer),并且在传递过程中不丢失、不重复、不损坏。实现这一目标需要采取多种技术手段和策略。

下面列举三个栗子:

故障情况1

故障情况1:消息没有发送到消息队列上。导致消费者拿不到消息,业务功能缺失,数据错误

  • 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
  • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机

故障情况2

故障情况2:消息成功存入消息队列,但是消息队列服务器宕机了。原本保存在内存中的消息也丢失了。即使服务器重新启动,消息也找不回来了

  • 导致消费者拿不到消息,业务功能缺失,数据错误
  • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失

故障情况3

故障情况3:消息成功存入消息队列,但是消费端出现问题,例如:宕机、抛异常等等

  • 导致业务功能缺失,数据错误

解决思路

  1. 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
  2. 消费端消费消息失败,给服务器端返回NACK信息。
  3. 同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)

故障情况1:消息没有发送到消息队列上

1. 生产者代码demo演示

1.1 配置POM

代码语言:javascript
复制
 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.nateshao</groupId>
    <artifactId>code7_confirm_consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>code7_confirm_consumer</name>
    <description>code7_confirm_consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

1.2 主启动类

没有特殊设定:

代码语言:javascript
复制
package com.nateshao.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * @Author 千羽
 * @公众号 程序员千羽
 * @Date 2024/5/29 16:00
 * @Version 1.0
 */
@SpringBootApplication
public class RabbitMQConsumerMainType {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMQConsumerMainType.class, args);
    }
}

1.3 YAML

注意publisher-confirm-typepublisher-returns是两个必须要增加的配置,如果没有则本节功能不生效

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    publisher-confirm-type: CORRELATED # 交换机的确认
    publisher-returns: true # 队列的确认
logging:
  level:
    com.nateshao.producer.config.MQProducerAckConfig: info

2. 创建配置类

在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:

方法名

方法功能

所属接口

接口所属类

confirm()

确认消息是否发送到交换机

ConfirmCallback

RabbitTemplate

returnedMessage()

确认消息是否发送到队列

ReturnsCallback

RabbitTemplate

然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。

原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:

设置组件调用的方法

所需对象类型

setConfirmCallback()

ConfirmCallback接口类型

setReturnCallback()

ReturnCallback接口类型

2.1 API说明

①ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

代码语言:javascript
复制
 /**
  * A callback for publisher confirmations.
  *
  */
 @FunctionalInterface
 public interface ConfirmCallback {

  /**
   * Confirmation callback.
   * @param correlationData correlation data for the callback.
   * @param ack true for ack, false for nack
   * @param cause An optional cause, for nack, when available, otherwise null.
   */
  void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

 }

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机
  • ack参数值为false:表示消息没有发送到交换机

②ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

代码语言:javascript
复制
 /**
  * A callback for returned messages.
  *
  * @since 2.3
  */
 @FunctionalInterface
 public interface ReturnsCallback {

  /**
   * Returned message callback.
   * @param returned the returned message and metadata.
   */
  void returnedMessage(ReturnedMessage returned);

 }

注意:接口中的returnedMessage()方法在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名

类型

含义

message

org.springframework.amqp.core.Message

消息以及消息相关数据

replyCode

int

应答码,类似于HTTP响应状态码

replyText

String

应答码说明

exchange

String

交换机名称

routingKey

String

路由键名称

3. 配置类代码

3.1 要点1

加@Component注解,加入IOC容器

3.2 要点2

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。

操作封装到了一个专门的void init()方法中。

为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。

关于@PostConstruct注解大家可以参照以下说明:

@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。 使用@PostConstruct注解的方法必须满足以下条件:

  1. 方法不能有任何参数。
  2. 方法必须是非静态的。
  3. 方法不能返回任何值。

当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。

3.3 生产者中的代码

有了以上说明,下面我们就可以展示配置类的整体代码:

代码语言:javascript
复制
package com.nateshao.producer.mq.config;

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

/**
 * @Author 千羽
 * @公众号 程序员千羽
 * @Date 2024/5/29 16:00
 * @Version 1.0
 */
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // 消息发送到交换机成功或失败时调用这个方法
        log.info("confirm() 回调函数打印 CorrelationData:" + correlationData);
        log.info("confirm() 回调函数打印 ack:" + ack);
        log.info("confirm() 回调函数打印 cause:" + cause);
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        // 发送到队列失败时才调用这个方法
        log.info("returnedMessage() 回调函数 消息主体: " + new String(returned.getMessage().getBody()));
        log.info("returnedMessage() 回调函数 应答码: " + returned.getReplyCode());
        log.info("returnedMessage() 回调函数 描述:" + returned.getReplyText());
        log.info("returnedMessage() 回调函数 消息使用的交换器 exchange : " + returned.getExchange());
        log.info("returnedMessage() 回调函数 消息使用的路由键 routing : " + returned.getRoutingKey());
    }
}

4.消费者代码演示

application.yml

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    port: 15672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 把消息确认模式改为手动确认
        prefetch: 1 # 每次从队列中取回消息的数量

监听类:MyMessageListener.class

代码语言:javascript
复制
package com.nateshao.producer.mq.config;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @Author 千羽
 * @公众号 程序员千羽
 * @Date 2024/6/1 14:18
 * @Version 1.0
 */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送到交换机成功!数据:" + correlationData);
        } else {
            log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息主体: " + new String(returned.getMessage().getBody()));
        log.info("应答码: " + returned.getReplyCode());
        log.info("描述:" + returned.getReplyText());
        log.info("消息使用的交换器 exchange : " + returned.getExchange());
        log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
    }
}

5.验证效果

5.1 验证交换机错误

代码语言:javascript
复制
@Test
public void testExchangeDirectErrorSendMessage() {
    rabbitTemplate.convertAndSend(EXCHANGE_DIRECT + "000", ROUTING_KEY, "Message Test Confirm~~~ ~~~");
}

控制台输出,提示:没有找到该交换机no exchange 'exchange.direct.order~'

代码语言:javascript
复制
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange.direct.order000' in vhost '/', class-id=60, method-id=40)
confirm() 回调函数打印 CorrelationData:null
confirm() 回调函数打印 ack:false
confirm() 回调函数打印 cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange.direct.order000' in vhost '/', class-id=60, method-id=40)
  1. 2验证路由错误
代码语言:javascript
复制
/**
 * 验证路由地址写错
 */
@Test
public void testRoutingErrorMessage() {
    rabbitTemplate.convertAndSend(EXCHANGE_DIRECT, ROUTING_KEY + "routing ~", "Message Test Confirm~~~ ~~~");
}

控制台打印:发送交换机成功ack:true,但是路由失败NO_ROUTE

代码语言:javascript
复制
returnedMessage() 回调函数 消息主体: Message Test Confirm~~~ ~~~
returnedMessage() 回调函数 应答码: 312
confirm() 回调函数打印 CorrelationData:null
confirm() 回调函数打印 ack:true
confirm() 回调函数打印 cause:null
returnedMessage() 回调函数 描述:NO_ROUTE
returnedMessage() 回调函数 消息使用的交换器 exchange : exchange.direct.order
returnedMessage() 回调函数 消息使用的路由键 routing : queue.orderrouting ~

6. 完整代码

代码语言:javascript
复制
@SpringBootTest(classes = RabbitMQProducerMainType.class)
public class RabbitMQTest {  
  
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;
  
    @Test
    public void testSendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT + "~", ROUTING_KEY , "Hello 千羽");
    }
  
}

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确
  • 交换机正确、路由键不正确,无法发送到队列
  • 交换机不正确,无法发送到交换机

故障情况2:备份交换机

故障情况2:消息成功存入消息队列,但是消息队列服务器宕机了。原本保存在内存中的消息也丢失了。即使服务器重新启动,消息也找不回来了

  • 导致消费者拿不到消息,业务功能缺失,数据错误
  • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失

1. 创建备份交换机

1.1 创建备份交换机

注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键

1.2 创建备份交换机要绑定的队列

①创建队列

②绑定交换机

注意:这里是要和备份交换机绑定

1.3 针对备份队列创建消费端监听器

代码语言:javascript
复制
    public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
    public static final String QUEUE_NAME_BACKUP  = "queue.order.backup";

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),
            exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),
            key = {""}
    ))
    public void processMessageBackup(String dateString,
                                     Message message,
                                     Channel channel) {
        log.info("BackUp: " + dateString);
    }

2. 设定备份关系

2.1 原交换机删除

·

2.2 重新创建原交换机

2.3 原交换机重新绑定原队列

3.测试

  • 启动消费者端
  • 发送消息,但是路由键不对,于是转入备份交换机

实际上,这种备份交换机还是比较少的,一般常用的是采取ack的确认机制

故障情况3:交换机和队列持久化

rabbitmq默认是持久化的,这一点可以看一下底层的源码

通过点击监听器@RabbitListener,找到Queue[] queuesToDeclare() default {};

看到String durable() default "";看注释可以看到,默认情况下,如果你提供队列名称的话,默认就是持久化的。

1. 测试非持久化交换机和队列

1.1 创建非持久化交换机

创建之后,可以在列表中看到:

1.2 创建非持久化队列

创建之后,可以在列表中看到:

1.3 绑定

1.4 发送消息

代码语言:javascript
复制
    public static final String EXCHANGE_TRANSIENT = "exchange.transient.user";
    public static final String ROUTING_KEY_TRANSIENT = "user";

    @Test
    public void testSendMessageTransient() {
        rabbitTemplate.convertAndSend(EXCHANGE_TRANSIENT,ROUTING_KEY_TRANSIENT,"Hello 千羽 user~~~");
    }

1.5 查看已发送消息

结论:临时性的交换机和队列也能够接收消息,但如果RabbitMQ服务器重启之后会怎么样呢?

1.6 重启RabbitMQ服务器

代码语言:javascript
复制
docker restart rabbitmq

重启之后,刚才临时性的交换机和队列都没了。在交换机和队列这二者中,队列是消息存储的容器,队列没了,消息就也跟着没了。

2. 持久化的交换机和队列

我们其实不必专门创建持久化的交换机和队列,因为它们默认就是持久化的。接下来我们只需要确认一下:存放到队列中,尚未被消费端取走的消息,是否会随着RabbitMQ服务器重启而丢失?

2.1 发送消息

运行以前的发送消息方法即可,不过要关掉消费端程序

2.2 在管理界面查看消息

2.3 重启RabbitMQ服务器

代码语言:javascript
复制
docker restart rabbitmq

2.4 再次查看消息

仍然还在

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 千羽的编程时光 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 故障情况1:消息没有发送到消息队列上
    • 1. 生产者代码demo演示
      • 1.1 配置POM
        • 1.2 主启动类
          • 1.3 YAML
            • 2. 创建配置类
              • 2.1 API说明
                • ①ConfirmCallback接口
                • ②ReturnCallback接口
              • 3. 配置类代码
                • 3.1 要点1
                • 3.2 要点2
                • 3.3 生产者中的代码
              • 4.消费者代码演示
                • 5.验证效果
                  • 6. 完整代码
                  • 故障情况2:备份交换机
                  • 1. 创建备份交换机
                    • 1.1 创建备份交换机
                      • 1.2 创建备份交换机要绑定的队列
                        • ①创建队列
                        • ②绑定交换机
                      • 1.3 针对备份队列创建消费端监听器
                      • 2. 设定备份关系
                        • 2.1 原交换机删除
                          • 2.2 重新创建原交换机
                            • 2.3 原交换机重新绑定原队列
                              • 3.测试
                              • 故障情况3:交换机和队列持久化
                              • 1. 测试非持久化交换机和队列
                                • 1.1 创建非持久化交换机
                                  • 1.2 创建非持久化队列
                                    • 1.3 绑定
                                      • 1.4 发送消息
                                        • 1.5 查看已发送消息
                                          • 1.6 重启RabbitMQ服务器
                                          • 2. 持久化的交换机和队列
                                            • 2.1 发送消息
                                              • 2.2 在管理界面查看消息
                                                • 2.3 重启RabbitMQ服务器
                                                  • 2.4 再次查看消息
                                                  相关产品与服务
                                                  消息队列
                                                  腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
                                                  领券
                                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档