前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >04-RabbitMQ常用的六种模型以及在SpringBoot中的应用

04-RabbitMQ常用的六种模型以及在SpringBoot中的应用

作者头像
qubianzhong
发布2019-08-14 14:21:37
1K0
发布2019-08-14 14:21:37
举报
文章被收录于专栏:行者常至

在RabbitMQ中,我们常用的模型主要有六种,分别是:

  • Hello World
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • RPC

俗话说得好,光说不练假把式,下面我们结合springBoot逐一实现这六种模型。

Hello World

从上图可以看出,这是一个默认交换机的单播路由,并且每个队列只有一个消费者。

Work queues

从上图可以看出,主要的部分是:默认交换机的单播路由,并且每个队列有多个消费者。

Publish/Subscribe

从上图可以看出,主要的部分是:扇形交换机的多播路由。

Routing

从上图可以看出,主要的部分是:直连交换机的多播路由。

Topic

从上图可以看出,主要的部分是:主题交换机的多播路由。

RPC

从上图可以看出,主要的部分是:默认交换机的单播路由。

环境

  • 下面我们代码演示一下除了RPC之外的其他五种模型,在SpringBoot中的用法

pom.xml

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.qbz</groupId>
    <artifactId>rabbit-mq-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbit-mq-test</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

代码语言:javascript
复制
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /qbz-test #虚拟主机,必须存在,不然会报错。
    username: guest
    password: guest
    publisher-confirms: true #消息发送到交换机确认机制,是否确认回调,默认false
    publisher-returns: true #消息发送到交换机确认机制,是否返回回调,默认false
    listener:
      simple:
        prefetch: 1 #预先载入数量,默认值250
        concurrency: 10 #指定最小消费数量
        max-concurrency: 20 #指定最大的消费者数量,当并行消费者数量到达concurrency后会开至最大max-concurrency
        acknowledge-mode: manual # 采用手动应答,设置为手动应答后,消费者如果不进行手动应答,会处于假死状态,不能再消费。默认auto
        retry:
          enabled: true # 失败重试机制,默认为false.
          max-attempts: 1 # 失败后,再重试几次,默认为:3

RabbitMqTestApplication.java

代码语言:javascript
复制
package cn.qbz.rabbitmqtest;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * RabbitMQ 测试
 *
 * @author qubianzhong
 * @Date 13:42 2019/7/26
 */
@SpringBootApplication
@EnableRabbit //新增开启Rabbit注解
public class RabbitMqTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqTestApplication.class, args);
    }

}

RabbitMqProduceController.java

代码语言:javascript
复制
package cn.qbz.rabbitmqtest.controller;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author qubianzhong
 * @date 2019/7/22 13:51
 */
@RestController
public class RabbitMqProduceController {

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private AmqpAdmin amqpAdmin;

    @GetMapping(value = "/produce")
    public String produceMsg(@RequestParam(value = "msg") String msg) {
        /****************************hello/work start****************************
         * hello-world、work queues 均是只绑定队列
         * 发布消息时,如下所示,推荐使用第二种
         */
        //1.如果队列不存在,则消息会进入“黑洞”
        amqpTemplate.convertAndSend("hello", msg + System.currentTimeMillis());
        /**
         * 2.如果队列不存在,则进行创建,如果队列已存在,则使用此队列
         * public Queue(String name) {
         *      //The queue is durable, non-exclusive and non auto-delete.
         * 		this(name, true, false, false);
         *  }
         */
        Queue queue = new Queue("hello");
        amqpAdmin.declareQueue(queue);
        amqpTemplate.convertAndSend(queue.getName(), msg + System.currentTimeMillis());

        //3.work
        queue = new Queue("work");
        amqpAdmin.declareQueue(queue);
        for (int i = 0; i < 30; i++) {
            amqpTemplate.convertAndSend(queue.getName(), "wwoork" + i);
        }
        /****************************hello/work end****************************/

        /****************************Publish/Subscribe start****************************
         * 生产者只向扇形交换机发送消息,扇形交换机负责向绑定其队列上的所有消费者进行分发。
         * public AbstractExchange(String name) {
         *      //Construct a new durable, non-auto-delete Exchange with the provided name.
         * 		this(name, true, false);
         * }
         */
        for (int i = 0; i < 5; i++) {
            Exchange pubSubExchange = new FanoutExchange("pub-sub-exchange");
            amqpAdmin.declareExchange(pubSubExchange);
            amqpTemplate.convertAndSend(pubSubExchange.getName(), null, msg + System.currentTimeMillis());
        }
        /****************************Publish/Subscribe end****************************/

        /****************************Routing start****************************
         * Routing  消费者消费的时候,多个路由键绑定一个队列
         */
        Exchange routeExchange = new DirectExchange("routing-exchange");
        amqpAdmin.declareExchange(routeExchange);
        amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-error", "routing-log-error:" + System.currentTimeMillis());
        amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-info", "routing-log-info:" + System.currentTimeMillis());
        amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-waring", "routing-log-waring:" + System.currentTimeMillis());

        /****************************Routing end****************************/

        /****************************Topic start****************************
         * Topic  消费者消费的时候,多个路由键,模糊匹配,绑定一个队列,其实和routing差不多
         */
        Exchange topicExchange = new TopicExchange("topic-exchange");
        amqpAdmin.declareExchange(topicExchange);
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-error.20190706", "topic-log-error.20190706:" + System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-info.20190606", "topic-log-info.20190606:" + System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-error.20190708", "topic-log-error.20190708:" + System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-waring.20190506", "topic-log-waring.20190506:" + System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-waring.20190516", "topic-log-waring.20190516:" + System.currentTimeMillis());
        amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-info.20190723", "topic-log-info.20190723:" + System.currentTimeMillis());

        /****************************Topic end****************************/

        return "success";
    }
}

