概述
MQ消息丢失的可能存在于方方面面,比如网络问题、MQ挂掉、服务器断电,都会导致消息丢失,那我们如何保障消息的可靠传输就成了很重要的问题。如果是你的话你会怎么回答这个问题呢?
在之前的工作模式中,我们会发现,所有的生产者在推送完消息后就结束或者执行其他任务,并不知晓消息是否发送成功。如果要保证消息的可靠性,需要对消息进行持久化处理。除了设置持久化相关代码外,我们还要保证消息是被推送到MQ中。正常情况下,如果消息经过交换机进入队列就可以完成消息的持久化,但如果消息在没有到达broker之前出现意外,那就造成消息丢失。
因此接下来会通过生产者丢失消息、如何解决生产者丢失消息、MQ丢失消息、如何解决MQ丢失消息、消费者丢失消息、如何解决消费者丢失消息等方面来具体阐述如何保障消息的可靠性。
关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
生产者丢失消息
生产者Producer在发布消息给交换器Exchange的时候,突然发生网络故障,导致消息没有到交换器链接就中断,在传输的半路就搞丢了。
解决办法:
因此我们有两种方式可以防止这种情况: 第一种:事务模式
channel.txSelect();//声明事务模式
channel.txCommit();//提交事务
chnnel.txRollback();//回滚事务
实现原理:
生产者发送消息时,会检查MQ是否接收到消息,(这里MQ是指交换机)收到消息就执行channel.txCommit() 提交事务,没有被MQ接收到会报错,这时候就可以通过回滚事务channel.txRollback()。然后重试发送消息,如果收到了消息,那么可以提交事务channel.txCommit()。但是这样吞吐量会降下来,因为太耗性能。
代码举例:
生产者Producer
//生产者Producer
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import cn.linkpower.util.MqConnectUtil;
public class Send {
private static final String queue_name = "test_work_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1、建立连接
Connection mqConnection = RabbitMQUtil.getMqConnection();
//2、建立信道(通道)
Channel channel = mqConnection.createChannel();
//3、声明队列 非持久化
channel.queueDeclare(queue_name, false, false, false, null);
//公平分发---
//为了开启公平分发操作,在消息消费者发送确认收到的指示后,消息队列才会给这个消费者继续发送下一条消息。
//此处的 1 表示 限制发送给每个消费者每次最大的消息数。
channel.basicQos(1);
try {
//开启事务
channel.txSelect();
String string = "hello 可为 ";
System.out.println("send msg = "+string);
//发送消息
channel.basicPublish("", queue_name, null, string.getBytes());
关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
//模拟异常操作
//int a = 10/0;
//无异常 正常执行,则提交事务操作
channel.txCommit();
} catch (Exception e) {
System.out.println("出现异常进行回滚操作");
channel.txRollback();
}
//5、使用完毕后,需要及时的关闭流应用
channel.close();
mqConnection.close();
}
}
消费者Consumer
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import cn.linkpower.util.MqConnectUtil;
/**
* 消息消费者要实现 “公平分发” 的操作,需要关闭自动应答操作;<br>
* 同时,在处理完消息后,需要向消息队列做“消费完成”的应答!<br>
* 其实这里都可以自动实现,只要有手动确认模式就走手动,后面会讲。
*/
public class GetMsg1 {
private final static String queue_name="test_work_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立连接
Connection mqConnection = MqConnectUtil.getMqConnection();
//2、获取信道
final Channel channel = mqConnection.createChannel();
//3、声明队列
channel.queueDeclare(queue_name, false, false, false, null);
关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
//公平分发---每次只分发一个消息
channel.basicQos(1);
//4、信访室接受消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" get msg new1 = " + message );
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("get msg new1 done");
//公平分发--- 消费完成后,需要做相关的回执信息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
//5、开启自动应答,默认开启
//channel.basicConsume(queue_name,true, consumer);
//公平分发--- 同时需要关闭自动“应答”,
//RabbitMQ消费者手动应答后不需要再关闭自动应答,因为它们已经是两种不同的模式。
channel.basicConsume(queue_name, false,consumer);
}
}
分别进行正常测试和异常测试,发现当出现异常时,会进行回滚操作,此时的消息队列并不能接收到任何的消息。
缺点弊端
如果消息在接收到之前,消费者那边出现连接或者信道关闭,那么消息就丢失了;另一方面,这种模式消费者那边可以传递过载的消息,这里我们对消息进行了每次限制一条,如果没有对传递的消息数量进行限制,有可能使得消费者由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,消费者线程被操作系统杀死。
通过设置生产者Channel为comfirm模式,该Channel上发布的所有消息都会被指派一个唯一ID(每次从1开始累加),当消息到达生产者指定的消息队列后,broker会返回一个确认给生产者(包含之前的ID),这样生产者就能知道哪条消息成功发送了。
实现原理:
所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
代码实战:
第一步、配置文件中配置消息发送确认
spring.rabbitmq.publisher-confirms = true
# 这里注意
# publisher-confirm-type: correlated
# 2.1之前采用publisher-confirms: true 之后采用publisher-confirm-type: correlated
# none表示不启用消息确认机制;
# correlated表示使用相关模式进行消息确认,即每个消息发送后都会收到一个确认消息;
# simple表示使用简单模式进行消息确认,即批量发送消息后收到一个确认消息。
关注公众号【可为编程】回复【加群】进入面试技术交流群!!!
第二步、生产者实现RabbitTemplate.ConfirmCallback接口,重写方法
confirm(CorrelationData correlationData, boolean isSendSuccess, String error)
// 当然也可以通过注入的方式自定义confirm listener 看后面手动创建
// 这里是SpringBoot采用自动注入RabbitMQ的形式配置
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
.........
}
}
关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!!
//手动创建RabbitMQ对象并设置自定义confirm listener
Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();//开启confirm模式通道
createProductConfirm(channel);
//MQ生产者Confirm模式
public void createProductConfirm(Channel channel){
// 创建一个有序的集合 保存每一次的tag
final SortedSet<Long> confirmSortedSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
// 设置监听事件 异步监听每条消息的成功与失败
channel.addConfirmListener(new ConfirmListener() {
//成功时回调(异步的 此时表示没有问题的回调)
//每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if(multiple){
//当发送多条消息时,他返回可能多条消息的接受情况,也可能返回单条消息的情况
System.out.println("handleAck --- multiple == true");
confirmSortedSet.headSet(deliveryTag+1).clear();
}else{
//单条消息
System.out.println("handleAck --- multiple == false");
confirmSortedSet.remove(deliveryTag);
}
}
//失败时回调
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if(multiple){
System.out.println("handleNack --- multiple == true");
confirmSortedSet.headSet(deliveryTag+1).clear();
}else{
//单条消息
System.out.println("handleNack --- multiple == false");
confirmSortedSet.remove(deliveryTag);
}
}
});
}
1、生产者将信道(channel)设置为confirm模式。
2、所有在该信道上发布的消息,都会指派一个起始为1且唯一的id,一旦消息被推送至匹配的队列之后,broker就会发送一个(携带id的)确认给生产者,使得生产者知道消息成功到达了目标队列中。
注意∶消息(message)被发布者(publisher)发送给交换机(exchange),然后交换机将收到的消息根据路由规则routingkey分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者consumer,或者消费者按照需求自行获取。
3、如果消息和队列是可持久化(durable = true)的,那么确认消息会将消息写至磁盘后发出。
4、broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示应答这个序列号之前的所有消息,使其假装都已经得到了处理。批量应答会产生消息丢失的情况,所以要保障消息不丢失,应该用非批量应答multiple=false。后面讲消费者时也会讲到。
关注公众号【可为编程】回复【加群】进入面试技术交流群!!!
事务机制和 confirm机制最大的不同:
事务机制是同步的,你提交一个事务之后会阻塞在那儿,加了事务就不能再加confirm机制。
confirm机制是异步的,你送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用confirm机制的。
MQ中丢失消息
RabbitMQ 自己弄丢了数据,这个必须要开启 RabbitMQ消息的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。
解决办法:
设置持久化有两个关注点:
第一个是创建 queue 的时候将其设置为持久化
第二个是发送消息的时候将消息设置为持久化
第一种:元配置持久化
在创建交换机和队列的时候,将其设置为持久化,这样就算重启RabbitMQ或者突然断电,元数据信息也会从磁盘中进行读取,这样就可以保证RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。
代码实战:
// 1. 创建持久化交换器 如果不存在自动创建
channel.exchangeDeclare(rabbitConfigDTO.getExchange(), BuiltinExchangeType.TOPIC, true);
// 2. 创建持久化队列 如果不存在自动创建
channel.queueDeclare(rabbitConfigDTO.getQueue(), true, false, false, null);
channel.queueBind(rabbitConfigDTO.getQueue(), rabbitConfigDTO.getExchange(), rabbitConfigDTO.getRoutingkey());
第二种:消息持久化
发送消息的时候将消息的deliveryMode设置为2,或者是
MessageProperties.PERSISTENT_TEXT_PLAIN 就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。
关注公众号【可为编程】回复【加群】进入面试技术交流群!!!
代码实战
下面是采用springboot整合的rabbitTemplate来实现消息发送
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置为持久化模式
Message message = new Message(payload, props);
rabbitTemplate.send(exchange, routingKey, message);
// 或者直接传递Message对象,其中包含已设置持久化的MessageProperties
rabbitTemplate.convertAndSend(exchange, routingKey, new Message(messageBody, props));
// 或者使用MessagePostProcessor接口动态设置消息属性
rabbitTemplate.convertAndSend(
exchange,
routingKey,
messageBody,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
}
);
下面是采用SpringBoot中通过new来创建连接来实现消息发送
Connection connection = new RabbitMQUtils(rabbitConnectionUrlDTO.getIp(), rabbitConnectionUrlDTO.getPort(), rabbitConnectionUrlDTO.getVirtualhost(), rabbitConnectionUrlDTO.getUsername(), rabbitConnectionUrlDTO.getPassword()).getConnection();
if (Objects.nonNull(connection)) {
log.warn("共" + RABBIT_CONNECTION_URL_LIST.size() + "个连接!");
Channel channel = connection.createChannel();
//消息持久化到磁盘,避免因为突然断电或重启导致消息丢失
channel.basicPublish(rabbitConnectionUrlDTO.getExchange(), rabbitConnectionUrlDTO.getRoutingkey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSONObject.toJSONString(map).getBytes(StandardCharsets.UTF_8));
log.warn("连接RabbitMQ成功!" + "IP地址为" + rabbitConnectionUrlDTO.getIp() + "端口号为" + rabbitConnectionUrlDTO.getPort() + "交换器为" + rabbitConnectionUrlDTO.getExchange() + "队列名为" + rabbitConnectionUrlDTO.getQueue() + "路由键为" + rabbitConnectionUrlDTO.getRoutingkey());
}
关注公众号【可为编程】回复【加群】进入面试技术交流群!!!
消息持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
消费端丢失消息
消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了, RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的ack机制,简单来说,就是你关闭 RabbitMQ 的自动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。如果消费者消费完没有设置basicAck,没有确认消息就会一直存在RabbitMQ当中。所以久而久之就会造成消息堆积,造成消息重复消费和阻塞RabbitMQ。
解决办法:
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。确认模式主要分为下面三种:
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
注意:在springboot项目中通过在配置文件中指定消息确认的模式,如下指定手动确认模式:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
如果成功消费消息,一般调用下面的代码用于确认消息成功处理完成
@Component
@RabbitListener(queues = {RabbitConfig.QUEUE_A, RabbitConfig.QUEUE_B})
public class MsgReceiver {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
try {
logger.info("【Consumer02成功接收到消息】>>> {}", msg);
// 确认收到消息,只确认当前消费者的一个消息收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
logger.info("【Consumer02】消息已经回滚过,拒绝接收消息 :{}", msg);
// 拒绝消息,并且不再重新进入队列
//public void basicReject(long deliveryTag, boolean requeue)
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
logger.info("【Consumer02】消息即将返回队列重新处理 :{}", msg);
//设置消息重新回到队列处理
// requeue表示是否重新回到队列,true重新入队
//public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
e.printStackTrace();
}
}
}
关注公众号【可为编程】回复【面试】领取年度最新面试题大全!!
总结
结合代码实战和原理解读了RabbitMQ消息丢失以及对应的解决方案,正如上面图示所言,每一个角色都要兼顾才能够保障消息彻底的不丢失,这些只是MQ为我们提供的一些机制,在日常开发和维护中,网络问题,服务器问题,消息并发量问题处理不好都会导致消息丢失,因此我们要结合实际情况,尽量在编码的时候做万无一失。
END