先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 **应用场景:**为了保证订单业务的消息数据不丢失,需要使用到
RabbitMQ
的死信队列机制,当消息消费发生异常时,将消息投入死信队列中;还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
流程说明:
实现说明:
public class Consumer01 {
/**
* 正常交换机
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE = "dead_exchange";
//正常队列
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
//声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//设置过期时间
// params.put("x-message-ttl", 10000);
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
//声明队列 不共享 不持久化 不删除
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, deliver) ->{
//获取消息
String message = new String(deliver.getBody(), "UTF-8");
System.out.println("Consumer01接收到的消息为:" + message);
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback -> {});
}
}
public class Consumer02 {
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE = "dead_exchange";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列 不共享 不持久化 不删除
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, deliver) ->{
//获取消息
String message = new String(deliver.getBody(), "UTF-8");
System.out.println("Consumer02接收到的消息为:" + message);
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback -> {});
}
}
public class Product {
/**
* 正常交换机
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";
//正常队列
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
//设置消息的 TTL 时间
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String msg = "消息" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, msg.getBytes(StandardCharsets.UTF_8));
}
}
}
生产者设置具有灵活性,第一条消息可设置10s,第二条设置8s
//设置消息的 TTL 时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
消费者设置,所有消息的ttl都是一样的,没有灵活性
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//设置过期时间
params.put("x-message-ttl", 10000);
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
消息达到设置的最大长度后,多与的消息会进入死信队列
public class Product {
/**
* 正常交换机
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
for (int i = 1; i < 11; i++) {
String msg = "消息" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, msg.getBytes(StandardCharsets.UTF_8));
}
}
}
public class Consumer01 {
/**
* 正常交换机
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE = "dead_exchange";
//正常队列
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
//声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//设置过期时间
// params.put("x-message-ttl", 10000);
//设置队列长度的限制
params.put("x-max-length", 6);
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
//声明队列 不共享 不持久化 不删除
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, deliver) ->{
//获取消息
String message = new String(deliver.getBody(), "UTF-8");
System.out.println("Consumer01接收到的消息为:" + message);
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback -> {});
}
}
public class Consumer02 {
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE = "dead_exchange";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列 不共享 不持久化 不删除
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, deliver) ->{
//获取消息
String message = new String(deliver.getBody(), "UTF-8");
System.out.println("Consumer02接收到的消息为:" + message);
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback -> {});
}
}
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//设置队列长度的限制
params.put("x-max-length", 6);
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
public class Product {
/**
* 正常交换机
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
for (int i = 1; i < 11; i++) {
String msg = "消息" + i;
channel.basicPublish(NORMAL_EXCHANGE, "normal", properties, msg.getBytes(StandardCharsets.UTF_8));
}
}
}
public class Consumer01 {
/**
* 正常交换机
*/
public static final String NORMAL_EXCHANGE = "normal_exchange";
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE = "dead_exchange";
//正常队列
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
//声明交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
//正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
//声明队列 不共享 不持久化 不删除
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, deliver) ->{
//获取消息
String message = new String(deliver.getBody(), "UTF-8");
if ("消息6".equals(message)){
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(deliver.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
}
};
//
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback -> {});
}
}
public class Consumer02 {
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE = "dead_exchange";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQConfig.getChannel();
//声明交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明队列 不共享 不持久化 不删除
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定交换机和队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead");
System.out.println("等待接收消息......");
DeliverCallback deliverCallback = (consumerTag, deliver) ->{
//获取消息
String message = new String(deliver.getBody(), "UTF-8");
System.out.println("Consumer02接收到的消息为:" + message);
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback -> {});
}
}
//获取消息
String message = new String(deliver.getBody(), "UTF-8");
if ("消息6".equals(message)){
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(deliver.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
}
//消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback -> {});
如果觉得内容不错的话,希望大家可以帮忙点赞转发一波,这是对我最大的鼓励,感谢🙏🏻
END
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有