大家好,我是工藤学编程 🦉 | 一个正在努力学习的小博主,期待你的关注 |
|---|---|
实战代码系列最新文章😉 | C++实现图书管理系统(Qt C++ GUI界面版) |
SpringBoot实战系列🐷 | 【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案 |
分库分表 | 分库分表之实战-sharding-JDBC分库分表执行流程原理剖析 |
消息队列 | 深入浅出 RabbitMQ-简单队列实战 |
前情摘要:
1、深入浅出 RabbitMQ-核心概念介绍与容器化部署 2、深入浅出 RabbitMQ-简单队列实战
【亲测宝藏】发现一个让 AI 学习秒变轻松的神站!不用啃高数、不用怕编程,高中生都能看懂的人工智能教程来啦!
👉点击跳转,和 thousands of 小伙伴一起用快乐学习法征服 AI,说不定下一个开发出爆款 AI 程序的就是你!
在实际业务中,当消息生产速度超过消费速度时(比如秒杀场景的订单消息、日志采集的大量数据),单消费者可能会导致消息堆积。工作队列(Work Queue) 通过引入多个消费者共同处理同一队列的消息,实现消息的分布式消费,解决"生产快、消费慢"的问题。
工作队列的核心特点:
RabbitMQ默认的消息分配方式:将消息依次轮流发送给每个消费者,不考虑消费者的处理速度。例如10条消息会被均匀分给2个消费者(各处理5条)。
发送10条消息到队列work_mq_rr,模拟高频率生产场景:
public class Send {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
// 1. 配置连接工厂(同简单队列,确保虚拟主机一致)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.229.128");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev"); // 生产者与消费者需使用同一虚拟主机
factory.setPort(5672);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 2. 声明队列(非持久化,非独占,不自动删除)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 3. 循环发送10条消息
for (int i = 0; i < 10; i++) {
String message = "Hello World! " + i;
// 发送消息(使用默认交换机,路由键=队列名)
channel.basicPublish("", QUEUE_NAME, null,
message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] 发送消息: '" + message + "'");
}
}
}
}两个消费者监听同一队列,模拟不同处理速度(此处为简化,暂用相同休眠时间,实际可调整差异):
// 消费者1
public class Recv1 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.229.128");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev"); // 注意:与生产者保持一致(原代码误写为/xdclass1,已修正)
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Recv1 等待消息...(按CTRL+C退出)");
// 消息处理回调(模拟处理耗时:3秒)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
TimeUnit.SECONDS.sleep(3); // 模拟业务处理时间
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Recv1 接收: '" + message + "'");
// 手动确认消息(必须,否则消息会一直存在队列中)
// 参数2:false表示仅确认当前消息,true表示确认所有小于当前tag的消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 关闭自动确认(autoAck=false),开启手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}// 消费者2(代码与Recv1一致,仅打印标识不同)
public class Recv2 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] argv) throws Exception {
// 连接配置与Recv1相同
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.229.128");
factory.setUsername("admin");
factory.setPassword("password");
factory.setVirtualHost("/dev");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Recv2 等待消息...(按CTRL+C退出)");
// 消息处理回调(同样模拟3秒耗时)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Recv2 接收: '" + message + "'");
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}


实际结果


当消费者处理能力不均时(比如Recv1处理需1秒,Recv2需5秒),轮训策略会导致:
为解决轮训的负载不均问题,公平策略让消费者处理完一条消息后,再接收下一条,实现"能者多劳"。核心配置:通过channel.basicQos(1)设置"预取数"为1,告诉RabbitMQ:“在我处理完当前消息并确认前,不要给我发新消息”。
在消费者的channel.queueDeclare之后,添加basicQos配置:
// 消费者1(Recv1)添加公平策略配置
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 关键:设置预取数为1,开启公平策略
channel.basicQos(1); // 每次只接收1条未确认的消息
// 后续的消息处理逻辑不变...// 消费者2(Recv2)同样添加:
channel.basicQos(1);修改Recv1的处理耗时为1秒(快消费者),Recv2仍为3秒(慢消费者):
// Recv1的处理耗时改为1秒
TimeUnit.SECONDS.sleep(1); 

维度 | 轮训策略(Round Robin) | 公平策略(Fair Dispatch) |
|---|---|---|
核心逻辑 | 依次轮流分配,不考虑处理速度 | 处理完一条再分配下一条,按能力分配 |
配置要点 | 默认生效,无需额外配置 | 需设置channel.basicQos(1) |
适用场景 | 所有消费者处理速度相近的场景 | 消费者处理速度差异较大的场景 |
消息确认 | 必须开启手动确认(autoAck=false) | 必须开启手动确认(autoAck=false) |
basicQos,必须关闭自动确认(autoAck=false),并在处理完后调用basicAck确认,否则消息会一直堆积。basicQos(n)的n可根据实际场景调整(如n=5表示允许预取5条未确认消息),避免网络频繁交互。basicQos)channel.basicQos(1)工作队列通过多消费者分布式消费,解决了"生产快于消费"的问题,而策略的选择直接影响效率:
basicQos(1)实现负载均衡,适合消费者能力差异大的场景。实际项目中,需根据消费者处理能力、消息紧急程度等因素选择策略,必要时结合消息优先级、持久化等特性,进一步优化消息处理链路。
觉得有用请点赞收藏! 如果有相关问题,欢迎评论区留言讨论~