作为消息中间件,都会面临消息丢失的问题,消息丢失大概分为三种情况:
broker
发送消息Broker
,但是 Broker
没有把消息保存好,导致消息丢失Broker
发送消息到消费者,消费者在消费消息时,因为没有处理好,导致 broker
将消费失败的消息从列表中删除了RabbitMQ
也对上述问题给出了相应的解决方案。 发布确认属于 RabbitMQ
的七大工作模式之一
生产者将信道设置成 confirm
(确认)模式
confirm
模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始) channel
下,序号不可能重复brocker
就会发送一个确认(ACK
)给生产者(包含消息的唯一 ID
) broker
回传给生产者的确认消息中 deliveryTag
包含了确认消息的序号broker
也可以设置 channel.basicAck
方法中的 multiple
参数,表示到这个序号之前的所有消息都已经得到了处理
发送确认机制最大的好处在于它是异步的,生产者可以同时发布消息和等待信道返回确认消息
RabbitMQ
因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)
命令,生产者同样可以在回调方法中处理该 nack
命令使用发布确认机制,必须要信道设置成 confirm
(确认) 模式
AMQP 0.9.1
协议的扩展,默认情况下它不会被启用channel.confirmSelect()
将信道设置为 confirm
模式Channel channel = connection.createChannel();
channel.confirmSelect();
发布确认有三种策略,接下来我们来介绍这三种策略
Producer
、Brocker
、Consumer
都有可能丢失消息
Producer
消息丢失的问题因为每一个策略都需要重复建立链接这一步骤,所以我们将其提出来,单独作为一个方法,需要的时候直接调用即可
main
方法外面,使用一个静态方法public class PublisherConfirms {
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
public static void main(String[] args) {
}
}
Publishing Messages Individually
/**
* 单独确认
*/
private static void publishingMessagesIndividually() throws Exception {
// 1. 创建连接
// 我们将连接的建立写在 try 里面,这样就不用再去关闭了
try(Connection connection = createConnection()) {
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 设置信道为 confirms 模式
channel.confirmSelect();
// 4. 声明队列(交换机就使用内置的,就不再声明了)
// 队列对象、是否持久化、是否独占、是否自动删除、传递参数
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null);
// 5. 发送消息,并等待确认
// 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
// 信道的发送
// 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、
channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
// 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回)
// 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个
// 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
// 这里注意是 printf System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
单独确认==>消息条数: 200, 耗时: 5793 ms
观察上面代码,会发现这种策略是每发送一条消息后就调用 channel.waitForConfirmsOrDie()
方法, 之后等待服务器的确认
Linux
内核中的 fsync
方法)但是消息确认机制是支持异步的,可以一边发送消息,一边等待消息确认。由此进行了改进,我们看另外两种策略
Publishing Messages in Batches(批量确认)
:每发送一批消息之后,调用 channel.waitForConfirms
方法,等待服务器的确认返回Handling Publisher Confirms Asynchronously(异步确认)
:提供一个回调方法,服务端确认了一条或者多条消息后,客户端会对这个方法进行处理Publishing Messages in Batches
/**
* 批量确认
*/
private static void publishingMessagesInBatches() throws Exception {
// 1. 建立连接
try(Connection connection = createConnection()){
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 设置信道为 confirm 模式
channel.confirmSelect();
// 4. 声明队列
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
// 5. 发送消息,并进行确认
// 设置批量处理的大小和计数器
long start = System.currentTimeMillis();
int batchSize = 100;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
outstandingMessageCount++;
// 当计数器的大小达到了设置的批量处理大小,就进行确认
if(outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5000);
// 消息确认后,计数器要清零
outstandingMessageCount = 0;
}
// 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5000);
}
}
long end = System.currentTimeMillis();
System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);
}
}
批量确认==>消息条数: 200, 耗时: 128 ms
Handling Publisher Confirms Asynchronously
异步 confirm
方法的编程实现最为复杂
Channel
接口提供了一个方法 addConfirmListener()
ConfirmListener
回调接口ConfirmListener
接口中包含两个方法:
handleAck(long deliveryTag, boolean multiple)
,处理 RabbitMQ
发送给生产者的 ack
deliveryTag
表示发送消息的序号multiple
表示是否批量确认handleNack(long deliveryTag, boolean multiple)
,处理 RabbitMQ
发送给生产者的 nack
我们需要为每一个 Channel
维护一个已发送消息的序号集合
RabbitMQ
的 confirm
回调时,从集合中删除对应的消息Channel
开启 confirm
模式后,channel
上发送消息都会附带一个从 1
开始递增的 deliveryTag
序号SortedSet
的有序性来维护这个已发消息的集合 ack
时,从序列中删除该消息的序号。如果为批量确认消息,表示小于当前序号 deliveryTag
的消息都收到了,则清楚对应集合nack
时,处理逻辑类似,不过需要结合具体业务情况,进行消息重发等操作/**
* 异步确认
*/
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
// 1. 建立连接
try(Connection connection = createConnection()) {
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 设置信道为 confirm 模式
channel.confirmSelect();
// 4. 声明队列
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
// 5. 监听 confirm long start = System.currentTimeMillis();
// 创建一个集合,用来存放未确认的消息(的id)
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉
if(multiple) {
// headSet(n)方法返回当前集合中小于 n 的集合
// 先获取到这部分 id,然后一起 clear 清除掉即可
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
// 单独确认,只需要移除当前这个 id 即可
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 和 ack 处理模式基本是相似的,只是多了一步重发处理
if(multiple) {
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
// 业务需要根据实际场景进行处理,比如重发,此处代码省略
}
});
// 6. 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号
channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
confirmSeqNo.add(seqNo); // 将消息的序号加入集合中
}
// 确认消息已处理完
while (!confirmSeqNo.isEmpty()) {
// 没有处理完,就休眠一段时间后再确认一下,看是否处理完
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);
}
}
单独确认==>消息条数: 200, 耗时: 93 ms
package rabbitmq.publisher.confirms;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class PublisherConfirms {
private static final Integer MESSAGE_COUNT = 200;
static Connection createConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(Constants.HOST);
connectionFactory.setPort(Constants.PORT);
connectionFactory.setUsername(Constants.USER_NAME);
connectionFactory.setPassword(Constants.PASSWORD);
connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
return connectionFactory.newConnection();
}
public static void main(String[] args) throws Exception {
// Strategy #1: Publishing Messages Individually
// 单独确认
publishingMessagesIndividually();
// Strategy #2: Publishing Messages in Batches
// 批量确认
publishingMessagesInBatches();
// Strategy #3: Handling Publisher Confirms Asynchronously
// 异步确认
handlingPublisherConfirmsAsynchronously();
}
/**
* 单独确认
*/
private static void publishingMessagesIndividually() throws Exception {
// 1. 创建连接
// 我们将连接的建立写在 try 里面,这样就不用再去关闭了
try(Connection connection = createConnection()) {
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 设置信道为 confirms 模式
channel.confirmSelect();
// 4. 声明队列(交换机就使用内置的,就不再声明了)
// 队列对象、是否持久化、是否独占、是否自动删除、传递参数
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false,null);
// 5. 发送消息,并等待确认
// 这里我们需要可以再创建一个 MESSAGE_COUNT 全局变量,来指定消息的数量
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
// 信道的发送
// 交换机的名称(我们使用的是内置交换机,也就是空的)、routingKey、
channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
// 等待确认(等待确认消息,只要消息被确认,这个方法就会被返回)
// 有 waitForConfirms() 和 waitForConfirmsOrDie() 随便用哪个
// 如果超时过期,则抛出 TimeoutException。如果任何消息被 nack(丢失),waitForConfirmsOrDie 则抛出 Exception channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
// 这里注意是 printf System.out.printf("单独确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
/**
* 批量确认
*/
private static void publishingMessagesInBatches() throws Exception {
// 1. 建立连接
try(Connection connection = createConnection()){
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 设置信道为 confirm 模式
channel.confirmSelect();
// 4. 声明队列
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
// 5. 发送消息,并进行确认
// 设置批量处理的大小和计数器
long start = System.currentTimeMillis();
int batchSize = 100;
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
outstandingMessageCount++;
// 当计数器的大小达到了设置的批量处理大小,就进行确认
if(outstandingMessageCount == batchSize) {
channel.waitForConfirmsOrDie(5000);
// 消息确认后,计数器要清零
outstandingMessageCount = 0;
}
// 当计数器大小 < 100 的时候,由于没有达到批量发送的标准,所以单独再进行发送
if (outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5000);
}
}
long end = System.currentTimeMillis();
System.out.printf("批量确认==>消息条数: %d, 耗时: %d ms \n", MESSAGE_COUNT, end-start);
}
}
/**
* 异步确认
*/
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
// 1. 建立连接
try(Connection connection = createConnection()) {
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 设置信道为 confirm 模式
channel.confirmSelect();
// 4. 声明队列
channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
// 5. 监听 confirm long start = System.currentTimeMillis();
// 创建一个集合,用来存放未确认的消息(的id)
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 如果是批量确认,就要将集合中 <= deliveryTag 的 id 都给清除掉
if(multiple) {
// headSet(n)方法返回当前集合中小于 n 的集合
// 先获取到这部分 id,然后一起 clear 清除掉即可
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
// 单独确认,只需要移除当前这个 id 即可
confirmSeqNo.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 和 ack 处理模式基本是相似的,只是多了一步重发处理
if(multiple) {
confirmSeqNo.headSet(deliveryTag + 1).clear();
}else {
confirmSeqNo.remove(deliveryTag);
}
// 业务需要根据实际场景进行处理,比如重发,此处代码省略
}
});
// 6. 发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String msg = "hello publisher confirms" + i;
long seqNo = channel.getNextPublishSeqNo(); // 拿到消息的序号
channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
confirmSeqNo.add(seqNo); // 将消息的序号加入集合中
}
// 确认消息已处理完
while (!confirmSeqNo.isEmpty()) {
// 没有处理完,就休眠一段时间后再确认一下,看是否处理完
Thread.sleep(10);
}
long end = System.currentTimeMillis();
System.out.printf("异步确认==>消息条数: %d, 耗时: %d ms", MESSAGE_COUNT, end-start);
}
}
}
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有