前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RabbitMQ面试必备知识点及实战 - Exchange交换机类型详解

RabbitMQ面试必备知识点及实战 - Exchange交换机类型详解

作者头像
JavaEdge
修改于 2024-09-10 07:37:05
修改于 2024-09-10 07:37:05
9410
举报

0 前言

Exchange:接收消息,并根据路由键转发消息所绑定的队列。交换机并非一个单独进程,而是一个有着“地址”的列表而已。

蓝区 - Send Message:把消息投递到交换机,由 RoutingKey 路由到指定队列。

交换机属性

声明交换机时可附带许多属性:

  • Name 交换机名称
  • Type 交换机类型,direct、topic、 fanout、 headers
  • Durability,是否需要持久化。 如果持久化,则RabbitMQ重启后,交换机还存在
  • Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
  • Internal 当前Exchange是否于RabbitMQ内部使用,默认为False

交换机类型

  1. Direct exchange(直连交换机)
  2. Fanout exchange(扇型交换机)
  3. Topic exchange(主题交换机)
  4. Headers exchange(头交换机)
  5. Dead Letter Exchange(死信交换机)

1 默认交换机

amq.* exchanges

1、一个队列对应了多个消费者,

2、默认,由队列对消息进行平均分配,消息会被分到不同的消费者手中。3、消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动

2 Direct Exchange

所有发送到DE的消息被转发到RouteKey中指定的Queue。

Direct模式可用RabbitMQ自带的Exchange:default Exchange,所以无需将Exchange进行任何绑定(binding),消息传递时,RouteKey须完全匹配才会被队列接收,否则该消息被丢弃。

Direct Exchange原理示意图
实战
代码语言:java
AI代码解释
复制
/**
 * 直连模式-生产者
 *
 * @author JavaEdge
 */
public class ProducerDirectExchange {
    public static void main(String[] args) throws Exception {
       //1 创建ConnectionFactory
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("localhost");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       
       //2 创建Connection
       Connection connection = connectionFactory.newConnection();

       //3 创建Channel
       Channel channel = connection.createChannel();

       //4 声明
       String exchangeName = "test_direct_exchange";
       // !!!!!!!!!!!!!!!!!!!!!!!!
       String routingKey = "test.direct";

       //5 发送
       String msg = "Hello JavaEdge RabbitMQ Direct Exchange Message ... ";
       channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}
代码语言:java
AI代码解释
复制
/**
 * 直连模式-消费者
 *
 * @author JavaEdge
 */
public class ConsumerDirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       // 自动重连(3s)
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        // name和Pro中的一致
       String exchangeName = "test_direct_exchange";
       String exchangeType = "direct";
       String queueName = "test_direct_queue";
       String routingKey = "test.direct";

       channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
       channel.queueDeclare(queueName, false, false, false, null);
       //建立绑定关系
       channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        while(true){  
            //获取消息,如果没有消息,该步将会阻塞
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("Get Message:" + msg);
        } 
    }
}

路由key保持一致,分别启动:

看交换机:

看绑定关系:

看队列名:

看队列数据源的交换机:

3 Topic exchange

直接交换的局限性:不能做基于多个标准的路由。

如日志系统,可能不仅要根据严重性订阅日志,还要根据日志源订阅日志。

syslog unix工具根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。

这更具灵活性 - 可能想监听来自 cron 的关键错误及来自 kern 的所有日志。为了在日志记录系统实现这点,还需了解主题交换机。

  • *可匹配一个单词
  • #可匹配零或多个单词
  • 所有发送到Topic Exchange的消息会被转发到所有关心RouteKey中指定Topic的Queue
  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需绑定一个Topic

案例

将发送所有描述动物的消息。消息将与包含三个单词(两个点)的routing key一起发送。

代码语言:bash
AI代码解释
复制
routing key中的第一个单词描述速度,第二颜色,第三是物种:“<speed><color><species>”。

创建三个绑定:

代码语言:bash
AI代码解释
复制
Q1绑定了绑定键“* .orange.*”,Q2绑定了“*.*.rabbit”和“lazy.#”

这些绑定可总结为:

  • Q1对所有橙色动物感兴趣
  • Q2希望听到关于兔子的一切,以及关于懒惰动物的一切

routing key设置为“quick.orange.rabbit”的消息将传递到两个队列。消息“lazy.orange.elephant”也将同时发送给他们。另一方面

  • “quick.orange.fox”只会转到第一个队列
  • 而“lazy.brown.fox”只会转到第二个队列
  • “lazy.pink.rabbit”将仅传递到第二个队列一次,即使它匹配两个绑定
  • “quick.brown.fox”与任何绑定都不匹配,因此它将被丢弃。

如果我们违背我们的约定并发送带有一个或四个单词的消息,例如“orange” or “quick.orange.male.rabbit”,会发生什么?好吧,这些消息将不会匹配任何绑定,因此将丢失.

另一方面,“lazy.orange.male.rabbit”,虽然它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

实例图

实战

