TTL
(Time to Live
,过期时间),即过期时间。RabbitMQ
可以对消息和队列设置 TTL
当消息到达存活时间之后,还没有被消费,就会被自动清除
目前有两种方法可以设置消息的 TTL
TTL
,队列中所有的消息都有相同的过期时间TTL
可以不同
如果两种方法一起使用,则消息的 TTL
以两者之间较小的那个数值为准TTL
的方法是在发送消息的方法中加入 expiration
的属性参数,单位为毫秒// 1. 交换机
@Bean("ttlExchange")
public FanoutExchange ttlExchange() {
return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();
}
// 2. 队列
@Bean("ttlQueue")
public Queue ttlQueue() {
return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
// 3. 队列和交换机绑定 Binding@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
@RequestMapping("/ttl")
public String ttl() {
String ttlTime = "10000"; // 10s
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...",
messagePostProcess -> {
messagePostProcess.getMessageProperties().setExpiration(ttlTime);
return messagePostProcess;
});
return "发送成功";
}
调用接口,发送消息,观察结果: http://127.0.0.1:8080/producer/ttl
Read
消息为 1
如果不设置 TTL
,则表示此消息不会过期;如果将 TTL
设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃
设置队列 TTL
的方法是在创建队列时,加入 x-message-ttl
参数实现的,单位是毫秒
// 设置ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {
// 设置 20 秒过期
return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20*1000).build();
}
// 设置队列和交换机绑定
@Bean("ttlQueue2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
设置过期时间,也可以采用下面的方式
@Bean("ttlQueue2")
public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange, @Qualifier("ttlQueue2") Queue queue) {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 20000); // 20 秒过期
return BindingBuilder.bind(queue).to(exchange);
}
@RequestMapping("/ttl")
public String ttl() {
// 发送不带 ttl 的消息
rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...");
return "发送成功";
}
运行程序,观察结果。
Features
有一个 TTL
标识 调用接口,发送消息: http://127.0.0.1:8080/producer/ttl
Ready
消息为 1
采用发布订阅模式,所有与该交换机绑定的队列(ttl_queue
和 ttl_queueu2
)都会收到消息
由于 ttl_queue
对类,为设置过期时间,所以 ttl_queue
的消息未被删除
TTL
属性的方法,一旦消息过期,就会从队列中删除TTL
的方法,即使消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前进行判断的为什么这两种发发处理方式不一样?
RabbitMQ
只要定期从队头开始扫描是否有过期的消息即可TTL
的方式,每条消息的过期时间不同,如果要删除所有过期消息需要扫描整个队列,所以不如等到此消息被消费时再判定是否过期,如果过期再进行删除即可延迟队列(Delayed Queue
),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
延迟队列的使用场景有很多,比如:
RabbitMQ
本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 TTL
+死信队列的方式,组合模拟出延迟队列的功能
假设一个应用中需要将每条消息都设置为 10 秒的延迟,生产者通过 normal_exchange
这个交换机将发送的消息存储在 normal_queue
这个队列中。
normal_queue
这个队列,而是 dlx_queue
这个队列normal_queue
这个队列中过期之后,就被存入 dlx_queue
这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息先看 TTL
+ 死信队列实现延迟队列。(继续沿用前面死信队列的代码)
声明队列
@Bean("normalQueue")
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 1. 绑定死信队列
arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);
// 设置发送给死信队列的 RoutingKey
arguments.put("x-dead-letter-routing-key", "dlx");
return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
生产者
发送两条消息,一条消息 10s
后过期,第二条 20s
后过期
@RequestMapping("/delay")
public String delay() {
// 发送带 ttl 的消息
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 10s..." + new Date(), messagePostProcessor -> {
// 设置 10s 过期
messagePostProcessor.getMessageProperties()
.setExpiration("10000");
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 20s..." + new Date(), messagePostProcessor -> {
// 设置 20s 过期
messagePostProcessor.getMessageProperties()
.setExpiration("20000");
return messagePostProcessor;
});
return "发送成功!";
}
消费者
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("%tc 死信队列接收到消息:%s, deliveryTag: %d%n", new Date(),
new String(message.getBody(), "UTF-8"), deliveryTag);
}
运行程序
调用接口,发送数据:
可以看到,两条消息按照过期时间一次进入了死信队列
延迟队列,就是希望等待特定的时间之后,消费者才能拿到这个消息。TTL
刚好可以让消息延迟一段时间成为死信,成为死信的消息会被投递到死信队列里面,这样消费者一直消费死信队列里的消息就可以了
接下来把生产消息的顺序修改一下, 先发送 20s
过期数据,再发送 10s
过期数据
@RequestMapping("/delay")
public String delay() {
// 发送带 ttl 的消息
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 20s..." + new Date(), messagePostProcessor -> {
// 设置 20s 过期
messagePostProcessor.getMessageProperties().setExpiration("20000");
return messagePostProcessor;
});
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal",
"ttl test 10s..." + new Date(), messagePostProcessor -> {
// 设置 10s 过期
messagePostProcessor.getMessageProperties().setExpiration("10000");
return messagePostProcessor;
});
return "发送成功!";
}
通过控制台观察死信队列消费情况:
10s
过期的消息,也是在 20s
后才进入到死信队列消息过期之后,不一定会被马上丢弃。因为 RabbitMQ
只会检查队首消息是否过期。若过期,则丢到死信队列,此时就会造成一个问题
所以在考虑使用 TTL
+ 死信队列实现延迟任务队列的时候,需要确认业务上每个人物的延迟时间是一致的,如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延时的消息建立单独的消息队列
RabbitMQ
官方也提供了一个延迟的插件来实现延迟功能
接下来看具体操作:
插件下载地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
根据自己的 RabbitMQ
版本,选择相应版本的延迟插件,下载后上传到服务器
插件上传目录参考:
延迟队列作为 RabbitMQ
的高级特性,也是面试的一大重点
延时队列是一个特殊的对类,消息发送之后,并不会立即给消费者,而是等待特定的时间,才发送给消费者。
延迟队列的应用场景有很多,比如:
但 RabbitMQ
本身并没有直接实现延迟队列,通常有两种方式:
TTL
+ 死信队列组合的方式二者对比:
DLX
的时序问题