

简单模式的增强版,和简单模式的区别就是:简单模式只有一个消费者,而工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收。
首先先把之前代码简化一下,把常量抽出来:
public class Constants {
public static final String IP = "127.0.0.1";
public static final int PORT = 5672;
public static final String VIRTUALHOST = "lirendada";
public static final String USERNAME = "liren";
public static final String PASSWORD = "123456";
public static final String WORK_QUEUE = "work.queue";
}为了能看到多个消费者竞争的关系,这里一次发送10条消息。生产者代码如下所示:
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明一个队列
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// 4. 发送消息(当使用内置交换机的时候,routingKey必须和队列名称保持一致)
for(int i = 0; i < 10; ++i) {
String text = "hello workqueue " + i;
channel.basicPublish("", Constants.WORK_QUEUE, null, text.getBytes(StandardCharsets.UTF_8));
}
// 5. 释放资源
connection.close();
}
}消费者代码和简单模式一样,只是复制两份,两个消费者代码可以是一样的:
public class consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明一个队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
// 4. 接收消息,进行消费💥
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, defaultConsumer);
}
}

RabbitMQ 交换机常见三种类型:fanout、direct、topic,不同类型有着不同的路由策略。
routing key 的队列(Routing 模式)routing pattern 的队列(Topics 模式)所以发布订阅模式使用了交换机的 Fanout 广播模式来完成!此时需要知道创建交换机,以及绑定交换机和队列的方法,如下所示:
channel.exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;参数 | 说明 | 典型取值/建议 |
|---|---|---|
exchange | 交换机名称 | 如 "my.direct.exchange"。同名交换机若存在,属性必须一致 |
type | 交换机类型 | DIRECT, FANOUT, TOPIC, HEADERS |
durable | 是否持久化 | true 表示 RabbitMQ 重启后交换机仍保留 |
autoDelete | 是否自动删除 | 当没有队列绑定且没有连接使用时自动删除 |
internal | 是否内部交换机 | 若为 true,客户端不能直接发布消息到该交换机,通常用于交换机间路由 |
arguments | 扩展参数 | 例如备用交换机、延迟特性等 |
channel.queueBind(String queue,
String exchange,
String routingKey) throws IOException;参数名 | 作用 | 说明 |
|---|---|---|
queue | 队列名称 | 要绑定的队列名(必须已经声明过 queueDeclare()) |
exchange | 交换机名称 | 要绑定到的交换机(必须已声明过 exchangeDeclare()) |
routingKey | 路由键 | 决定消息如何路由到该队列 |
下面是需要用到的常量:
// 发布订阅模式
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";因为广播模式就是要推送消息给所有绑定当前交换机的队列,所以绑定队列和交换机的时候,只需要设置 routing key 为空字符串即可。
生产者代码:
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明两个队列
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
// 4. 创建交换机
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
// 5. 绑定交换机和队列(因为是广播模式,所以本质不需要routingkey,置为空字符串即可)
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
// 6. 发送消息
String text = "hello public/subscribe && fanout mode!";
channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, text.getBytes(StandardCharsets.UTF_8));
// 7. 释放资源
connection.close();
}
}消费者代码:(有两份,都是一样的,这里只放一份)
public class consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
// 4. 接收消息,进行消费💥
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.FANOUT_QUEUE1, true, defaultConsumer);
}
}
RabbitMQ 交换机常见三种类型:fanout、direct、topic,不同类型有着不同的路由策略。
routing key 的队列(Routing 模式)routing pattern 的队列(Topics 模式)路由模式采用的是 RabbitMQ 中的 Direct 定向策略,生产者发送消息的时候,交换机需要根据消息中的 Routing Key 将消息发送给指定的队列,而不是发给每一个队列了!
此时,队列和交换机的绑定,不能是任意的绑定了,而是要指定一个 Binding Key。
只有队列绑定时的 Binding Key 和消息中的 Routing Key 完全一致,队列才会接收到消息。

