首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >【RabbitMQ】详细使用:工作队列 && 发布/订阅模式 && 路由模式 && 通配符模式 && RPC模式 && 发布确认机制

【RabbitMQ】详细使用:工作队列 && 发布/订阅模式 && 路由模式 && 通配符模式 && RPC模式 && 发布确认机制

原创
作者头像
lirendada
发布2026-01-12 15:43:40
发布2026-01-12 15:43:40
150
举报
文章被收录于专栏:中间件中间件

一、Work Queue(工作队列模式)

简单模式的增强版,和简单模式的区别就是:简单模式只有一个消费者,而工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被一个消费者接收。

首先先把之前代码简化一下,把常量抽出来:

代码语言:javascript
复制
public class Constants {
    public static final String IP = "127.0.0.1";
    public static final int PORT = 5672;
    public static final String VIRTUALHOST = "lirendada";
    public static final String USERNAME = "liren";
    public static final String PASSWORD = "123456";

    public static final String WORK_QUEUE = "work.queue";
}

为了能看到多个消费者竞争的关系,这里一次发送10条消息。生产者代码如下所示:

代码语言:javascript
复制
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明一个队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // 4. 发送消息(当使用内置交换机的时候,routingKey必须和队列名称保持一致)
        for(int i = 0; i < 10; ++i) {
            String text = "hello workqueue " + i;
            channel.basicPublish("", Constants.WORK_QUEUE, null, text.getBytes(StandardCharsets.UTF_8));
        }

        // 5. 释放资源
        connection.close();
    }
}

消费者代码和简单模式一样,只是复制两份,两个消费者代码可以是一样的:

代码语言:javascript
复制
public class consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明一个队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        // 4. 接收消息,进行消费💥
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException
            {
                System.out.println("接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, defaultConsumer);
    }
}

二、Publish/Subscribe(发布/订阅模式)

RabbitMQ 交换机常见三种类型:fanoutdirecttopic,不同类型有着不同的路由策略。

  1. Fanout广播策略,将消息交给所有绑定到该交换机的队列(Publish/Subscribe 模式
  2. Direct定向策略,把消息交给符合指定 routing key 的队列(Routing 模式
  3. Topic通配符策略,把消息交给符合 routing pattern 的队列(Topics 模式

所以发布订阅模式使用了交换机的 Fanout 广播模式来完成!此时需要知道创建交换机,以及绑定交换机和队列的方法,如下所示:

  1. 创建交换机:
    代码语言:javascript
    复制
    channel.exchangeDeclare(String exchange,
                            BuiltinExchangeType type,
                            boolean durable,
                            boolean autoDelete,
                            boolean internal,
                            Map<String, Object> arguments) throws IOException;

    参数

    说明

    典型取值/建议

    exchange

    交换机名称

    如 "my.direct.exchange"。同名交换机若存在,属性必须一致

    type

    交换机类型

    DIRECT, FANOUT, TOPIC, HEADERS

    durable

    是否持久化

    true 表示 RabbitMQ 重启后交换机仍保留

    autoDelete

    是否自动删除

    当没有队列绑定且没有连接使用时自动删除

    internal

    是否内部交换机

    若为 true,客户端不能直接发布消息到该交换机,通常用于交换机间路由

    arguments

    扩展参数

    例如备用交换机、延迟特性等

  2. 绑定交换机和队列:
    代码语言:javascript
    复制
    channel.queueBind(String queue, 
                      String exchange, 
                      String routingKey) throws IOException;

    参数名

    作用

    说明

    queue

    队列名称

    要绑定的队列名(必须已经声明过 queueDeclare())

    exchange

    交换机名称

    要绑定到的交换机(必须已声明过 exchangeDeclare())

    routingKey

    路由键

    决定消息如何路由到该队列

下面是需要用到的常量:

代码语言:javascript
复制
// 发布订阅模式
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";

因为广播模式就是要推送消息给所有绑定当前交换机的队列,所以绑定队列和交换机的时候,只需要设置 routing key 为空字符串即可

生产者代码:

代码语言:javascript
复制
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明两个队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);

        // 4. 创建交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);

        // 5. 绑定交换机和队列(因为是广播模式,所以本质不需要routingkey,置为空字符串即可)
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");

        // 6. 发送消息
        String text = "hello public/subscribe && fanout mode!";
        channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, text.getBytes(StandardCharsets.UTF_8));

        // 7. 释放资源
        connection.close();
    }
}

消费者代码:(有两份,都是一样的,这里只放一份)

代码语言:javascript
复制
public class consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);

        // 4. 接收消息,进行消费💥
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException
            {
                System.out.println("接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, defaultConsumer);
    }
}