代码语言:java
AI代码解释
复制
/**
 * @author JavaEdge
 */
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.javaedge";

        String msg = "Hello JavaEdge RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        channel.close();
        connection.close();
    }
}
代码语言:java
AI代码解释
复制
/**
 * 主题交换机-消费端
 *
 * @author JavaEdge
 */
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
//     String routingKey = "user.#";
        String routingKey = "user.*";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while (true) {
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Get Message:" + msg);
        }
    }
}

启动消费者:

启动生产者:

消费端收到了消息。

修改匹配格式,理论上只能接受前两个消息:

管控台,先将之前的匹配绑定取消:

显然仅能接受前两个消息:

小结

当队列绑定“#”(哈希)绑定key时,它将接收所有消息,而不管routing key,就像fanout交换机。

当特殊字符“*”(星号)和“#”(哈希)未在绑定中使用时,主题交换机的行为就像直接交换机。

4 Fanout Exchange

不处理路由键,只需简单的将队列绑定到交换机。发送到交换机的消息都会被转发到与该交换机绑定的所有队列。Fanout交换机转发消息是最快的:

实战

代码语言:java
AI代码解释
复制
/**
 * @author JavaEdge
 */
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
       String exchangeName = "test_fanout_exchange";
       String exchangeType = "fanout";
       String queueName = "test_fanout_queue";
        // 不设置路由键
       String routingKey = "";
       channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
       channel.queueDeclare(queueName, false, false, false, null);
       channel.queueBind(queueName, exchangeName, routingKey);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);
        while(true){
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());    
            System.out.println("Get Message:" + msg);
        } 
    }
}
代码语言:java
AI代码解释
复制
/**
 * @author JavaEdge
 */
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
       ConnectionFactory connectionFactory = new ConnectionFactory();
       connectionFactory.setHost("localhost");
       connectionFactory.setPort(5672);
       connectionFactory.setVirtualHost("/");
       Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel();
       String exchangeName = "test_fanout_exchange";
       for(int i = 0; i < 4; i ++) {
          String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
          channel.basicPublish(exchangeName, "", null , msg.getBytes());           
       }
       channel.close();  
        connection.close();  
    }
}

启动消费端

无需routing key

启动生产者后接收到的消息:

5 Header Exchange

根据消息头信息(headers)来路由消息,而非路由键(routing key)。要在消息头设置一些KV对,交换机会根据这些键值对来决定将消息路由到哪个队列。

代码语言:java
AI代码解释
复制
/**
 * @author JavaEdge
 */
public class Producer4HeadersExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_headers_exchange";

        for (int i = 0; i < 4; i++) {
            String msg = "Hello World RabbitMQ 4 HEADERS Exchange Message ...";

            // 设置消息的头部信息
            Map<String, Object> headers = new HashMap<>();
          	// 指定匹配规则
            headers.put("x-match", "any"); // any 表示只要有一个头部信息匹配即可,all 表示所有头部信息都要匹配。
            headers.put("name", "JavaEdge");
            headers.put("age", "30");
						 // 将头部信息作为 BasicProperties 的一部分传递
            channel.basicPublish(exchangeName, "", new com.rabbitmq.client.AMQP.BasicProperties.Builder()
                    .headers(headers)
                    .build(), msg.getBytes());
        }

        channel.close();
        connection.close();
    }
}
代码语言:java
AI代码解释
复制
/**
 * @author JavaEdge
 */
public class Consumer4HeadersExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_headers_exchange";
        String exchangeType = "headers";
        String queueName = "test_headers_queue";

        // 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);

        // 绑定队列到交换机,并设置头部匹配规则
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-match", "any");
        headers.put("name", "JavaEdge");
        headers.put("age", "30");

        channel.queueBind(queueName, exchangeName, "", headers);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("Get Message:" + msg);
        }
    }
}

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。 各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。 负责: 中央/分销预订系统性能优化 活动&券等营销中台建设 交易平台及数据中台等架构和开发设计 车联网核心平台-物联网连接平台、大数据平台架构设计及优化 LLM Agent应用开发 区块链应用开发 大数据开发挖掘经验 推荐系统项目 目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/11/13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
5. Rest 风格
传统方式一般是一个请求 url 对应一种操作,这样做不仅麻烦,也不安全,因为会程序的人读取了你的请求 url 地址,就大概知道该 url 实现的是一个什么样的操作。
捞月亮的小北
2023/12/01
3060
5. Rest 风格
RESTful风格
传统方式一般是一个请求url对应一种操作,这样做不仅麻烦,也不安全,因为会程序的人读取了你的请求url地址,就大概知道该url实现的是一个什么样的操作。
ma布
2024/11/21
1760
RESTful风格
SpringBoot 必知必会的19个常用注解
该注解是@Controller和@ResponseBody的结合体,将响应数据直接塞到响应体里面
AI码师
2022/09/19
2350
快速学习-SpringMVC常用注解
属性: value:请求参数中的名称。 required:请求参数中是否必须提供此参数。默认值:true。表示必须提供,如果不提供将报错。
cwl_java
2020/04/08
5770
Spring MVC框架学习(五) ---- 传递参数
   通过@ResponseBody注解的方式实现json格式传到页面的方法。首先查看源代码如下图,springmvc的默认编码是“ISO-8859-1”;