和发布订阅模式的区别是:交换机类型不同、绑定队列的 Binding Key 不同。
下面是需要用到的常量:
// 路由模式
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";生产者代码:
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明两个队列
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
// 4. 声明交换机
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
// 5. 绑定交换机和队列
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "orange");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "black");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "green");
// 6. 发送消息
String text1 = "hello routing, i am orange!";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "orange", null, text1.getBytes(StandardCharsets.UTF_8));
String text2 = "hello routing, i am black!";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "black", null, text2.getBytes(StandardCharsets.UTF_8));
String text3 = "hello routing, i am green!";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "green", null, text3.getBytes(StandardCharsets.UTF_8));
// 7. 释放资源
connection.close();
}
}
消费者代码:(有两份,除了绑定队列不同外,基本都是一样的,这里只放一份)
public class consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
// 4. 接收消息,进行消费💥
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE1, true, defaultConsumer);
}
}

Topics 和 Routing 模式的区别是:
topic;Routing 模式使用的交换机类型为 direct。topic 类型的交换机在匹配规则上进行了扩展,Binding Key 支持通配符匹配;direct 类型的交换机路由规则是 Binding Key 和 Routing Key 完全匹配。
匹配规则有如下要求:
Routing Key 是一系列由点 . 分隔的单词,比如 "stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"Binding Key 和 Routing Key 一样,也是点 . 分割的字符串Binding Key 中可以存在两种特殊字符串,用于模糊匹配*:表示一个单词#:表示多个单词(0-N个)比如:
下面是需要用到的常量:
// 通配符模式
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";生产者代码如下所示:
public class producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
// 4. 声明交换机
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
// 5. 绑定交换机和队列
// 队列1绑定error信息
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.error");
// 队列2绑定error和info信息
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "#.info");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.error");
// 6. 发送消息
String text1 = "hello topic, i am order.error!";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "order.error", null, text1.getBytes(StandardCharsets.UTF_8));
String text2 = "hello routing, i am order.pay.info!";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "order.pay.info", null, text2.getBytes(StandardCharsets.UTF_8));
String text3 = "hello routing, i am pay.error!";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "pay.error", null, text3.getBytes(StandardCharsets.UTF_8));
// 7. 释放资源
connection.close();
}
}
消费者代码:(有两份,除了绑定队列不同外,基本都是一样的,这里只放一份)
public class consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
// 2. 创建连接Connection和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3. 声明队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
// 4. 接收消息,进行消费💥
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1, true, defaultConsumer);
}
}

RPC(Remote Procedure Call,远程过程调用)是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术,类似于Http远程调用。
RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程。

replyTo 以及 correlationId)correlationId 是否一致,将一致的消息放到阻塞队列中,以便同步获取
下面是需要用到的常量:
// rpc模式
public static String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static String RPC_RESPONSE_QUEUE = "rpc.response.queue";客户端代码如下所示:
public class client {
private final static BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1. 创建连接工厂、连接Connection和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 2. 声明队列
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
// 3. 发送请求到请求队列中,需要设置属性(使用内置交换机时, routingKey要和队列名称一样, 才可以路由到对应的队列上去)
String id = UUID.randomUUID().toString();
String text = "hello rpc!";
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.replyTo(Constants.RPC_RESPONSE_QUEUE) // 设置回调队列
.correlationId(id) // 唯一标志本次请求
.build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, text.getBytes(StandardCharsets.UTF_8));
// 4. 接收响应队列中的消息(需要放到阻塞队列中,保持接收时候的同步)
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String resp = new String(body);
System.out.println("接收到响应队列中消息:" + resp);
// 校验CorrelationId是否一致
if(id.equals(properties.getCorrelationId())) {
bq.offer(resp);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
// 获取回调的结果(因为是从阻塞队列中拿,所以这里会阻塞)
String result = bq.take();
System.out.println(" [RPCClient] Result:" + result);
// 释放资源
connection.close();
}
}注意事项💥
basicQos,RabbitMQ 会使用默认的 QoS 设置,其 prefetchCount 默认值为 0。当 prefetchCount 为 0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者。这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动。// 设置同时最多只能获取一个消息
channel.basicQos(1);basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息队列确认消息:basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会被意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景。public class server {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂、连接Connection和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 2. 接收请求
// 设置同时最多只能获取一个消息💥
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String text = new String(body);
System.out.println("接收到请求:" + text);
String resp = "[response] " + text; // 处理业务。这里简单拼接字符串即可
// 3. 发送响应
// 设置属性,将消息放到响应队列中
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", properties.getReplyTo(), props, resp.getBytes(StandardCharsets.UTF_8));
// 因为设置了autoAck=false,所以需要手动ack确定一下💥
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
}
}
作为消息中间件,都会面临消息丢失的问题。消息丢失大概分为三种情况:

