首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【RabbitMQ】高级特性—事务、消息分发详解

【RabbitMQ】高级特性—事务、消息分发详解

作者头像
椰椰椰耶
发布2025-08-16 09:50:21
发布2025-08-16 09:50:21
13700
代码可运行
举报
文章被收录于专栏:学习学习
运行总次数:0
代码可运行

事务

RabbitMQ 是基于 AMQP 协议实现的,该协议实现了事务机制,因此 RabbitMQ 也支持事务机制。Spring AMQP 也提供了对事务相关的操作,RabbitMQ 事务允许开发者确保消息的发送和接收是原子性的,要么全部成功,要么全部失败

1. 配置事务管理器

代码语言:javascript
代码运行次数:0
运行
复制
@Configuration  
public class TransactionConfig {  
  
    @Bean  
    public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {  
        return new RabbitTransactionManager(connectionFactory);  
    }  
  
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {  
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  
        rabbitTemplate.setChannelTransacted(true);  
        return rabbitTemplate;  
    }  
}

2. 声明队列

代码语言:javascript
代码运行次数:0
运行
复制
@Bean("transQueue")  
public Queue transQueue() {  
    return QueueBuilder.durable("trans_queue").build();  
}

3. 生产者

代码语言:javascript
代码运行次数:0
运行
复制
@RestController  
@RequestMapping("/trans")  
public class TransactionProducer {  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    @Transactional  
    @RequestMapping("/send")  
    public String send() {  
        rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");  
        int a = 50 / 0;  
        rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");  
        return "发送成功";  
    }  
}

4. 测试

  1. 不加 @Transactional,会发现消息 1 发送成功
  2. 添加 @Transactional,消息 1 和消息 2 全部发送失败

消息分发

概念

RabbitMQ 队列拥有多个消费者时,队列会把收到的消息分派给不同的消费者。每条消息只会发送给订阅列表的一个消费者。这种方式非常适合扩展,如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可

默认情况下 RabbitMQ 是以轮询的方法进行分发的,而不管消费者是否已经消费并已经确认了消息。这种方式是不太合理的,试想一下,如果某些消费者消费速度慢,而某些消费者速度很快,就可能会导致某些消费者消息积压,某些消费者空闲,进而应用整体的吞吐量下降


如何处理呢?

  • 我们可以使用前面说到的 channel.basicQos(int prefetchCount) 方法,来限制当前信道上的消费者所能保持的最大未确认消息的数量

比如:消费端调用了 channelbasicQos(5)RabbitMQ 会为该消费者计数,发送一条消息消息计数 +1,消费一条消息计数 -1,当达到了设定的上限,RabbitMQ 就不会再向它发送消息了,直到消费者确认了某条消息

  • 类似 TCP/IP 中的滑动窗口

应用场景

消息分发的常见场景有如下:

  1. 限流
  2. 非公平分发
限流

如下使用场景: 订单系统每秒最多处理 5000 请求,正常情况下,订单系统可以正常满足需求,但是在秒杀时间点,请求瞬间增多,每秒 1万 个请求,如果这些请求全部通过 MQ 发送到订单系统,无疑会把订单系统压垮

image.png
image.png
  • RabbitMQ 提供了限流机制,可以控制消费端一次只拉取 N 个请求
  • 通过设置 prefetchCount 参数,同时也必须要设置消息应答方式为手动应答
  • prefetchCount:控制消费者从队列中预取(prefetch)消息的数量,以此来实现流量控制和负载均衡
代码示例:
  1. 配置 prefetch 参数,设置应答方式为手动应答
代码语言:javascript
代码运行次数:0
运行
复制
listener:  
  simple:  
    acknowledge-mode: manual  # 消息接收确认   
    prefetch: 5
  1. 声明队列和交换机
代码语言:javascript
代码运行次数:0
运行
复制
// 消息分发——限流  
public static final String QOS_EXCHANGE_NAME = "qos_exchange";  
public static final String QOS_QUEUE = "qos_queue";
  1. 配置交换机,队列