三、Routing(路由模式)

RabbitMQ 交换机常见三种类型:fanoutdirecttopic,不同类型有着不同的路由策略。

  1. Fanout广播策略,将消息交给所有绑定到该交换机的队列(Publish/Subscribe 模式
  2. Direct定向策略,把消息交给符合指定 routing key 的队列(Routing 模式
  3. Topic通配符策略,把消息交给符合 routing pattern 的队列(Topics 模式

路由模式采用的是 RabbitMQ 中的 Direct 定向策略,生产者发送消息的时候,交换机需要根据消息中的 Routing Key 将消息发送给指定的队列,而不是发给每一个队列了!

此时,队列和交换机的绑定,不能是任意的绑定了,而是要指定一个 Binding Key

只有队列绑定时的 Binding Key 和消息中的 Routing Key 完全一致,队列才会接收到消息

和发布订阅模式的区别是:交换机类型不同、绑定队列的 Binding Key 不同。

下面是需要用到的常量:

代码语言:javascript
复制
// 路由模式
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";

生产者代码:

代码语言:javascript
复制
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明两个队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

        // 4. 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);

        // 5. 绑定交换机和队列
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "orange");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "black");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "green");

        // 6. 发送消息
        String text1 = "hello routing, i am orange!";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "orange", null, text1.getBytes(StandardCharsets.UTF_8));

        String text2 = "hello routing, i am black!";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "black", null, text2.getBytes(StandardCharsets.UTF_8));

        String text3 = "hello routing, i am green!";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "green", null, text3.getBytes(StandardCharsets.UTF_8));

        // 7. 释放资源
        connection.close();
    }
}

消费者代码:(有两份,除了绑定队列不同外,基本都是一样的,这里只放一份)

代码语言:javascript
复制
public class consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);

        // 4. 接收消息,进行消费💥
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException
            {
                System.out.println("接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, defaultConsumer);
    }
}

四、Topics(通配符模式)

Topics 和 Routing 模式的区别是:

  1. 交换机类型不同:Topics 模式使用的交换机类型为 topic;Routing 模式使用的交换机类型为 direct
  2. 匹配规则不同:topic 类型的交换机在匹配规则上进行了扩展,Binding Key 支持通配符匹配;direct 类型的交换机路由规则是 Binding KeyRouting Key 完全匹配。

匹配规则有如下要求:

  1. Routing Key 是一系列由点 . 分隔的单词,比如 "stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"
  2. Binding KeyRouting Key 一样,也是点 . 分割的字符串
  3. Binding Key 中可以存在两种特殊字符串,用于模糊匹配
    1. *:表示一个单词
    2. #:表示多个单词(0-N个)

比如:

  • Binding Key 为 "d.a.b" 会同时路由到 Q1 和 Q2
  • Binding Key 为 "d.a.f" 会路由到 Q1
  • Binding Key 为 "c.e.f" 会路由到 Q2
  • Binding Key 为 "d.b.f" 会被丢弃,或者返回给生产者(需要设置 mandatory 参数)

下面是需要用到的常量:

代码语言:javascript
复制
// 通配符模式
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";

生产者代码如下所示:

代码语言:javascript
复制
public class producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        // 4. 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);

        // 5. 绑定交换机和队列
        //  队列1绑定error信息
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.error");
        //  队列2绑定error和info信息
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "#.info");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.error");

        // 6. 发送消息
        String text1 = "hello topic, i am order.error!";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "order.error", null, text1.getBytes(StandardCharsets.UTF_8));

        String text2 = "hello routing, i am order.pay.info!";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "order.pay.info", null, text2.getBytes(StandardCharsets.UTF_8));

        String text3 = "hello routing, i am pay.error!";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "pay.error", null, text3.getBytes(StandardCharsets.UTF_8));

        // 7. 释放资源
        connection.close();
    }
}

消费者代码:(有两份,除了绑定队列不同外,基本都是一样的,这里只放一份)

代码语言:javascript
复制
public class consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);

        // 2. 创建连接Connection和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. 声明队列(这是安全性措施,因为如果生产者还没创建队列的话,消费者这边直接读取会报错)
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);

        // 4. 接收消息,进行消费💥
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException
            {
                System.out.println("接收到消息: " + new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE1, true, defaultConsumer);
    }
}

五、RPC(RPC通信模式)

RPC(Remote Procedure Call,远程过程调用)是一种通过网络从远程计算机上请求服务,而不需要了解底层网络的技术,类似于Http远程调用。

RabbitMQ 实现 RPC 通信的过程,大概是通过两个队列实现一个可回调的过程。