RAIN7
2022/08/23
1.5K0
Spring MVC框架学习(五) ---- 传递参数
什么是RESTful?相关的注解有哪些?
哈喽,大家好呀!这里是码农后端。RESTful在我们开发过程中可以说是再常见不过的了,但可能有一些新手小伙伴在刚开始学的时候会有一些疑惑,因为里面涉及到了较多的与操作相关的注解,因此,这里就来简单地总结一下,希望能有所帮助。
reload
2024/04/17
2800
什么是RESTful?相关的注解有哪些?
一文读懂SpringMVC中的数据绑定
Struts2 和 SpringMVC 都是 Web 开发中视图层的框架,两者都实现了数据的自动绑定,都不需要我们手动获取参数然后关联到对应的属性上,下面就谈谈两者的区别。
Wizey
2018/09/29
9700
从Feign使用注意点到RESUFUL接口设计规范
最近项目中大量使用了Spring Cloud Feign来对接http接口,踩了不少坑,也产生了一些对RESTFUL接口设计的想法,特此一篇记录下。 SpringMVC的请求参数绑定机制 了解Feign历史的朋友会知道,Feign本身是Netflix的产品,Spring Cloud Feign是在原生Feign的基础上进行了封装,引入了大量的SpringMVC注解支持,这一方面使得其更容易被广大的Spring使用者开箱即用,但也产生了不小的混淆作用。所以在使用Spring Cloud Feign之前,笔者先
kirito-moe
2018/04/27
2.7K0
从Feign使用注意点到RESUFUL接口设计规范
SpringBoot常用注解集合「建议收藏」
这里我们不会将springboot全部的注解都一个一个分析一遍,因为现在普遍都是前后端分离开发,所以之前用在很多的模板视图解析上的注解现在已经不怎么用到了这里就没再提。有需要的同学可以去看我的其他关于框架的专栏。
全栈程序员站长
2022/09/24
5430
SpringBoot常用注解集合「建议收藏」
SpringMVC之常用注解
表现层(Representation):把资源具体呈现出来的形式,叫做它的表现层 (Representation)。
yuanshuai
2022/08/22
4700
SpringMVC之常用注解
手把手讲解Spring中的Http请求神器RestTemplate
Java 中关于 Http 请求的工具实际上非常多,自带的 HttpUrlConnection,古老的 HttpClient,后起之秀 OkHttp 等,除了这些之外,还有一个好用的工具--RestTemplate,这是 Spring 中就开始提供的 Http 请求工具,不过很多小伙伴们可能是因为 Spring Cloud 才听说它。今天我们就来聊一聊这个 RestTemplate。
江南一点雨
2020/02/21
2.8K0
手把手讲解Spring中的Http请求神器RestTemplate
SpringBoot 学习总结
3. 不在重定向的url中传参, 给重定向传参是用 RedirectAttributes 作为 Controller 参数, 它有一个 addFlashAttribute 的方法, 使用这个方法传递参数
北漂的我
2019/05/28
9120
一篇文章带你掌握主流服务层框架——SpringMVC
在之前的文章中我们已经学习了Spring的基本内容,SpringMVC隶属于Spring的一部分内容
秋落雨微凉
2022/10/25
1.8K0
一篇文章带你掌握主流服务层框架——SpringMVC
RESTful风格的应用
@RestController:简化开发过程。不需要在方法上额外添加@ResponseBody
Breeze.
2022/09/23
3210
SpringMVC
今天开始学习我自己总结的 Java-学习路线 中的《SpringMVC》,小简从 0 开始学 Java 知识,并不定期更新所学笔记,期待一年后的蜕变吧!
小简
2023/01/04
3.4K0
SpringMVC
JAVAEE框架技术之5-springMVC参数绑定和异步交互
在之前我们讲的请求响应都是同步的,但是在实际开发中我们都是使用异步请求,所以下面我们使用ajax发送异步请求!
张哥编程
2024/12/13
1190
JAVAEE框架技术之5-springMVC参数绑定和异步交互
SpringMVC框架之第四篇
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
海仔
2019/10/22
7160
SpringMVC入门
初始化SpringMVC环境(同Spring环境),设定SpringMVC加载对应的bean
Cikian.
2023/08/09
2810
Spring 框架基础(06):Mvc架构模式简介,执行流程详解
MVC是一种软件设计典范,用一种业务逻辑、数据、界面显示分离的方法组织代码,将业务逻辑聚集到一个组件里面,在改进和个性化定制界面及用户交互的同时,不需要重新编写业务逻辑,MVC分层有助于管理和架构复杂的应用程序
知了一笑
2019/12/10
1.4K0
Spring 框架基础(06):Mvc架构模式简介,执行流程详解
Spring Web MVC 基础
在一个项目中,如果业务流程比较简单的时候,可以把控制器的功能交给视图,项目架构中只有视图和模型,没有控制器。
xiaozhangStu
2023/05/04
6120
相关推荐
5. Rest 风格
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档