代码语言:javascript
代码运行次数:0
运行
复制
@Configuration  
public class QosConfig {  
    // 1. 交换机  
    @Bean("qosExchange")  
    public Exchange qosExchange() {  
        return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE_NAME).durable(true).build();  
    }  
  
    // 2. 队列  
    @Bean("qosQueue")  
    public Queue qosQueue() {  
        return QueueBuilder.durable(Constant.QOS_QUEUE).build();  
    }  
  
    // 3. 队列和交换机绑定  
    @Bean("qosBinding")  
    public Binding qosBinding(@Qualifier("qosExchange") Exchange exchange, @Qualifier("qosQueue") Queue queue) {  
        return BindingBuilder.bind(queue).to(exchange).with("qos").noargs();  
    }  
}
  1. 发送消息,一次发送 20 条消息
代码语言:javascript
代码运行次数:0
运行
复制
@RequestMapping("/qos")  
public String qos() {  
    // 发送消息  
    for (int i = 0; i < 20; i++) {  
        rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE_NAME,"qos", "qos test..." + i);  
    }  
    return "发送成功";  
}
  1. 消费者监听
代码语言:javascript
代码运行次数:0
运行
复制
@Component  
public class QosQueueListener {  
    // 指定监听队列的名称  
    @RabbitListener(queues = Constant.QOS_QUEUE)  
    public void ListenerQueue(Message message, Channel channel) throws Exception{  
        long deliveryTag = message.getMessageProperties().getDeliveryTag();  
        System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);  
        // 3. 手动签收  
        //channel.basicAck(deliveryTag, true);  
    }  
}
  1. 测试

调用接口,发送消息:

  • 发送消息时,需要先把手动确认注释掉,不然会直接消费掉
  • 可以看到,控制台只打印了 5 条信息
image.png|359
image.png|359

我们观察管理平台

image.png
image.png
  • 可以看到,ready,也就是待发送 15 条,未确认的 5 条(因为代码未手动 ack

把配置里面的 prefetch: 5 注掉,然后在观察运行结果。

  • 从日志和控制台上可以看到:消费者会一次性把 20 条消息全部收到
image.png
image.png

管理平台:

image.png
image.png
负载均衡

我们也可以用此配置,来实现“负载均衡”

如下图所示,在有两个消费者的情况下,一个消费者处理任务非常快,另一个非常慢,就会造成一个消费老会一直很忙,而另一个消费者很闲

  • 这是因为 RabbitMQ 只是在消息进入队列时分派消息,他不考虑消费者未确认消息的数量
image.png
image.png

我们可以使用设置 prefetch=1 的方式,告诉 RabbitMQ 一次只给一个消费者一条信息,也就是说,在处理并确认前一条消息之前,不要向该消费者发送新消息。相反,它会将它分派给下一个不忙的消费者

代码示例
  1. 配置 prefetch 参数,设置应答方式为手动应答
代码语言:javascript
代码运行次数:0
运行
复制
listener:  
  simple:  
    acknowledge-mode: manual  # 消息接收确认   
    prefetch: 1
  1. 启动两个消费者
    • 使用 Thread.sleep(100) 来模拟消费满
代码语言:javascript
代码运行次数:0
运行
复制
@Component  
public class QosQueueListener {  
    // 指定监听队列的名称  
    @RabbitListener(queues = Constant.QOS_QUEUE)  
    public void ListenerQueue(Message message, Channel channel) throws Exception{  
        long deliveryTag = message.getMessageProperties().getDeliveryTag();  
        System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);  
        // 3. 手动签收  
        channel.basicAck(deliveryTag, true);  
    }  
  
    // 指定监听队列的名称  
    @RabbitListener(queues = Constant.QOS_QUEUE)  
    public void ListenerQueue2(Message message, Channel channel) throws Exception {  
        long deliveryTag = message.getMessageProperties().getDeliveryTag();  
        System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), deliveryTag);  
        // 模拟处理流程慢  
        Thread.sleep(100);  
        // 手动签收  
        channel.basicAck(deliveryTag, true);  
    }  
}
  1. 测试

调用接口,发送消息

  • 通过日志观察两个消费者消费的消息
image.png|334
image.png|334
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-08-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 事务
    • 1. 配置事务管理器
    • 2. 声明队列
    • 3. 生产者
    • 4. 测试
  • 消息分发
    • 概念
    • 应用场景
      • 限流
      • 负载均衡
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档