编写客户端代码

  1. 发送请求到请求队列中(需要设置 replyTo 以及 correlationId
  2. 接收响应队列中的消息,判断 correlationId 是否一致,将一致的消息放到阻塞队列中,以便同步获取

    下面是需要用到的常量:

代码语言:javascript
复制
// rpc模式
public static String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static String RPC_RESPONSE_QUEUE = "rpc.response.queue";

客户端代码如下所示:

代码语言:javascript
复制
public class client {
    private final static BlockingQueue<String> bq = new ArrayBlockingQueue<>(1);

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. 创建连接工厂、连接Connection和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 声明队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

        // 3. 发送请求到请求队列中,需要设置属性(使用内置交换机时, routingKey要和队列名称一样, 才可以路由到对应的队列上去)
        String id = UUID.randomUUID().toString();
        String text = "hello rpc!";
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .replyTo(Constants.RPC_RESPONSE_QUEUE) // 设置回调队列
                .correlationId(id)                     // 唯一标志本次请求 
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, text.getBytes(StandardCharsets.UTF_8));

        // 4. 接收响应队列中的消息(需要放到阻塞队列中,保持接收时候的同步)
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String resp = new String(body);
                System.out.println("接收到响应队列中消息:" + resp);

                // 校验CorrelationId是否一致
                if(id.equals(properties.getCorrelationId())) {
                    bq.offer(resp);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);

        // 获取回调的结果(因为是从阻塞队列中拿,所以这里会阻塞)
        String result = bq.take();
        System.out.println(" [RPCClient] Result:" + result);

        // 释放资源
        connection.close();
    }
}

编写服务端代码

  1. 接收请求队列中的消息
  2. 根据消息内容进行响应处理,把应答结果返回到回调队列中

注意事项💥

  1. 设置同时最多只能获取一个消息
    1. 如果不设置 basicQos,RabbitMQ 会使用默认的 QoS 设置,其 prefetchCount 默认值为 0。当 prefetchCount0 时,RabbitMQ 会根据内部实现和当前的网络状况等因素,可能会同时发送多条消息给消费者。这意味着在默认情况下,消费者可能会同时接收到多条消息,但具体数量不是严格保证的,可能会有所波动。
    2. 在 RPC 模式下,通常期望的是一对一的消息处理,即一个请求对应一个响应。消费者在处理完一个消息并确认之后,才会接收到下一条消息。
      代码语言:javascript
      复制
      // 设置同时最多只能获取一个消息
      channel.basicQos(1);
  2. RabbitMQ消息确定机制
    1. 在 RabbitMQ 中,basicConsume 方法的 autoAck 参数用于指定消费者是否应该自动向消息队列确认消息:
      • 自动确认(autoAck=true):消息队列在将消息发送给消费者后,会立即从内存中删除该消息。这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费。
      • 手动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调用 basicAck 方法来确认消息。手动确认提供了更高的可靠性,确保消息不会被意外丢失,适用于消息处理重要且需要确保每个消息都被正确处理的场景。
代码语言:javascript
复制
public class server {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建连接工厂、连接Connection和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 2. 接收请求
        // 设置同时最多只能获取一个消息💥
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println("接收到请求:" + text);

                String resp = "[response] " + text; // 处理业务。这里简单拼接字符串即可

                // 3. 发送响应
                // 设置属性,将消息放到响应队列中
                AMQP.BasicProperties props = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", properties.getReplyTo(), props, resp.getBytes(StandardCharsets.UTF_8));

                // 因为设置了autoAck=false,所以需要手动ack确定一下💥
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

六、Publisher Confirms(发布确认机制)

作为消息中间件,都会面临消息丢失的问题。消息丢失大概分为三种情况:

  1. 生产者问题。因为应用程序故障,网络抖动等各种原因,生产者没有成功向 Broker 发送消息。
  2. 消息中间件自身问题。生产者成功发送给了 Broker,但是 Broker 没有把消息保存好,导致消息丢失。
  3. 消费者问题。Broker 发送消息到消费者,消费者在消费消息时,因为没有处理好,导致 Broker 将消费失败的消息从队列中删除了。

RabbitMQ 也对上述问题给出了相应的解决方案。