RabbitTestListener.java

代码语言:javascript
复制
package cn.qbz.rabbitmqtest.rabbit;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author qubianzhong
 * @date 2019/7/26 14:40
 */
@Component
public class RabbitTestListener {
    /****************************demo 演示如何消费消息 start****************************
     * 推荐使用第二种或第三种
     * 当不需要绑定交换机的时候,如果使用第三种,exchange置为空会报错
     */

    /**
     * 1.此种用法需要手动创建队列,不然会报错
     */
    @RabbitListener(queues = "demo-1")
    public void demo1(Message message, Channel channel) throws IOException {
        System.err.println("demo-1:" + new String(message.getBody()));
        //信道上发布的消息都会被指派一个唯一的ID号:message.getMessageProperties().getDeliveryTag()
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

        //1:接收到的被指派一个唯一的ID号:deliveryTag;
        // 2:true当前consumer拒绝所有的deliveryTag包括此次的,false当前consumer只拒绝此次的这个deliveryTag;
        // 3:true此消息重新排队,而不是被丢弃或者扔进死信队列中
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
    }

    /**
     * 2.如果队列不存在,则会新建;如果队列已存在,则使用此队列
     */
    @RabbitListener(queuesToDeclare = @Queue("demo-2"))
    public void demo2(Message message, Channel channel) throws IOException {
        System.err.println("demo-2:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }


    /**
     * 3.绑定队列、交换机
     * 如果队列或交换机不存在,则新建;如果已存在,则使用。
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("demo-3"),
            exchange = @Exchange("demo-exchange-3")
    ))
    public void demo3(Message message, Channel channel) throws IOException {
        System.err.println("demo-3:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    /****************************demo 演示如何消费消息 start****************************/

    /****************************hello start****************************
     * 消费 hello-world
     */
    @RabbitListener(queuesToDeclare = {@Queue("hello")})
    public void hello(Message message, Channel channel) throws IOException {
        System.err.println("hello:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************hello end****************************/

    /****************************work start****************************
     * 消费 work 一个队列对应多个消费者,此时,消息是平均分配到每个消费者手里的。
     */
    @RabbitListener(queuesToDeclare = {@Queue("work")})
    public void work1(Message message, Channel channel) throws IOException {
        System.err.println("work1:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(queuesToDeclare = {@Queue("work")})
    public void work2(Message message, Channel channel) throws InterruptedException, IOException {
        Thread.sleep(1230);
        System.err.println("work2:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(queuesToDeclare = {@Queue("work")})
    public void work3(Message message, Channel channel) throws InterruptedException, IOException {
        Thread.sleep(30);
        System.err.println("work3:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************work end****************************/


    /****************************Publish/Subscribe start****************************
     * 消费 Publish/Subscribe 扇形交换机负责向绑定其队列上的所有消费者进行分发。
     * 此处,exchange和queue如果不存在,则会新建。
     * 注意:@Exchange注解属性 type需要定义为 fanout,不然会是默认的 direct
     */
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue("sub-pub-queue1"),
            exchange = @Exchange(value = "pub-sub-exchange", type = ExchangeTypes.FANOUT)
    )})
    public void pubSub1(Message message, Channel channel) throws IOException {
        System.err.println("sub-pub-queue1:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue("sub-pub-queue2"),
            exchange = @Exchange(value = "pub-sub-exchange", type = ExchangeTypes.FANOUT)
    )})
    public void pubSub2(Message message, Channel channel) throws IOException {
        System.err.println("sub-pub-queue2:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************Publish/Subscribe end****************************/


    /****************************Routing start****************************
     * 消费 Routing
     * 其中一个队列,只消费error信息,一个队列消费所有信息
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue("routing-log-error"), key = {"routing-log-error"}, exchange = @Exchange(value = "routing-exchange"))})
    public void routing1(Message message, Channel channel) throws IOException {
        System.err.println("routing-log-error:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue("routing-log-all"), key = {"routing-log-error", "routing-log-info", "routing-log-waring"}, exchange = @Exchange(value = "routing-exchange")
    )})
    public void routing2(Message message, Channel channel) throws IOException {
        System.err.println("routing-log-all:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************Routing end****************************/


    /****************************Topic start****************************
     * 消费 Topic
     * 其中一个队列,消费前缀为'topic-log-info.'的信息,
     * 一个队列消费前缀为'topic-log-error.'的和模糊匹配'.201907.'的信息
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-log-1"), key = {"topic-log-info.*"}, exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC))})
    public void topic1(Message message, Channel channel) throws IOException {
        System.err.println("所有的info:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-log-2"), key = {"*.201907.*", "topic-log-error.*"}, exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC))})
    public void topic2(Message message, Channel channel) throws IOException {
        System.err.println("所有的error and 7月份:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
    /****************************Routing end****************************/

}

RabbitTemplateCallback.java

代码语言:javascript
复制
package cn.qbz.rabbitmqtest.rabbit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author qubianzhong
 * @date 2019/7/31 10:56
 */
@Component
public class RabbitTemplateCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this::confirm);
        this.rabbitTemplate.setReturnCallback(this::returnedMessage);
    }

    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.err.println("消息未能在交换机上得到确认!异常处理......");
        }
    }

    /**
     * Returned message callback.
     *
     * @param message    the returned message.
     * @param replyCode  the reply code.
     * @param replyText  the reply text.
     * @param exchange   the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.err.println("消息未能到达队列!" + "return exchange: " + exchange + ", routingKey: "
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
    }
}

RPC 补充

我们并不推荐RPC式的mq调用,这么做完全没有发挥mq异步削峰的作用。如果有使用RPC的需求,请移步SpringCloud或者Dubbo。

我们虽然不使用RabbitMQ来进行RPC调用,但是我们也要了解,RabbitMQ为啥子可以实现RPC。

当使用RabbitMQ来实现RPC时.你只是简单地发布消息而已。RabbitMQ会负责使用绑定来路由消息到达合适的队列。RPC服务器会从这些队列上消费消息。RabbitMQ替你完成了所有这些艰难的工作:将消息路由到合适的地方,通过多台RPC服务器对RPC消息进行负载均衡,甚至当处理消息的服务器崩溃时,将RPC消息重发到另一台。

问题在于。如何将应答返回给客户端呢?毕竟,到目前为止你体验的RabbitMQ是发后即忘模型。

RabbitMQ团队想出了一个优雅的解决方案:使用消息来发回应答。在每个AMQP消息头里有个字段叫作reply_ to,消息的生产者可以通过该字段来确定队列名称,并监听队列等待应答。然后接收消息的RPC服务器能够检杳reply _to字段,并创建包含应答内容的新的消息,并以队列名称作为路由键。

你也许想:“光是每次创建唯一队列名就得花很多工夫吧。我们怎样阻止其他客户端读到应答消息呢?”

我们前面说过,如果你声明了没有名字的队列,RabbitMQ会为你指定一个。这个名字恰好是唯一的队列名;同时在声明的时候指定exclusive参数.确保只有你可以读取队列上的消息。所有RPC客户端需要做的是声明临时的、排他的、匿名队列,并将该队列名称包含到RPC消息的reply _to头中。于是服务器端就知道应答消息该发往哪儿了。值得注意的是我们并没有提到将应答队列绑定到交换器上。这是因为当RPC服务器将应答消息发布到RabbitMQ而没有指定交换器时.RabbitMQ就知道目的地是应答队列,路由键就是队列的名称。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019年08月05日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Hello World
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • RPC
  • 环境
  • RPC 补充
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档