前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ系列(五)Topic模型

RabbitMQ系列(五)Topic模型

作者头像
Jensen_97
发布2023-07-20 14:53:17
1640
发布2023-07-20 14:53:17
举报
文章被收录于专栏:技术客栈

Topic 模型

上一篇文章《RabbitMQ系列(四)通信模型之路由模型》中,简单的介绍了一下RabbitMQ的路由模型。这篇文章来学习一下RabbitMQ中的topic模型,Topic 模型是 RabbitMQ 的高级模型之一,Topic 模型使用了通配符的概念,可以匹配更灵活的路由规则。topic模式相当于是对路由模式的一个升级,topic模式主要就是在匹配的规则上可以实现模糊匹配。

在 Topic 模型中,生产者将消息发送到交换机,交换机根据消息的 routing key 将消息转发到对应的队列中。与 Direct 模型不同的是,Topic 模型中 routing key 支持通配符匹配,其中 '*' 可以匹配一个单词,'#' 可以匹配多个单词。例如,"order.*" 可以匹配 "order.create","order.delete" 等消息,而 "order.#" 可以匹配 "order.create.one","order.delete.two" 等消息。

适用场景

Topic 模型适用于需要灵活的消息路由规则的场景,例如:

  1. 新闻网站订阅分类消息;
  2. 电商网站订阅商品分类消息;
  3. 金融机构订阅股票市场消息等。

演示

  1. 生产者
代码语言:javascript
复制
// 生产者
public class Producer {
    private static final String EXCHANGE_NAME = "exchange_topic_1";
    private static final String EXCHANGE_ROUTING_KEY1 = "topic.km";
    private static final String EXCHANGE_ROUTING_KEY2 = "topic.km.001";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        for (int i = 0; i < 100; i++) {
            // topic在路由模型的基础上,只有路由的key发生改变,其余的都不变
            if (i % 2 == 0) {
                channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型发送的第 " + i + " 条信息").getBytes());
            } else {
                channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型发送的第 " + i + " 条信息").getBytes());
            }
        }
        channel.close();
        connection.close();
    }
}
  1. 消费者
代码语言:javascript
复制
// 消费者1
public class Consumer1 {
    private static final String QUEUE_NAME = "queue_topic_1";
    private static final String EXCHANGE_NAME = "exchange_topic_1";
    private static final String EXCHANGE_ROUTING_KEY = "topic.*";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
代码语言:javascript
复制
// 消费者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_topic_2";
    private static final String EXCHANGE_NAME = "exchange_topic_1";
    private static final String EXCHANGE_ROUTING_KEY = "topic.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
  1. 测试

先启动2个消费者,再启动生产者

消费者1订阅的是 "order.*" 的消息,消费者2订阅的是 "order.#" 的消息,可以得到以下结果:

消费者1接收到的消息是:"Topic 模型发送的偶数条消息"

消费者2接收到的消息是:"Topic 模型发送的全部消息"

小结

本文介绍了 RabbitMQ 通信模型中的 Topic 模型的使用,通过交换机和 routing key 实现更灵活的消息路由。在实际使用过程中,需要注意以下几点:

  1. 路由键的格式应该是多个单词组成,用 '.' 分隔;
  2. '#' 匹配多个单词,'*' 匹配一个单词;
  3. 一个队列可以绑定多个 routing key;
  4. 如果交换机没有匹配到任何一个队列,则会抛弃该消息。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Topic 模型
  • 适用场景
  • 演示
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档