我们在微服务中一个命令会逐渐调用各个微服务,但如果一一调用不仅需要微服务实时同步交互还会浪费效率
所以我们通常会采用MQ,也就是消息队列Message Queue来处理这个问题
下面我们会通过几个方法介绍消息队列:
首先我们先来介绍消息队列的各个信息
首先我们需要先去了解同步通信:
我们给出一个同步通信的简单例子:
我们对上图进行简单解释:
/*
用户使用支付服务,支付服务需要经过一系列操作之后才能返回结果给用户
具体服务:支付服务->订单服务->仓储服务->短信服务->...
*/
// 那么就会存在以下问题:
// 1.假设我们每个服务耗时1s,那么多个服务累计在一起,耗时逐渐增多用户得到结果的速度会变慢
// 2.如果我们需要添加新的服务,那么我们需要在原函数中添加该服务的调用方法,会修改原有代码,导致修改困难
// 3.并且当前面的操作进行过程中,后面的操作手中仍存有该流程的资源无法释放,导致资源损耗需要当当前服务结束后才可释放
// 4.最可怕的是,当其中有一个服务出现错误,那么整条服务链就会出现错误,导致后面的服务无法执行,导致用户无法得到结果!!!
我们可以很明显的感觉到同步通信的优点:
但是缺点也非常的多:
我们同样给出异步通信的概念:
那么异步通信的优点其实很明显:
但是缺点同样明显:
我们来认识一下市面上常见的消息队列:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
我们给出一些消息队列选择的建议:
我们主要去学习RabbitMQ的基本使用
我们如果要去使用RabbitMQ,首先需要先进行插件安装:
# docker拉取镜像(docker在之前的文章中已经介绍过了~)
docker pull rabbitmq:3-management
docker run \ # docker启动容器
-e RABBITMQ_DEFAULT_USER=root \ # 配置环境:mq用户名
-e RABBITMQ_DEFAULT_PASS=123321 \ # 配置环境:mq密码
--name mq \ # mq名称
--hostname mq1 \ # mq主机名(单机部署可以省略,集群部署需要)
-p 15672:15672 \ # 开放端口号:管理平台端口,ui界面
-p 5672:5672 \ # 开放端口号:消息队列端口,作为Broker的核心端口
-d \
rabbitmq:3-management
首先我们需要知道最基本的消息队列模型:
他们的用途分别是:
其基本流程图为:
那么下面我们就来完成一个基本的RabbitMQ的小项目(只需了解):
/*
发布者
具体逻辑为:建立连接->创建Channel->声明队列->发送消息->关闭连接和channel
*/
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
/*
订阅者
具体逻辑为:建立连接->创建Channel->声明队列->订阅消息
*/
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
到这里我们已经基本了解了RabbitMQ的使用,让我们进入下一章节!
SpringAMQP是针对MQ的API更新,也就是使用简单的API去完成上述复杂的RabbitMQ使用过程
在正式接收SpringAMQP之前,我们需要先去了解一下RabbitMQ的五种常见消息模型:
首先我们需要去了解AMQP:
那么我们再去了解SpringAMQP:
其实简单来说SpringAMQP为我们提供了三个功能:
我们利用SpringAMQP来实现简单消息队列:
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 应当在Publisher发布者和Consumer订阅者两个子工程下均配置地址
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
// 注意:在Publisher工程下的test模块下书写该发送消息的test代码
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
// 注意:在Consumer订阅者下的Listener文件(自己创建)下创建该监听类,需设置为Bean
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
// 能够被Spring扫描到
@Component
public class SpringRabbitListener {
// 核心点:监听simple.queue队列
@RabbitListener(queues = "simple.queue")
// 发布者发布什么类型,监听者就接收什么类型并做出对应处理
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
我们先来简单介绍一下工作消息队列:
我们来使用SpringAMQP来实现工作消息队列:
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
// 我们这里模拟发送了五十条消息,平均每20ms发送一条
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
// 第一个订阅者平均每20ms获得一个消息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
// 第二个订阅者平均每200ms获得一个消息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
/*
但是由于两个订阅者均未设置阈值
所以他们并不会在结束后才去拿去消息
而是依次去获取消息,也就意味着不管他们何时结束自己的消息,他们都平分获取25条消息
20ms订阅者1拿消息并处理,40ms订阅者2拿消息并处理,60ms订阅者1拿消息并处理,80ms订阅者2拿到消息但并不能处理,依次循环
结论:
- 两者均拿到25条消息
- 订阅者1在980ms时结束所有的消息获取,并结束所有消息处理
- 订阅者2在1000ms时结束所有的消息获取,但是还需要在5000ms(大概哈)才能完全处理消息
*/
spring:
rabbitmq:
listener:
simple: # 队列名称
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
我们首先来详细介绍一下发布订阅(广播)的结构:
我们同样采用SpringAQMP来实现发布订阅广播:
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "qiuluo.fanout";
// 消息
String message = "hello, everyone!";
// 第一个参数是交换机名称,因为目前的publisher只能发送信息给交换机,由交换机来决定传递给哪个消息队列
// 第二个参数是key值选择,我们会在后面用到
// 第三个参数是所传递的信息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
// 和之前一样,Consumer从消息队列那里获取信息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
// 和之前一样,Consumer从消息队列那里获取信息
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
// 在consumer中创建一个类,声明队列和交换机
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
// 采用@Bean的形式将其设置为Bean
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
// 采用@Bean的形式将其设置为Bean
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
// 采用BindingBuilder的bind,to方法进行交换机与队列的绑定即可(固定形式)
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
我们同样来简单介绍一下发布订阅路由:
我们下面采用SpringAMQP的注解声明方式来实现发布订阅路由:
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "qiuluo.direct";
// 消息
String message = "红色警报!";
// 发送消息
// 这里就用到了第二个参数,就是key值
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
// 采用@RabbitListener注解的bindings参数,在里面需要表明value(队列名称),exchange(交换机名称),key(队列的key值)
// 其内部的数据都需要采用@注解来给出
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "qiuluo.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "qiuluo.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
我们同样来简单介绍一下发布订阅路由:
我们同样采用SpringAMQP来给出一个发布订阅主题的案例:
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "qiuluo.topic";
// 消息
String message = "喜报!胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
// 这里仅仅对exchange的type类型进行更改,并且更改了key值
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
最后我们介绍一个简单的知识点:
所以我们在正常使用时通常会去更换默认消息转换器,采用JSON消息转换器:
<!--在publisher和consumer两个服务中都引入依赖-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
这篇文章中介绍了消息队列的内容并详细介绍了RabbitMQ以及SpringAMQP,希望能为你带来帮助