  • 针对问题1,采用 发布确认机制 解决
  • 针对问题2,采用 持久化机制 解决
  • 针对问题3,采用 消息应答机制 解决

前面一直使用的 basicPublish() 只是把消息写入到 TCP 缓冲区,并不代表消息真的到达了 RabbitMQ 服务器或被持久化。 在 Publisher Confirms 模式下,只有 waitForConfirms()waitForConfirmsOrDie() 收到确认后,消息才算真正安全投递成功。

方法

是否阻塞

粒度

性能

异常处理

场景

basicPublish()

不确认

无法检测失败

不关心可靠性时

waitForConfirms()

单条

返回 false 或超时

高安全但低吞吐

waitForConfirmsOrDie()

批量

抛异常

批量发送

addConfirmListener()

异步

回调处理

高吞吐系统

发布确认是 AMQP 0.9.1 协议的扩展,默认情况下它不会被启用。生产者通过 channel.confirmSelect() 将信道设置为 confirm 模式。

整体代码框架:

代码语言:javascript
复制
// 常量类:发布确认机制
public static String PUBLISH_CONFIRM_QUEUE1 = "publish.confirm.queue1";
public static String PUBLISH_CONFIRM_QUEUE2 = "publish.confirm.queue2";
public static String PUBLISH_CONFIRM_QUEUE3 = "publish.confirm.queue3";

public class PublisherConfirms {
    public static final Integer MESSAGE_SIZE = 10000; // 发送消息的数量

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // Strategy 1: Publishing Messages Individually
        PublishingMessagesIndividually();

        // Strategy 2: Publishing Messages in Batches
        PublishingMessagesInBatchesy();

        // Strategy 3: Handling Publisher Confirms Asynchronously
        HandlingPublisherConfirmsAsynchronously();
    }

    // 获取rabbitmq连接
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.IP);
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUALHOST);
        factory.setUsername(Constants.USERNAME);
        factory.setPassword(Constants.PASSWORD);
        Connection connection = factory.newConnection();
        return connection;
    }

    // 单独确认
    public static void PublishingMessagesIndividually() {}

    // 批量确认
    public static void PublishingMessagesInBatchesy() {}

    // 异步确认
    public static void HandlingPublisherConfirmsAsynchronously() {}
}

① Publishing Messages Individually(单条确认)

这种策略是每发送一条消息后就调用 channel.waitForConfirmsOrDie() 方法,之后等待服务端的确认,这实际上是一种串行同步等待的方式。尤其对于持久化的消息来说,需要等待消息确认存储在磁盘之后才会返回(调用Linux内核的fsync方法)

代码语言:javascript
复制
// 单独确认
public static void PublishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = getConnection()) {
        // 1. 获取通道
        Channel channel = connection.createChannel();

        // 2. 开启confirm模式
        channel.confirmSelect();

        // 3. 声明队列
        channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE1, true, false, true, null);

        // 4. 发送消息
        long start = System.currentTimeMillis();
        for(int i = 0; i < MESSAGE_SIZE; ++i) {
            String text = "hello publisher_confirms";
            channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE1, null, text.getBytes(StandardCharsets.UTF_8));

            // waitForConfirmsOrDie() 阻塞当前线程,直到 RabbitMQ 返回所有已发送消息的确认。
            // 如果超时过期, 则抛出TimeoutException。如果任何消息被nack(丢失), 则抛出IOException。
            channel.waitForConfirmsOrDie(5000);
        }
        long end = System.currentTimeMillis();
        System.out.println("PublishingMessagesIndividually花费了:" + (end - start) + "ms");
    }
}

② Publishing Messages in Batches(批量确认)

相比于单独确认策略,批量确认可以一次性发送多条消息,再批量进行消息确认,极大地提升效率!

缺点是出现 Basic.Nack 或者超时的情况,我们不清楚具体哪条消息出了问题,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量。当消息经常丢失时,批量确认的性能应该是不升反降的

代码语言:javascript
复制
// 批量确认
public static void PublishingMessagesInBatchesy() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = getConnection()) {
        // 1. 获取通道
        Channel channel = connection.createChannel();

        // 2. 开启confirm模式
        channel.confirmSelect();

        // 3. 声明队列
        channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE2, true, false, true, null);

        // 4. 发送消息
        int batchSize = 200;             // 一次确认的消息数量
        int outstandingMessageCount = 0; // 记录当前已经

        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_SIZE; ++i) {
            // basicPublish() 只是把消息写入到 TCP 缓冲区,并不代表消息真的到达了 RabbitMQ 服务器或被持久化。
            // 💥在 Publisher Confirms 模式下,只有 waitForConfirms() 或 waitForConfirmsOrDie() 收到确认后,消息才算真正安全投递成功。
            String text = "hello publisher_confirms";
            channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE2, null, text.getBytes(StandardCharsets.UTF_8));
            outstandingMessageCount++;

            // 批量确认消息
            if (outstandingMessageCount == batchSize) {
                channel.waitForConfirmsOrDie(5000);
                outstandingMessageCount = 0;
            }
        }
        // 消息发送完, 还有未确认的消息, 则进行确认
        if (outstandingMessageCount > 0) {
            channel.waitForConfirmsOrDie(5000);
        }
        long end = System.currentTimeMillis();
        System.out.println("PublishingMessagesInBatchesy花费了:" + (end - start) + "ms");
    }

