在RabbitMQ中,我们常用的模型主要有六种,分别是:
俗话说得好,光说不练假把式,下面我们结合springBoot逐一实现这六种模型。
从上图可以看出,这是一个默认交换机的单播路由,并且每个队列只有一个消费者。
从上图可以看出,主要的部分是:默认交换机的单播路由,并且每个队列有多个消费者。
从上图可以看出,主要的部分是:扇形交换机的多播路由。
从上图可以看出,主要的部分是:直连交换机的多播路由。
从上图可以看出,主要的部分是:主题交换机的多播路由。
从上图可以看出,主要的部分是:默认交换机的单播路由。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.qbz</groupId>
<artifactId>rabbit-mq-test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbit-mq-test</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /qbz-test #虚拟主机,必须存在,不然会报错。
username: guest
password: guest
publisher-confirms: true #消息发送到交换机确认机制,是否确认回调,默认false
publisher-returns: true #消息发送到交换机确认机制,是否返回回调,默认false
listener:
simple:
prefetch: 1 #预先载入数量,默认值250
concurrency: 10 #指定最小消费数量
max-concurrency: 20 #指定最大的消费者数量,当并行消费者数量到达concurrency后会开至最大max-concurrency
acknowledge-mode: manual # 采用手动应答,设置为手动应答后,消费者如果不进行手动应答,会处于假死状态,不能再消费。默认auto
retry:
enabled: true # 失败重试机制,默认为false.
max-attempts: 1 # 失败后,再重试几次,默认为:3
RabbitMqTestApplication.java
package cn.qbz.rabbitmqtest;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* RabbitMQ 测试
*
* @author qubianzhong
* @Date 13:42 2019/7/26
*/
@SpringBootApplication
@EnableRabbit //新增开启Rabbit注解
public class RabbitMqTestApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqTestApplication.class, args);
}
}
RabbitMqProduceController.java
package cn.qbz.rabbitmqtest.controller;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author qubianzhong
* @date 2019/7/22 13:51
*/
@RestController
public class RabbitMqProduceController {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
@GetMapping(value = "/produce")
public String produceMsg(@RequestParam(value = "msg") String msg) {
/****************************hello/work start****************************
* hello-world、work queues 均是只绑定队列
* 发布消息时,如下所示,推荐使用第二种
*/
//1.如果队列不存在,则消息会进入“黑洞”
amqpTemplate.convertAndSend("hello", msg + System.currentTimeMillis());
/**
* 2.如果队列不存在,则进行创建,如果队列已存在,则使用此队列
* public Queue(String name) {
* //The queue is durable, non-exclusive and non auto-delete.
* this(name, true, false, false);
* }
*/
Queue queue = new Queue("hello");
amqpAdmin.declareQueue(queue);
amqpTemplate.convertAndSend(queue.getName(), msg + System.currentTimeMillis());
//3.work
queue = new Queue("work");
amqpAdmin.declareQueue(queue);
for (int i = 0; i < 30; i++) {
amqpTemplate.convertAndSend(queue.getName(), "wwoork" + i);
}
/****************************hello/work end****************************/
/****************************Publish/Subscribe start****************************
* 生产者只向扇形交换机发送消息,扇形交换机负责向绑定其队列上的所有消费者进行分发。
* public AbstractExchange(String name) {
* //Construct a new durable, non-auto-delete Exchange with the provided name.
* this(name, true, false);
* }
*/
for (int i = 0; i < 5; i++) {
Exchange pubSubExchange = new FanoutExchange("pub-sub-exchange");
amqpAdmin.declareExchange(pubSubExchange);
amqpTemplate.convertAndSend(pubSubExchange.getName(), null, msg + System.currentTimeMillis());
}
/****************************Publish/Subscribe end****************************/
/****************************Routing start****************************
* Routing 消费者消费的时候,多个路由键绑定一个队列
*/
Exchange routeExchange = new DirectExchange("routing-exchange");
amqpAdmin.declareExchange(routeExchange);
amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-error", "routing-log-error:" + System.currentTimeMillis());
amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-info", "routing-log-info:" + System.currentTimeMillis());
amqpTemplate.convertAndSend(routeExchange.getName(), "routing-log-waring", "routing-log-waring:" + System.currentTimeMillis());
/****************************Routing end****************************/
/****************************Topic start****************************
* Topic 消费者消费的时候,多个路由键,模糊匹配,绑定一个队列,其实和routing差不多
*/
Exchange topicExchange = new TopicExchange("topic-exchange");
amqpAdmin.declareExchange(topicExchange);
amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-error.20190706", "topic-log-error.20190706:" + System.currentTimeMillis());
amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-info.20190606", "topic-log-info.20190606:" + System.currentTimeMillis());
amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-error.20190708", "topic-log-error.20190708:" + System.currentTimeMillis());
amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-waring.20190506", "topic-log-waring.20190506:" + System.currentTimeMillis());
amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-waring.20190516", "topic-log-waring.20190516:" + System.currentTimeMillis());
amqpTemplate.convertAndSend(topicExchange.getName(), "topic-log-info.20190723", "topic-log-info.20190723:" + System.currentTimeMillis());
/****************************Topic end****************************/
return "success";
}
}
RabbitTestListener.java
package cn.qbz.rabbitmqtest.rabbit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author qubianzhong
* @date 2019/7/26 14:40
*/
@Component
public class RabbitTestListener {
/****************************demo 演示如何消费消息 start****************************
* 推荐使用第二种或第三种
* 当不需要绑定交换机的时候,如果使用第三种,exchange置为空会报错
*/
/**
* 1.此种用法需要手动创建队列,不然会报错
*/
@RabbitListener(queues = "demo-1")
public void demo1(Message message, Channel channel) throws IOException {
System.err.println("demo-1:" + new String(message.getBody()));
//信道上发布的消息都会被指派一个唯一的ID号:message.getMessageProperties().getDeliveryTag()
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
//1:接收到的被指派一个唯一的ID号:deliveryTag;
// 2:true当前consumer拒绝所有的deliveryTag包括此次的,false当前consumer只拒绝此次的这个deliveryTag;
// 3:true此消息重新排队,而不是被丢弃或者扔进死信队列中
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
/**
* 2.如果队列不存在,则会新建;如果队列已存在,则使用此队列
*/
@RabbitListener(queuesToDeclare = @Queue("demo-2"))
public void demo2(Message message, Channel channel) throws IOException {
System.err.println("demo-2:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/**
* 3.绑定队列、交换机
* 如果队列或交换机不存在,则新建;如果已存在,则使用。
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("demo-3"),
exchange = @Exchange("demo-exchange-3")
))
public void demo3(Message message, Channel channel) throws IOException {
System.err.println("demo-3:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/****************************demo 演示如何消费消息 start****************************/
/****************************hello start****************************
* 消费 hello-world
*/
@RabbitListener(queuesToDeclare = {@Queue("hello")})
public void hello(Message message, Channel channel) throws IOException {
System.err.println("hello:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/****************************hello end****************************/
/****************************work start****************************
* 消费 work 一个队列对应多个消费者,此时,消息是平均分配到每个消费者手里的。
*/
@RabbitListener(queuesToDeclare = {@Queue("work")})
public void work1(Message message, Channel channel) throws IOException {
System.err.println("work1:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
@RabbitListener(queuesToDeclare = {@Queue("work")})
public void work2(Message message, Channel channel) throws InterruptedException, IOException {
Thread.sleep(1230);
System.err.println("work2:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
@RabbitListener(queuesToDeclare = {@Queue("work")})
public void work3(Message message, Channel channel) throws InterruptedException, IOException {
Thread.sleep(30);
System.err.println("work3:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/****************************work end****************************/
/****************************Publish/Subscribe start****************************
* 消费 Publish/Subscribe 扇形交换机负责向绑定其队列上的所有消费者进行分发。
* 此处,exchange和queue如果不存在,则会新建。
* 注意:@Exchange注解属性 type需要定义为 fanout,不然会是默认的 direct
*/
@RabbitListener(bindings = {@QueueBinding(
value = @Queue("sub-pub-queue1"),
exchange = @Exchange(value = "pub-sub-exchange", type = ExchangeTypes.FANOUT)
)})
public void pubSub1(Message message, Channel channel) throws IOException {
System.err.println("sub-pub-queue1:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
@RabbitListener(bindings = {@QueueBinding(
value = @Queue("sub-pub-queue2"),
exchange = @Exchange(value = "pub-sub-exchange", type = ExchangeTypes.FANOUT)
)})
public void pubSub2(Message message, Channel channel) throws IOException {
System.err.println("sub-pub-queue2:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/****************************Publish/Subscribe end****************************/
/****************************Routing start****************************
* 消费 Routing
* 其中一个队列,只消费error信息,一个队列消费所有信息
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue("routing-log-error"), key = {"routing-log-error"}, exchange = @Exchange(value = "routing-exchange"))})
public void routing1(Message message, Channel channel) throws IOException {
System.err.println("routing-log-error:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue("routing-log-all"), key = {"routing-log-error", "routing-log-info", "routing-log-waring"}, exchange = @Exchange(value = "routing-exchange")
)})
public void routing2(Message message, Channel channel) throws IOException {
System.err.println("routing-log-all:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/****************************Routing end****************************/
/****************************Topic start****************************
* 消费 Topic
* 其中一个队列,消费前缀为'topic-log-info.'的信息,
* 一个队列消费前缀为'topic-log-error.'的和模糊匹配'.201907.'的信息
*/
@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-log-1"), key = {"topic-log-info.*"}, exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC))})
public void topic1(Message message, Channel channel) throws IOException {
System.err.println("所有的info:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
@RabbitListener(bindings = {@QueueBinding(value = @Queue("topic-log-2"), key = {"*.201907.*", "topic-log-error.*"}, exchange = @Exchange(value = "topic-exchange", type = ExchangeTypes.TOPIC))})
public void topic2(Message message, Channel channel) throws IOException {
System.err.println("所有的error and 7月份:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
/****************************Routing end****************************/
}
RabbitTemplateCallback.java
package cn.qbz.rabbitmqtest.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author qubianzhong
* @date 2019/7/31 10:56
*/
@Component
public class RabbitTemplateCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this::confirm);
this.rabbitTemplate.setReturnCallback(this::returnedMessage);
}
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.err.println("消息未能在交换机上得到确认!异常处理......");
}
}
/**
* Returned message callback.
*
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println("消息未能到达队列!" + "return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
}
我们并不推荐RPC式的mq调用,这么做完全没有发挥mq异步削峰的作用。如果有使用RPC的需求,请移步SpringCloud或者Dubbo。
我们虽然不使用RabbitMQ来进行RPC调用,但是我们也要了解,RabbitMQ为啥子可以实现RPC。
当使用RabbitMQ来实现RPC时.你只是简单地发布消息而已。RabbitMQ会负责使用绑定来路由消息到达合适的队列。RPC服务器会从这些队列上消费消息。RabbitMQ替你完成了所有这些艰难的工作:将消息路由到合适的地方,通过多台RPC服务器对RPC消息进行负载均衡,甚至当处理消息的服务器崩溃时,将RPC消息重发到另一台。
问题在于。如何将应答返回给客户端呢?毕竟,到目前为止你体验的RabbitMQ是发后即忘模型。
RabbitMQ团队想出了一个优雅的解决方案:使用消息来发回应答。在每个AMQP消息头里有个字段叫作reply_ to,消息的生产者可以通过该字段来确定队列名称,并监听队列等待应答。然后接收消息的RPC服务器能够检杳reply _to字段,并创建包含应答内容的新的消息,并以队列名称作为路由键。
你也许想:“光是每次创建唯一队列名就得花很多工夫吧。我们怎样阻止其他客户端读到应答消息呢?”
我们前面说过,如果你声明了没有名字的队列,RabbitMQ会为你指定一个。这个名字恰好是唯一的队列名;同时在声明的时候指定exclusive参数.确保只有你可以读取队列上的消息。所有RPC客户端需要做的是声明临时的、排他的、匿名队列,并将该队列名称包含到RPC消息的reply _to头中。于是服务器端就知道应答消息该发往哪儿了。值得注意的是我们并没有提到将应答队列绑定到交换器上。这是因为当RPC服务器将应答消息发布到RabbitMQ而没有指定交换器时.RabbitMQ就知道目的地是应答队列,路由键就是队列的名称。