RabbitMQ 也对上述问题给出了相应的解决方案。
前面一直使用的 basicPublish() 只是把消息写入到 TCP 缓冲区,并不代表消息真的到达了 RabbitMQ 服务器或被持久化。 在 Publisher Confirms 模式下,只有 waitForConfirms() 或 waitForConfirmsOrDie() 收到确认后,消息才算真正安全投递成功。
方法 | 是否阻塞 | 粒度 | 性能 | 异常处理 | 场景 |
|---|---|---|---|---|---|
basicPublish() | 否 | 不确认 | 高 | 无法检测失败 | 不关心可靠性时 |
waitForConfirms() | 是 | 单条 | 低 | 返回 false 或超时 | 高安全但低吞吐 |
waitForConfirmsOrDie() | 是 | 批量 | 中 | 抛异常 | 批量发送 |
addConfirmListener() | 否 | 异步 | 高 | 回调处理 | 高吞吐系统 |
发布确认是 AMQP 0.9.1 协议的扩展,默认情况下它不会被启用。生产者通过 channel.confirmSelect() 将信道设置为 confirm 模式。
整体代码框架:
// 常量类:发布确认机制
public static String PUBLISH_CONFIRM_QUEUE1 = "publish.confirm.queue1";
public static String PUBLISH_CONFIRM_QUEUE2 = "publish.confirm.queue2";
public static String PUBLISH_CONFIRM_QUEUE3 = "publish.confirm.queue3";
public class PublisherConfirms {
public static final Integer MESSAGE_SIZE = 10000; // 发送消息的数量
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// Strategy 1: Publishing Messages Individually
PublishingMessagesIndividually();
// Strategy 2: Publishing Messages in Batches
PublishingMessagesInBatchesy();
// Strategy 3: Handling Publisher Confirms Asynchronously
HandlingPublisherConfirmsAsynchronously();
}
// 获取rabbitmq连接
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.IP);
factory.setPort(Constants.PORT);
factory.setVirtualHost(Constants.VIRTUALHOST);
factory.setUsername(Constants.USERNAME);
factory.setPassword(Constants.PASSWORD);
Connection connection = factory.newConnection();
return connection;
}
// 单独确认
public static void PublishingMessagesIndividually() {}
// 批量确认
public static void PublishingMessagesInBatchesy() {}
// 异步确认
public static void HandlingPublisherConfirmsAsynchronously() {}
}这种策略是每发送一条消息后就调用 channel.waitForConfirmsOrDie() 方法,之后等待服务端的确认,这实际上是一种串行同步等待的方式。尤其对于持久化的消息来说,需要等待消息确认存储在磁盘之后才会返回(调用Linux内核的fsync方法)
// 单独确认
public static void PublishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = getConnection()) {
// 1. 获取通道
Channel channel = connection.createChannel();
// 2. 开启confirm模式
channel.confirmSelect();
// 3. 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, true, null);
// 4. 发送消息
long start = System.currentTimeMillis();
for(int i = 0; i < MESSAGE_SIZE; ++i) {
String text = "hello publisher_confirms";
channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, text.getBytes(StandardCharsets.UTF_8));
// waitForConfirmsOrDie() 阻塞当前线程,直到 RabbitMQ 返回所有已发送消息的确认。
// 如果超时过期, 则抛出TimeoutException。如果任何消息被nack(丢失), 则抛出IOException。
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("PublishingMessagesIndividually花费了:" + (end - start) + "ms");
}
}相比于单独确认策略,批量确认可以一次性发送多条消息,再批量进行消息确认,极大地提升效率!
缺点是出现 Basic.Nack 或者超时的情况,我们不清楚具体哪条消息出了问题,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量。当消息经常丢失时,批量确认的性能应该是不升反降的。
// 批量确认
public static void PublishingMessagesInBatchesy() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = getConnection()) {
// 1. 获取通道
Channel channel = connection.createChannel();
// 2. 开启confirm模式
channel.confirmSelect();
// 3. 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, true, null);
// 4. 发送消息
int batchSize = 200; // 一次确认的消息数量
int outstandingMessageCount = 0; // 记录当前已经
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_SIZE; ++i) {
// basicPublish() 只是把消息写入到 TCP 缓冲区,并不代表消息真的到达了 RabbitMQ 服务器或被持久化。
// 💥在 Publisher Confirms 模式下,只有 waitForConfirms() 或 waitForConfirmsOrDie() 收到确认后,消息才算真正安全投递成功。
String text = "hello publisher_confirms";
channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, text.getBytes(StandardCharsets.UTF_8));
outstandingMessageCount++;
// 批量确认消息
if (outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
}
// 消息发送完, 还有未确认的消息, 则进行确认
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.println("PublishingMessagesInBatchesy花费了:" + (end - start) + "ms");
}生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息 ack 会在将消息写入磁盘之后发出。
Broker 回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号,此外 Broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理。

