MQ全称为Message Queue即消息队列
"消息队列"
是在消息的传输过程中保存消息的容器
或者说MQ 有什么好处,MQ 主要可以实现三种功能:
程序非常的耦合
解耦
A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据
下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
我们增加到10台服务器,减压
避免了同一时刻大量的请求,而处理不过来导致 服务崩溃~
我们不可能一直停留在页面上进行等待~
支付后————发送支付成功的通知————再寻找外卖小哥来进行配送…
而寻找外卖小哥的过程非常耗时,高峰期,可能要等待几十秒甚至更长,这样就造成整条调用链路响应非常缓慢
为您寻找骑手,整条链路的响应时间只有200毫秒左右
消息接收方,监听获取每一个订单消息后台缓慢的寻找外卖小哥~
⭐
AMQP 和 JMS 是目前市面上常见的两种 消息队列协议
高级消息队列协议!
是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS RabbitMQ 就是基于 AMQP 协议实现的
Java 消息服务
JMS的客户端之间可以通过JMS服务
进行异步的消息传输
就像JDBC一样通过接口定义一组规范,不同的实现尝试实现对于的驱动来完成开发...
它使分布式通信耦合度更低,消息服务更加可靠以及异步性。 ActiveMQ 就是基于 JMS 规范实现的
规范:
跨平台
有跨平台、跨语言特性
支持消息类型
JMS
支持TextMessage、MapMessage 等复杂的消息类型AMQP
仅支持 byte[] 消息类型(复杂的类型可序列化后发送Exchange 交换机
提供的路由算法
早期的MQ框架,现在已经很少使用了
本篇学习😶
erlang
语言开发,所以安装环境需要安装 erlang
AMQP
(Advanced Message Queue 高级消息队列协议
)协议实现的消息队列
Producer 消息生产者
通过信道Channel
发送到MQConnection 连接对象
Channel信道
:生产者通过 信道 将消息发送给MQ
消费者通过 信道 获取到MQ的消息~
细节不详细介绍
可以理解为是一个,消息数据传递的一个通到
可以通过它,来创建配置,生产者|消费者 与MQ通信
声明设置绑定:交换机|队列
Broker 可以认为是 MQ
消息队列服务进程
:此进程包括两个部分:Exchange交换机
和Queue队列
队列
队列就像是一个“吸管” 一边吸水一边出水,遵循 “先进先出”原则;
生产者发消息——交换机——转发到队列上 是真正消息存储的地方~
Consumer 消息消费者
通关信道Channel
接收MQ转发的消息,并进行相关的处理;队列中去!
⭐
RabbitMQ消息传递模型的核心思想是:
交换机类型:
发布/订阅
,交换机给所有的队列,发送相同的消息;
routing key
交换机,根据对应的 routing key
的队列上发送消息;
roting key
使 交换机动态的多样性选择 队列
* 表示一个单词
# 表示任意数量(零个或多个)单词
目前用的很少了
,就像请求头一样,发送消息时候附带头部数据
,交换机根据消息的头部信息匹配对应的队列;
✔
本次搭建是Linux 的 如果有朋友是Win的话可以参考这篇文章:🚀
RabbitMQ是由erlang
语言开发,所以安装环境需要安装 erlang
erlang环境
rabbit安装
官网下载,如果没有的话也可以底部本人的网盘下载
本人使用的是 阿里云服务器
没有的话也可以使用虚拟机… 事先使用连接工具上传了文件
本人喜欢把工具都安装在 /usr/wsm
目录下:
[root@iZj6ciuzx7luldnazt4iswZ ~]# cd /
[root@iZj6ciuzx7luldnazt4iswZ /]# ls
bin dev home lib lost+found mnt patch root sbin sys usr www
boot etc install.sh lib64 media opt proc run srv tmp var
[root@iZj6ciuzx7luldnazt4iswZ /]# cd usr
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp
[root@iZj6ciuzx7luldnazt4iswZ usr]# mkdir wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# ls
bin etc games include lib lib64 libexec local sbin share src tmp wsm
[root@iZj6ciuzx7luldnazt4iswZ usr]# cd wsm
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm rabbitmq-server-3.8.8-1.el7.noarch.rpm #上传的两个文件
解压安装:
# 解压安装 erlang
rpm -ivh erlang-21.3.8.21-1.el7.x86_64.rpm
# 云下载一个 初始化一些配置, 过程比较慢请耐心等待~, 在这之后才可以进行 安装 RabbitMQ
yum install socat -y
# 解压安装 rabbitmq
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
ok ,安装完毕了解一些 RabbitMQ 命令:
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server
注意:这里只是把RabbitMQ 服务
给搭建好了,为了方便操作我们还需要安装一个web控制面板
# 安装web控制面板
rabbitmq-plugins enable rabbitmq_management
# 安装完毕以后,重启服务即可
systemctl restart rabbitmq-server
# 访问 http://服务器ip:15672 ,用默认账号密码(guest)登录,出现权限问题
# 默认情况只能在 localhost 本机下访问,所以需要添加一个远程登录的用户
# 创建账号和密码: admin 123456
rabbitmqctl add_user admin 123456
# 设置用户角色,用户级别: administrator monitoring policymaker managment
rabbitmqctl set_user_tags admin administrator
# 为用户添加资源权限
# set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 添加配置、写、读权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
##### 扩展一些命令:#####
关闭应用的命令为: rabbitmqctl stop_app
清除的命令为: rabbitmqctl reset
重新启动命令为: rabbitmqctl start_app
别忘记开启端口 还有 关闭防火墙~
用户级别:
主要端口介绍:阿里云建议将这些都打开~
访问页面:
集群各个节点的信息 情况
MQ 各个端口映射信息
生产者
消费者
连接情况.
通道 与 连接的关系
点击连接名, 还可以查看详细的信息~
running 运行
idle 空闲
SSL|TLS协议
AMQP 0-9-1
指的是AMQP 的协议版本号记录各个连接的信道:
一个连接IP 可以有多个信道
多个通道通过多线程实现,不相互干扰 我们在 信道中创建:队列 交换机 ...
生产者的通道一般使用完之后会立马关闭,消费者是一直监听的…
确认模式
C confirm模式
T 表示事务
running 运行
idie 空闲
预先载入
Prefetch 表示每个消费者最大的能承受的未确认消息数目
简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,
一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了
消费者负责不断处理消息,不断 ack,然后只要 UnAcked
数少于 Prefetch * consumer
数目,RabbitMQ 就不断将消息投递过去
速率
速率
drop
表示消息,未被接收,且已经删除的消息.
速率
MQ 的 ACK机制:100%消息消费!
创建一个Maven项目并使用 git 进行管理, wlog.md
文件进行着项目日志的记录✍~
引入RabbitMQ
的依赖:
pom.xml
<dependencies>
<!-- rabbitMQ 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
</dependencies>
如图,显而易见,非常简单就是一个一发一读
的过程…
Producer.Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 消息生产者 **/
public class Producer {
// 定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
//创建连接对象
Connection connection = factory.newConnection();
//根据连接对象,获取信道
Channel channel = connection.createChannel();
/**设置消息队列的属性!
* queue :队列名称
* durable :是否持久化 如果持久化,mq重启后队列数据还在! (队列是在虚拟路径上的...)
* exclusive :队列是否独占此连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* autoDelete :队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* arguments :队列参数 null,可以设置一个队列的扩展参数,需要时候使用!比如:可设置存活时间
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**发送消息,参数:
* exchange :指定的交换机,不指定就会有默认的....
* routingKey :路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机routingKey设置为队列的名称
* props :消息包含的属性: 后面介绍,可以是一个一个对象... 消息持久化配置...
* body :发送的消息,AMQP以字节方式传输...
* */
channel.basicPublish("", QUEUE_NAME, null, "Hello Word你好世界".getBytes());
System.out.println("消息发送完毕");
}
}
Consumer.Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/** 消息消费者 **/
public class Consumer {
// 定义队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.243.109.199");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.........");
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息 - 接受消息
* queue 消费哪个队列
* autoAck 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答,要通过编程实现回复验证,这就是Unacked 为返回ack的数据
* deliverCallback 消费方法,当消费者接收到消息要执行的方法, 参数是一个函数式接口可以使用 lambda表达式~
* cancelCallback 消息被取消时的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消费者
在启动 发送者
可以看到,消费者启动之后,在等待 发生者 发送消息,发送者启动发送消息,消费者控制台会立刻接收到消息!
这种情况下MQ 都会有一个默认的交换机~
工作模式 相当于 简单模式的 升级版!
复杂均衡形式
轮询的发送给多个消费者一般应用于:发送方事务简单,接收方事务复杂…
美团外卖:用户下单——后台内部要联系商家 骑手 生产订单 处理...
因为上面示例我们知道,创建交换机|队列 需要Channel信道
交换机 队列是创建在信道里面的
Connection
Channel
MQChannelUtil.Java
MQChannelUtil.Java
util包
专门用来存储工具类🛠import com.rabbitmq.client.Channel; //导入MQ的包~
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/** RabbitMQ 连接配置类: **/
public class MQChannelUtil {
//得到一个连接的 channel
public static Channel getChannel() throws Exception {
//创建一个连接工厂, 设置连接: IP 端口 用户 密码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("42.192.149.71");
// factory.setPort("设置对应的端口,默认就是: 5672");
factory.setUsername("admin");
factory.setPassword("123456");
//创建连接对象 信道对象
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
Producer.Java
import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;
/** 消息生产者 **/
public class Producer {
// 定义队列名称
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
// Java控制台测试法消息:
Scanner scanner = new Scanner(System.in);
//创建交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 循环多次发布消息:
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}
Consumer1.Java
import com.rabbitmq.client.*;
import com.wsm.Util.MQChannelUtil;
/** 消息消费者 **/
public class Consumer1 {
// 定义队列名称
private final static String QUEUE_NAME = "Word";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/** 消费者消费消息 - 接受消息: 注意参数两个回调函数~ */
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
Consumer2.Java
和消费者1 一模一样换一个名字,两个消费者监听一个队列 进行数据处理....
消息被轮询消费
:
消费者执行消费
消费者执行之后, 队列就会将消息删除,(ACK机制...
ACK
消息一旦被消费者接收,队列中的消息就会被删除
RabbitMQ怎么知道消息被接收了呢?
因此,RabbitMQ有一个ACK机制
默认此种模式:
消息发送后立即被认为已经传送成功! 消费者 接收到消息,就向队列发送ack,队列立刻就删除消息
待消费者 执行完毕之后,在通过代码向 队列发送ack,队列接收到ack 之后会将消息删除!
就像序列化 序列号一样,用于网络传输..
参数2 boolean类型,表示是否支持批量处理
同上,消息的唯一标识
参数2 表示是否支持批量处理
参数3 requeue true则重新入队列 false丢弃或者进入死信队列
同上
参数2 requeue true则重新入队列 false丢弃或者进入死信队列
与 Channel.basicNack 相比少一个参数,不可以进行批量处理…
1 2 3 4
四个消息,都发送给了消费者,而消费者逐一处理,4
结束了.
不管是否 ACK|NACK
都直接将,其它的 1 2 3
都以相同的,方式进行 批量处理!
好处:在MQ 服务,稳定的时候,支持大量的消息处理速度… 缺点,容易造成数据丢失💀...
建议使用,不批量应答
就是, 一次只处理当前消息的 ACK|NACK
消费者设置了手动ACK 之后....
如果消费者由于某些原因失去连接 其通道已关闭,连接已关闭或 TCP 连接丢失
导致消息未发送 ACK 确认
消费者服务挂了
Producer.Java
import com.rabbitmq.client.Channel;
import com.wsm.Util.MQChannelUtil;
import java.util.Scanner;
/** 消息生产者 **/
public class Producer {
// 定义队列名称
private final static String QUEUE_NAME = "ack_test";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
// Java控制台测试法消息:
Scanner scanner = new Scanner(System.in);
// 创建队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环多次发布消息:
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
}
}
}
Consumer1.Java
true自动ack
false手动ack
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wsm.Util.MQChannelUtil;
/** 消息消费者 **/
public class Consumer2 {
// 定义队列名称
private final static String QUEUE_NAME = "ack_test";
public static void main(String[] args) throws Exception {
// 工具类创建一个信道
Channel channel = MQChannelUtil.getChannel();
//收到消息后用来处理消息的回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 为了方便查看处理效果,我们将消费者 线程休眠一段时间 模拟处理数据;
try {
Thread.sleep(10000); // 毫秒 *1000;
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody());
System.out.println(message);
/** 接收处理完毕消息之后给MQ 回复ack
* 参数1 消息的唯一标识Tag
* 参数2 是否支持批量回复,一般建议false 保证数据安全!
* **/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/** 消费者消费消息 - 接受消息: 注意参数两个回调函数~ */
/** 参数二 设置 false 手动应答 **/
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
Consumer2.Java
和 消费者1 类似,为了方便测试更改了,线程休眠时间 30s
测试1
AA BB CC DD
测试2
AA BB CC DD
Thread.sleep(30000)
所以会有很长时间的处理,此时关闭消费者2
MQ 与 消费者2 断开连接,消费者2 也没有发送ACK 所以消费者2的消息将会 重回队列...又交给了 消费者1来进行消费!
消费1 | 2 都存在处理了 BB
为了确保数据安全,还要进行 幂等的处理!
简单
正常情况下,RabbitMQ 只是一个消息中间件 一边接收者生产者消息
等待消费者监听处理...
队列中
队列中为处理的消息...
生产者最新发送的消息...
...
队列
消息
都进行持久化处理,防止数据丢失~然队列持久化非常简单, 只需要一个配置即可:
在声明队列的时候,就通过参数就可以完成对队列的持久化,注意:队列持久化 并不是 消息持久化 队列每次重启都会恢复但是内部的消息 还需要另外的配置!
// 让队列持久化
boolean durable = true; // false 不持久化 true 持久化
// 声明队列
channel.queueDeclare("队列名", durable, false, false, null); // 设置true 之后,每次MQ重启的时候,该队列都会自动重新在虚拟路径上自动加载...
需要在 生产者
发送消息的时候添加一个配置 MessageProperties.PERSISTENT_TEXT_PLAIN
,告诉MQ 这个消息很重要,要进行持久化保存!
// 发送者发送消息的时候,带上 MessageProperties.PERSISTENT_TEXT_PLAIN 告诉,MQ消息要进行持久化;
channel.basicPublish("交换机", "队列名", MessageProperties.PERSISTENT_TEXT_PLAIN, "要发送的消息,字节传输");
消息仍然存在丢失问题
当消息,刚发到MQ 中,还没有准备,存储磁盘,消息还在缓存的一个"间隔点"
MQ 突然挂了… 也会影响到消息的 持久化;
但这里对于,普通的存储已经绰绰有余了…
前提是,设置消息手动ACK
在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发
消费者 1 处理任务的速度非常快
消费者 2 处理速度却很慢
这个时候依然采用默认的 轮询分发
势必不太合理…
分发模式
不公平 分发
我们可以通过 信道设置:channel.basicQos(1);
意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务.
rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,如果没有空闲消费者,消息就会堆积在队列中去~
预取值 分发
消息者 连接 队列
的时候,可以给 信道设置 预取值.其实预取值,和 不公平分发 很类似
消费者1
消费者2
都设置,channel.basicQos(1);
消费者最高允许消息堆积数.
两个消费者每次只能从,队列中拿一个消息进行消费,完了就立刻在从 队列中,在拿,这样做的快的消费者,自然就处理的消息多了!不公平分发
channel.basicQos(>1);
值大于>1
,预取值 分发
消费者1
消费者2
分别跟据服务的性能设置,channel.basicQos(?);
消费者最高允许消息堆积数.
假设: 消费者1 basicQos(2)
消费者2 basicQos(5)
这样,假设队列中有消息:1 2 3 4 5 6 7
,当最初 消费者1 | 2
都空闲时候…
消费者1获取1
消费者2获取2
消费者1获取3
消费者2获取4
消费者2获取5
消费者2获取6
....
消费者2 允许消息在信道中最大的堆积数 5
当然也有可能会出现,消费者1 处理很快,消费者2 很慢,消费者1处理完1 3,消费者2还在处理2 4567,那后面的 8 9 10 都给1处理…
预取值 分发,就是预计这个消费者,性能高低,设置消费者 允许最高堆积?个消息等待这个处理!
confirm
confirm
发布确认机制:生产者将信道设置成 confirm 模式
confirm
模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID 从1开始
broker
就会发送一个确认给 生产者
生产者就知道消息已经正确到达目的队列了.
消息队列 和 消息
进行了持久化设置
那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息已经,发送到队列上!
RabbiMQ 默认是没有开启 comfirm 发布确认机制
// 开启发布确认 channel.confirmSelect();
它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布
一次只能发一个!
因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量;
channel.confirmSelect();
2. 通过信道 channel.waitForConfirms();
来判断当前消息是否发送成功!
这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常. 单个发布确认,只有一个消息发送 才能发送下一个消息
Producer.Java
/** 单个发布确认 **/
public static void singleConfirm() throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 队列声明
channel.queueDeclare("singleConfirm", false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 为了方便计算各个 发布确认策略 耗时: 开始-结束放一个系统时间获取毫秒数;
long begin = System.currentTimeMillis();
// 发送1000 个消息....
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", "singleConfirm", null, message.getBytes());
// 服务端返回 false 或超时时间内未返回,生产者可以消息重发
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
// else{ /** 消息重发处理... **/ }
}
// 1000个消息结束...
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
}
本次执行耗时:67042毫秒
相比 单个发布确认
, 发送确认机制,实在是太慢了! 每次只能保证一个消息的发送成功,实在是太慢了!
批量发布确认:
单个发布确认
实现方式一样
1. 开启发布确认 channel.confirmSelect();
2. 获取批量的消息,是否全部发送到MQ channel.waitForConfirms();
waitForConfirms()
方法,查看当前消息是否都到达MQ
不同的是,单个发布每次发一条都验证
批量是在一定数量进行验证
waitForConfirms();
方法会使,当前 发生者线程进行阻塞,等待MQ 返回数据,
MQ返回 上一次调用waitForConfirms() 到现在调用waitForConfirms() 所有发送的消息是否抵达MQ
全部抵达true 则false
大大节省了过程中冗余的一些步骤性能...
当然,如果其中有一个消息没有发送到MQ 它并不能确定是那一个 消息 没有抵达MQ
Producer.Java
/** 批量发布确认 **/
public static void batchConfirm() throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 队列声明
channel.queueDeclare("batchConfirm", false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 为了方便计算各个 发布确认策略 耗时: 开始-结束放一个系统时间获取毫秒数;
long begin = System.currentTimeMillis();
// 批量确认消息大小,当发送的消息数到 100 执行 waitForConfirms(); 询问MQ 当前所有的消息有没有抵达~
int batchSize = 100;
// 未确认消息个数, 每次发送消息 ++ 用于判断是否改批量验证消息发送;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
// 发送的消息,并发送~
String message = i + "";
channel.basicPublish("", "batchConfirm", null, message.getBytes());
// 每次发送一个消息进行计算当前是第几个,为批量验证的消息;
outstandingMessageCount++;
// 判断当前的 100 个消息有没有都发送的MQ上!
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
本次执行耗时:1149毫秒
快速了很多...
异步发布确认 相对于上面两个比较复杂,但: 性价比 效率
具有显著的提升
实现原理:
唯一的ID标识
** 后面我们就可以通过这个标识来,确定是那一个消息发送 成功|失败
监听器
addConfirmListener(ConfirmCallback,ConfirmCallback);
方法参数支持两个,ConfirmCallback类对象 一个表示接收消息做的事情
另一个是未接收到消息做的事情...
ConfirmCallback 是一个 函数式接口
, 支持 lambda表达式 和 内部类形式书写…
addConfirmListener(ack,nack)
方法
告知发送者,消息成功发送 | 或 未发送成功!
addConfirmListener(ack,nack)
来处理,消息成功处理,消息失败处理…
Producer.Java
/** 异步发布确认 **/
public static void syncConfirm() throws Exception{
Channel channel = MQChannelUtil.getChannel();
// 队列声明
channel.queueDeclare("syncConfirm", false, false, false, null);
/** 开启发布确认 **/
channel.confirmSelect();
// 为了方便计算各个 发布确认策略 耗时: 开始-结束放一个系统时间获取毫秒数;
long begin = System.currentTimeMillis();
/** 步骤一: 创建一个线程安全的一个哈希表,用于记录每一个消息发送,这样MQ异步返回时候可以知道具体是那一个消息发送成功|失败 **/
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联: k,v 存储结构, k消息标识 v发送的消息体,每次发送消息前先存在集合中;
* 2.轻松批量删除条目只要给到序列号: 对于发现成功的消息,直接从集合中移除...
* 3.支持并发访问,Concurrent接口 是线程安全的; */
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/** 步骤三: 编写回调监听器,因为: 消息发送出错要立刻进行监听所以,所以创建在发送消息之前; **/
/** ack 确认收到消息的一个回调 1.消息序列号 2.true 批量确认接受小于等于当前序列号的数据 false 确认当前序列号消息 */
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
System.out.println("消息成功接收:"+sequenceNumber);
// ConcurrentNavigableMap方法()返回的是小于|等于 K 的集合, true:小于等于 false:返回小于该序列号的数据集合;
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
// 清除该部分确认消息 confirmed 里保存的都是,MQ 已经接收的消息;
// 遍历 confirmed K, 根据 K 删除 outstandingConfirms 的值...
// outstandingConfirms 里面保存的都是,MQ 还未确认的消息...
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
// nack 消息失败执行{} 可以写,消息失败需要执行的代码...
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
// 这里就输出一下为被确认的消息...
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
// 发生者 等待MQ回调消息确认的 监听器, 本次程序值监听 ack成功的消息;
channel.addConfirmListener(ackCallback, null);
/** 步骤二: 发送者一直往MQ发送消息 **/
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
// channel.getNextPublishSeqNo() 获取下一个消息的序列号,通过序列号与消息体进行一个关联,全部都是未确认的消息体
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
// 发送消息;
channel.basicPublish("", "syncConfirm", null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}
耗时:96毫秒
超级快的好吧
非常nice👍
以上我们创建一个: 发送者
队列
消费者
就可以完成通信了
而,RabbitMQ 的核心思想是: 生产者 生产的消息不会直接发送到队队列上, 甚至不知道队列的存在,通过一个交换机
一方面它接收来自生产者的消息
另一方面将它们推入队列
交换机必须确切知道如何处理收到的消息,这就的由交换机的类型来决定.
发布/订阅
,交换机给所有的队列,发送相同的消息;
direct : 路由模式routing key
交换机,根据对应的 routing key
的队列上发送消息;
topic: 动态路由模式,可以用过一定的规则定义 roting key
使 交换机动态的多样性选择 队列
* 表示一个单词
# 表示任意数量(零个或多个)单词
headers: 请求头模式,目前用的很少了
,就像请求头一样,发送消息时候附带头部数据
,交换机根据消息的头部信息匹配对应的队列;
无名交换机:
交换机
进行任何的操作,但是,仍然可以进行消息发送|接收
channel.basicPublish("", "队列名", null, "发送的消息".getBytes());
对于 “” 空字符串的交换机,MQ 会有默认的交换机进行操作…
临时队列:
临时队列
String queueName = channel.queueDeclare().getQueue();
让服务器 信道,给我们创建一个临时的队列,随机队列名称
一旦我们断开了消费者的连接,队列将被自动删除
交换机类型:Fanout
Fanout 类型:
某某软件很多人关注/订阅了一个博主,博主一更新,所有的粉丝都收到更新消息!
Fanout 实战:
绑定
,每当有消息来的时候,交换机会将消息发送到所有的队列中去… 每个消息者监听(订阅)一个队列,多个消息者可以同,监听到相同的消息;
Producer.Java
/** 消息生产者 **/
public class Producer {
// 交换机名
public static final String EXCHANGE_NAME = "wsm";
public static void main(String[] args) throws Exception {
// 创建连接对象,声明交换机 发送消息
Channel channel = MQChannelUtil.getChannel();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型, 可以是String直接写,也可以是 枚举类型;
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
/** 消息消费者 **/
public class Consumer1 {
// 定义交换机名称
public static final String EXCHANGE_NAME = "wsm";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/** 生成一个临时的队列 队列的名称是随机的 当消费者断开和该队列的连接时 队列自动删除 */
String queueName = channel.queueDeclare().getQueue();
// 绑定: 把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串: Fanout模式 routingkey 没作用!
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 发送回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("wsm 发布的最新消息:"+message);
};
// 消费者监听消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
因为:
String queueName = channel.queueDeclare().getQueue(); 临时队列
交换机类型:DIRECT
DIRECT 模式:
交换机/队列
时候,需要指定 routing key 一个队列,可以设置多个 routingkey
发送者发送消息, 会携带上 routingkey
routingkey
生产者 往交换机上发送消息,交换机只会将消息 向匹配的队列上发送消息, 消费者 监听队列消息消费
DIRECT 实例:
Conkey1
Conkey2
一个监听的队列 绑定交换机时指定 Conkey1
另一个绑定交换机时 绑定两个routingkey:Conkey1 Conkey2
Producer.Java
/** 消息生产者 **/
public class Producer {
// 交换机名
public static final String EXCHANGE_NAME = "DIRECT";
public static void main(String[] args) throws Exception {
// 创建连接对象,声明交换机 发送消息
Channel channel = MQChannelUtil.getChannel();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型, 可以是String直接写,也可以是 枚举类型; */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/** 发送消息 **/
channel.basicPublish(EXCHANGE_NAME, "Conkey1", null, "Conkey1 发送的消息".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "Conkey2", null, "Conkey2 发送的消息".getBytes("UTF-8"));
}
}
Consumer1.Java
/** 消息消费者 **/
public class Consumer1 {
// 定义交换机名称
public static final String EXCHANGE_NAME = "DIRECT";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
/** 生成一个临时的队列 队列的名称是随机的 当消费者断开和该队列的连接时 队列自动删除 */
String queueName = channel.queueDeclare().getQueue();
// 绑定: 把该临时队列绑定我们的 exchange
// 参数二 设置该队列和交换和绑定的 routingkey
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey1");
// 发送回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("最新的消息是:"+message);
};
// 消费者监听消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
比 消费者1
多添加:
// 可以设置多个key
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey2");
.queueBing("队列名","交换机","routingkey");
绑定 队列交换机 指定多个 routingkey
routingkey
交换机根据 key 将对应的消息发送到 队列上进行处理!
消费者2 可以同时接收到 Conkey1
和 Conkey2
发送的消息
交换机类型:TOPIC
该模式与Routingkey 非常类型,就相当于是一个 动态路由模式!!
routingkey
而,TOPIC 可以动态的进行 routingkey选择
使用上更加的个性化
#:匹配一个或多个词
举例:wsm.# 等于:wsm.1 / wsm.w.s.m / wsm.sm .后多个单词
*:匹配不多不少恰好1个词
举例:wsm.* 等于:wsm.sm / wsm.m .后一个单词
TOPIC 实例:
交换机类型
routingkey
,消费者 交换机类型
绑定交换机时候队列的 routingkey
Producer.Java
改变发送消息时候指定的 routingkey
还有交换机类型:TPOIC
//交换机类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
/** 发送消息 **/
channel.basicPublish(EXCHANGE_NAME, "Conkey.one", null, "Conkey.one 发送的消息".getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "Conkey.two.123", null, "Conkey.two.123 发送的消息".getBytes("UTF-8"));
Consumer1.Java
更改交换机类型 Topic
,接收消息,队列 交换机绑定时候,指定一下 routingkey通配符
// 参数二 设置该队列和交换和绑定的routingkey , Topic模式可以用过 通配符进行动态匹配: * 表示一个任意的单词;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.*");
// 参数二 设置该队列和交换和绑定的routingkey , Topic模式可以用过 通配符进行动态匹配: # 表示一个|多个任意的单词;
channel.queueBind(queueName, EXCHANGE_NAME, "Conkey.#");
交换机类型:HEAD
使用的很少,跟 TOPIC
动态路由类型,只不过它并不是通过 routingkey 进行消息与队列进行匹配
head头部参数 map类型
头部信息规则, 只有消息头部规则 和 队列的头部规则 匹配才能发送到对应的头部上!
不常用了解即可~
Producer.Java
/** 消息生产者 **/
public class Producer {
// 交换机名
public static final String EXCHANGE_NAME = "HEADERS";
public static void main(String[] args) throws Exception {
// 创建连接对象,声明交换机 发送消息
Channel channel = MQChannelUtil.getChannel();
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型, 可以是String直接写,也可以是 枚举类型; */
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
//发送的消息的消息头head map类型, 需要与队列的map 规范匹配才可以发送成功! 不然发送失败(交换机不知道往那个队列上发送;
HashMap<String ,Object> param = new HashMap<String, Object>();
param.put("id","1");
param.put("name","wsm");
//设置Map 匹配参数!
AMQP.BasicProperties.Builder builder=new AMQP.BasicProperties.Builder();
builder.headers(param);
/** 发送消息 **/
channel.basicPublish(EXCHANGE_NAME, "", builder.build(), "header的内容lalala~~".getBytes("UTF-8"));
}
}
Consumer1.Java
/** 消息消费者 **/
public class Consumer1 {
// 定义交换机名称
public static final String EXCHANGE_NAME = "HEADERS";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
/** 生成一个临时的队列 队列的名称是随机的 当消费者断开和该队列的连接时 队列自动删除 */
String queueName = channel.queueDeclare().getQueue();
//设置队列上的 map 参数,用于匹配请求时候的参数!
//特殊参数 x-match 值 all 或 any
//all 在发布消息时携带的map 必须和绑定在队列上的所有map 完全匹配
//any 只要在发布消息时携带的有一对键值map 满足队列定义的多个参数map的其中一个就能匹配上
//注意: 这里是键值对的完全匹配,只匹配到键了,值却不一样是不行的;
HashMap<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1"); // 可以尝试改变,map 信息,生产者消息还能发送到队列上面~
param.put("name","wsm");
// 绑定: 把该临时队列绑定我们的 exchange
// 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串"", 参数四: map参数,规范!
channel.queueBind(queueName, EXCHANGE_NAME, "",param);
// 发送回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("最新的消息是:"+message);
System.out.println("Map传入参数数据:"+delivery.getProperties().getHeaders());
};
// 消费者监听消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
//消费者:队列规则all 所有匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2");
param.put("name","wsm");
//不匹配
//生产者
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//匹配
//消费者:队列规则any 一个匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","any");
param.put("id","1");
param.put("name","wsm");
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","1"); //匹配
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2"); //不匹配 key /value 都要匹配才可以
....
死信,顾名思义就是无法被消费的消息
正常情况下:
但
应用场景:
下单成功
,指定时间内订单未支付 下单失败
succeed 成功队列
等待用户确认订单,支付订单,消息被消费 下单成功
如果 30 分钟用户没有下单,则 succeed 成功队列消息
超时,为了确保消息不丢失,将消息发送到 死信交换机 —— defeated死信队列
消费者接收处理:下单失败!
死信队列:
死信队列产生:
订单超时支付 下单失败
订单被用户取消 下单失败
死信队列的实现:
生产者 交换机 普通队列
当消息出现意外, 普通队列上配置了 DXL交换机
,消息超时 超出队列... 直接发送到 DLX交换机———— DLX队列
Producer.Java
/** 消息生产者 **/
public class Producer {
// 普通交换机名
public static final String EXCHANGE_NAME = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间 10s秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
Consumer1.Java
/** 消息消费者 **/
public class Consumer1 {
// 普通交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定:队列、交换机、路由键(routingKey)
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
// 正常队列绑定死信队列信息
HashMap<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
// 正常队列
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};
channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
});
}
}
先启动消费者,创建声明好队列之后,关闭,启动生产者发送消息…
Consumer2.Java
/** 消息消费者 **/
public class Consumer2 {
//死信交换机名称
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception {
Channel channel = MQChannelUtil.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
});
}
}
消息生产者代码去掉 TTL 属性
C1 消费者修改以下代码 (启动之后关闭该消费者 模拟其接收不到消息)
//设置正常队列的长度限制,例如发10个,4个则为死信 注意:此时需要把原先队列删除 因为参数改变了
params.put("x-max-length",6);
C2 消费者代码不变(启动 C2 消费者)
消息生产者代码同上 队列达到最大长度
生产者一致
C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息) 注释 params.put("x-max-length",6);
C2 消费者代码不变 ,启动消费者1关闭
然后再启动消费者 2
延迟队列,其实就是 死信队列
的一种,所有为了方便查看,使用SpringBoot 来进行搭建顺便了解学习一些SpringBoot 集成 RabbitMQ
① 创建SpringBoot 工程 启动类....
② 引入Maven依赖:pom.xml
<dependencies>
<!-- SpringBoot依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 阿里巴巴fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- lombok依赖 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
③ 编写配置文件:application.yml | properties
# SpringBoot 配置RabbitMQ ip 端口 用户 密码;
spring.rabbitmq.host=47.243.109.199
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上
延时队列中的元素是希望 在指定时间到了以后或之前取出和处理 简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列
延迟队列使用场景:
**这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务: **就几乎等于一个 死信队列
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒 RabbitMQ 有两种方式:
在创建队列的时候设置队列的“x-message-ttl”属性
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
//队列绑定交换机
QueueBuilder.durable(QUEUE_A).withArguments(args).build();
是针对每条消息设置TTL 生产者 生产消息时候设置:
// Spring方式
// 编辑参数
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 发送消息
channel.basicPublish("交换机", "队列", properties, "消息".getBytes());
// SpringBoot方式
// 通过 rabbitTemplate 发送消息...
rabbitTemplate.convertAndSend("交换机", "队列", "消息", correlationData -> {
correlationData.getMessageProperties().setExpiration("10000");
return correlationData;
});
代码实现:
两者队列 TTL 分别设置为 10S 和 40S
,消息超时会进入到 死信交换机 —— 发送到 QD死信队列
因为,项目采用SpringBoot 进行管理 原先配置队列信息,写在了生产者和消费者代码中,现在可写咋配置类中,生产者只发消息,消费者只接受消息
TtlQueueConfig.Java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
// 普通交换机 普通队列
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
// 死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 死信队列
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明 xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// 声明 死信队列交换机
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列 A ttl 为 10s 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() { // 导包: org.springframework.amqp.core
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
controller 用户发送一个请求,服务将数据进行处理直接发送到MQ,由其它服务模块处理…
SendMsgController .Java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
// RabbitTemplate 对rabbitmq 的服务接口API 进行了封装;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
}
}
DeadLetterQueueConsumer .Java
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
* 消费者 - 死信队列
* @author wsm
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {
//@RabbitListener 负责监听具体那个队列...
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
@RabbitListener 声明在放上,处理要监听的队列,消息进行处理...
流量器请求 http://localhost:8080/ttl/sendMsg/拉拉拉
间隔 10s 40s 控制台输出:
2022-01-23 19:12:01.380 INFO 1508 --- [nio-8080-exec-3] c.example.controller.SendMsgController : 当前时间:Sun Jan 23 19:12:01 CST 2022,发送一条信息给两个 TTL 队列:嘻嘻嘻
2022-01-23 19:12:11.684 INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer : 当前时间:Sun Jan 23 19:12:11 CST 2022,收到死信队列信息消息来自 ttl 为 10S 的队列: 拉拉拉
2022-01-23 19:12:41.566 INFO 1508 --- [ntContainer#0-1] c.e.consumer.DeadLetterQueueConsumer : 当前时间:Sun Jan 23 19:12:41 CST 2022,收到死信队列信息消息来自 ttl 为 40S 的队列: 拉拉拉
存在问题bug
上面通过,SpringBoot 集成了 RabbitMQ 通过 队列设置TTL
消息设置TTL
即使只有一个队列,也可以设置不同消息的 延迟时间
修改上面业务,添加一个 QC
普通队列,不设置队列 延迟时间
… 每次发送消息给消息设置延迟时间...
MsgTtlQueueConfig.Java
@Configuration
public class MsgTtlQueueConfig {
// 死信交换机
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 普通的队列
public static final String QUEUE_C = "QC";
//声明队列 C 死信交换机
@Bean("queueC")
public Queue queueB() {
HashMap<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//没有声明 TTL 属性
return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
SendMsgController.Java
/**
* 延时队列优化
* @param message 消息
* @param ttlTime 延时的毫秒
*/
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
// 发送消息的时候,设置消息的延迟时间...
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
}
不需要任何更改,只需要等待接收消息即可…
存在bug
浏览器请求:
http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好1/4000
插件解决bug
rabbitmq_delayed_message_exchange
# 工具引入,插件安装包;
[root@iZj6ciuzx7luldnazt4iswZ wsm]# ls
erlang-21.3-1.el7.x86_64.rpm rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq-server-3.8.8-1.el7.noarch.rpm
# 将插件移动到 RabbitMQ的plugins 包下: /usr/lib 是linux 默认安装服务路径...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
# rabbitmq 开始安装启动插件补丁...
[root@iZj6ciuzx7luldnazt4iswZ wsm]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@iZj6ciuzx7luldnazt4iswZ:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@iZj6ciuzx7luldnazt4iswZ...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
# 重启服务:
[root@iZj6ciuzx7luldnazt4iswZ wsm]# systemctl restart rabbitmq-server
安装成功,查看页面中发现,交换机多了一种信的类型:x-delayed-message
测试实现:
新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下
生产者发送消息指定消息 延迟,到交换机上 到达固定的时间才会发送到交换机上...
来实现消息的延迟TTL在我们自定义的交换机中,这是一种新的交换类型
DelayedQueueConfig.Java
@Configuration
public class DelayedQueueConfig {
// 队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
// 交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// key
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 声明队列
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
// 自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
HashMap<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 绑定 队列和交换机;
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
SendMsgController.Java
// 交换机 和 routingkey
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
// 请求接口:
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
}
DelayQueueConsumer.Java
/**
* 消费者 - 基于插件的延时队列
*
* @author wsm
*/
@Slf4j
@ComponentScan
public class DelayQueueConsumer {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
}
浏览器请求:
http://localhost:8080/ttl/sendDelayMsg/wsm/20000
http://localhost:8080/ttl/sendDelayMsg/www/4000
在生产环境中由于一些不明原因,导致 RabbitMQ 重启
确认机制方案:
Map kv结构:k每个消息唯一的标识 v每个消息体
ack
生产者根据对于的k 删除缓存数据
交换机超时|宕机,没有收到消息,生产者 回调 nack
生产者,重新发送消息,或其他处理
开启发布确认模式
交换机 direct类型
队列
绑定
信息
为了方便测试,消息没有发送到队列上,消息丢失的场景,使用 direct类型:发送消息指定 routingkey 只会发送到相同的 队列上
,没有匹配的队列 消息丢失
ack
, 长时间没有收到也会回调触发 nack
)
但,注意 交换机将消息发送到对应队列上,如果,消息没有匹配的队列,所以消息还是会丢失(没有匹配的队列,发送;
注意:首先要开启Rabbit MQ的发布确认模式:
# 开启RabbitMQ 发布确认模式:
spring.rabbitmq.publisher-confirm-type=correlated
# NONE 值是禁用发布确认模式,是默认值
# CORRELATED 值是发布消息成功到交换器后会触发回调方法
# SIMPLE 值经测试有两种效果
# 其一效果和 CORRELATED 值一样会触发回调方法
# 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果
# 根据返回结果来判定下一步的逻辑,注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel 则接下来无法发送消息到 broker
声明定义:交换机 队列 并进行绑定
ConfirmConfig.Java
/** SpringBoot 消息确认模式 **/
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
//声明业务 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
交换机confirm.exchange
队列 confirm.queue
队列与交换机绑定 routingkey: key1
com.example.producercallack包下: MyCallBack.Java
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
/**
* 交换机不管是否收到消息的一个回调方法
*
* @param correlationData 消息相关数据
* @param ack 交换机是否收到消息, true(ack) false(nack)
* @param cause 为收到消息的原因: 异常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
}
}
}
ProducerController.Java
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
@Autowired // rabbitmq 模板对象;
private RabbitTemplate rabbitTemplate;
@Autowired // 发布确认消息,消息回调方法类;
private MyCallBack myCallBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(myCallBack);
}
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
/**
* 消息回调和退回
* @param message
*/
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
//指定消息 id 为 1
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
log.info(routingKey + "发送消息内容:{}", message + routingKey);
CorrelationData correlationData2 = new CorrelationData("2");
routingKey = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
log.info(routingKey + "发送消息内容:{}", message + routingKey);
}
}
ConfirmConsumer.Java
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
log.info("消费方法接受到队列 confirm.queue 消息:{}", msg);
}
}
浏览器请求: http://localhost:8080/confirm/sendMessage/你好
回调方法接收
应该是 消费方法接收
对于上面的操作: 如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的
最起码通知我一声,我好自己处理啊
通过设置 mandatory
参数可以在当消息传递过程中不可达目的地时将消息返回给生产者:
#消息退回
spring.rabbitmq.publisher-returns=true
com.example.producercallack包下: MyCallBack.Java
RabbitTemplate.ReturnsCallback接口
低版本可能没有 RabbitTemplate.ReturnsCallback
请用 RabbitTemplate.ReturnCallback
returnedMessage(ReturnedMessage returned)
当消息无法路由的时候的回调方法
//当消息无法路由的时候的回调方法
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",
new String(returned.getMessage().getBody()), returned.getExchange(),
returned.getReplyText(), returned.getRoutingKey(),
returned.getReplyCode());
}
低版本:消息无法路由回调方法()
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.info("消息:{}被服务器退回,退回原因:{}, 交换机是:{}, 路由 key:{}",new String(message.getBody()),replyText, exchange, routingKey);
}
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(myCallBack);
/**
* true:交换机无法将消息进行路由时,会将该消息返回给生产者
* false:如果发现消息无法进行路由,则直接丢弃
*/
rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(myCallBack);
}
http://localhost:8080/confirm/sendMessage/你好
ok, 消息成功回退,剩下的处理代码可以自定义了…
因为消息 ack 持久化机制存在一定的缺点
幂等性
操作
实现原理:
redis
的 setnx 也就是只有不存在key的时候才设置
每个消息具有一个唯一的标识, 消费者第一次消费成功的时候,使用 setnx
设置,这样无论后面多少次操作,都不在进行操作了!
使用场景:
订单催付的场景
我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧
但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润 所以理应当然,他们的订单必须得到优先处理
如何实现:
队列中代码添加优先级
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
消息中代码添加优先级
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
注意事项:
要让队列实现优先级需要做的事情有如下事情
public class PriorityProducer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
if (i == 5) {
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
} else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
public class PriorityConsumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
使用场景
持久化
在被写入磁盘的同时也会在内存中驻留一份备份
当RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息.
两种模式
队列具备两种模式:default默认
和 lazy
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅 占用 1.5MB
channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_test' in vhost '/': received 'false' but current is 'true', class-id=50, method-id=10)
有的时候对于,已经声明的队列,更改了其配置,需要在RabbitMQ管理页面手动删除MQ的队列,才能进行重新声明,不然会报错…
终于写完了…
需要代码,安装工具的兄弟可以下方下载: 点个👍吧!
链接:https://pan.baidu.com/s/1M0m0xKBtZlAs3v3FYKq6Tw 提取码:2540
MQ