③ Handling Publisher Confirms Asynchronously(异步确认)

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息 ack 会在将消息写入磁盘之后发出。

Broker 回传给生产者的确认消息中 deliveryTag 包含了确认消息的序号,此外 Broker 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理。

Channel 接口提供了一个方法 addConfirmListener,这个方法可以添加 ConfirmListener 接口,这个接口中包含两个方法,分别对应处理 RabbitMQ 发送给生产者的 ack 和 nack:

  • handleAck(long deliveryTag, boolean multiple)
  • handleNack(long deliveryTag, boolean multiple)
    • deliveryTag:表示发送消息的序号
    • multiple:表示是否为批量确认

此外,在编写代码的时候,需要为每一个 Channel 维护一个已发送消息的序号集合,当收到 RabbitMQ 的 confirm 回调时,从集合中删除对应确认的消息。当 Channel 开启 confirm 模式后,Channel 上发送消息都会附带一个从 1 开始递增的 deliveryTag 序号,我们可以使用 SortedSet 的有序性来维护这个已发消息的集合

  1. 当收到 ack 时,从序列中删除该消息的序号。如果为批量确认消息,表示小于等于当前序号 deliveryTag 的消息都收到了,则清除对应集合。
  2. 当收到 nack 时,处理逻辑类似,不过需要结合具体的业务情况,进行消息重发等操作。
代码语言:javascript
复制
// 异步确认
public static void HandlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = getConnection()) {
        // 1. 获取通道
        Channel channel = connection.createChannel();

        // 2. 开启confirm模式
        channel.confirmSelect();

        // 3. 声明队列
        channel.queueDeclare(Constants.PUBLISH_CONFIRM_QUEUE3, true, false, true, null);

        // 创建一个有序集合SortedSet,存放delivery序号
        SortedSet<Long> set = Collections.synchronizedSortedSet(new TreeSet<>());

        // 4. 添加回调接口
        channel.addConfirmListener(
            (deliveryTag, multiple) -> {
                if (multiple) {
                    // 批量确认:获取小于等于deliveryTag的序号集合,进行删除,表示这批序号的消息都已经被ack了
                    set.headSet(deliveryTag + 1).clear();
                } else {
                    // 单条确认:将当前的deliveryTag从集合中移除
                    set.remove(deliveryTag);
                }
            },
            (deliveryTag, multiple) -> {
                if (multiple) {
                    // 批量确认:获取小于等于deliveryTag的序号集合,进行删除,表示这批序号的消息都已经被ack了
                    set.headSet(deliveryTag + 1).clear();
                } else {
                    // 单条确认:将当前的deliveryTag从集合中移除
                    set.remove(deliveryTag);
                }
                // 如果处理失败, 这里需要添加处理消息重发的场景,此处代码省略
            }
        );

        // 5. 发送消息
        long start = System.currentTimeMillis();
        for(int i = 0; i < MESSAGE_SIZE; ++i) {
            String text = "hello publisher_confirms";

            // 获取下一次发送的序号,必须在basicPublish之前调用,否则会出现错位!💥
            long nextPublishSeqNo = channel.getNextPublishSeqNo();

            channel.basicPublish("", Constants.PUBLISH_CONFIRM_QUEUE3, null, text.getBytes(StandardCharsets.UTF_8));

            // 将序号存放到有序集合中
            set.add(nextPublishSeqNo);
        }

        // 确认消息都确认完毕
        while(!set.isEmpty()) {
            Thread.sleep(10);
        }
        long end = System.currentTimeMillis();
        System.out.println("PublishingMessagesInBatchesy花费了:" + (end - start) + "ms");
    }
}

运行结果如下所示:

代码语言:javascript
复制
PublishingMessagesIndividually花费了:2738ms
PublishingMessagesInBatchesy花费了:352ms
HandlingPublisherConfirmsAsynchronously花费了:192ms

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Work Queue(工作队列模式)
  • 二、Publish/Subscribe(发布/订阅模式)
  • 三、Routing(路由模式)
  • 四、Topics(通配符模式)
  • 五、RPC(RPC通信模式)
    • 编写客户端代码
    • 编写服务端代码
  • 六、Publisher Confirms(发布确认机制)
    • ① Publishing Messages Individually(单条确认)
    • ② Publishing Messages in Batches(批量确认)
    • ③ Handling Publisher Confirms Asynchronously(异步确认)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档