Channel 接口提供了一个方法 addConfirmListener,这个方法可以添加 ConfirmListener 接口,这个接口中包含两个方法,分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack:
handleAck(long deliveryTag, boolean multiple)handleNack(long deliveryTag, boolean multiple)deliveryTag:表示发送消息的序号multiple:表示是否为批量确认此外,在编写代码的时候,需要为每一个 Channel 维护一个已发送消息的序号集合,当收到 RabbitMQ 的 confirm 回调时,从集合中删除对应确认的消息。当 Channel 开启 confirm 模式后,Channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号,我们可以使用 SortedSet 的有序性来维护这个已发消息的集合。
// 异步确认
public static void HandlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
try (Connection connection = getConnection()) {
// 1. 获取通道
Channel channel = connection.createChannel();
// 2. 开启confirm模式
channel.confirmSelect();
// 3. 声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, true, null);
// 创建一个有序集合SortedSet,存放delivery序号
SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<>());
// 4. 添加回调接口
channel.addConfirmListener(
(deliveryTag, multiple) -> {
if (multiple) {
// 批量确认:获取小于等于deliveryTag的序号集合,进行删除,表示这批序号的消息都已经被ack了
set.headSet(deliveryTag + 1).clear();
} else {
// 单条确认:将当前的deliveryTag从集合中移除
set.remove(deliveryTag);
}
},
(deliveryTag, multiple) -> {
if (multiple) {
// 批量确认:获取小于等于deliveryTag的序号集合,进行删除,表示这批序号的消息都已经被ack了
set.headSet(deliveryTag + 1).clear();
} else {
// 单条确认:将当前的deliveryTag从集合中移除
set.remove(deliveryTag);
}
// 如果处理失败, 这里需要添加处理消息重发的场景,此处代码省略
}
);
// 5. 发送消息
long start = System.currentTimeMillis();
for(int i = 0; i < MESSAGE_SIZE; ++i) {
String text = "hello publisher_confirms";
// 获取下一次发送的序号,必须在basicPublish之前调用,否则会出现错位!💥
long nextPublishSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, text.getBytes(StandardCharsets.UTF_8));
// 将序号存放到有序集合中
set.add(nextPublishSeqNo);
}
// 确认消息都确认完毕
while(!set.isEmpty()) {
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.println("PublishingMessagesInBatchesy花费了:" + (end - start) + "ms");
}
}运行结果如下所示:
PublishingMessagesIndividually花费了:2738ms
PublishingMessagesInBatchesy花费了:352ms
HandlingPublisherConfirmsAsynchronously花费了:192ms原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。