如果你们对 RabbitMQ
感到陌生,那可以停止往下阅读了
请先去查阅相关资料,对它有一个基本的了解之后再接着阅读本文
本文会以循序渐进的方式来讲解标题:
使用 RabbitMQ 的延迟队列来实现:订单在30分钟之内未支付则自动取消
所以请你们耐心逐步往下看
另外,实现标题的方式有很多,但本文只讲其中之一的 延迟队列
,至于其他方式,不在本文讲解范围之内,如果想了解,烦请你们自行去查阅
RabbitMQ
的模型架构,相信你们都知道
消息
由 Producer
生成,经 Exchange
路由到 Queue
,然后推给 Consumer
进行消费
消费消息有两种方式
如果 消息
经 Exchange
无法路由到符合条件的队列时,该 消息
该如何处理,是返还给 Producer
还是直接丢弃?
如果 消息
被路由到 Queue
时发现没有任何消费者,该 消息
该如何处理,是存在 Queue
中还是返还给 Producer
?
作为一个牛皮的中间件,一旦涉及到可选项了,应该怎么做?
我相信你们已经想到了,那肯定是增加配置参数来支持可选项嘛!
mandatory
参数用于设置消息是否必须被路由到队列中,默认值是 false
当 mandatory
参数设置为 true
时,Exchange
无法根据自身的类型和路由键找到一个符合条件的 Queue
,那么 RabbitMQ
会调用 Basic.Return
命令将消息返回给生产者。当 mandatory
参数设置为 false
时,出现上述情形,则消息直接被丢弃
当 mandatory
值为 false
时
代码执行正常,但没有输出结果,所以我们不确定消息是否投递了
但我们可以通过 RabbitMQ
管理界面,看 Exchange
概况
来确定消息确实投递了
当 mandatory
值为 true
时,需要添加一个监听器 ReturnListener
代码执行正常,同时也有输出结果
2024-06-01 14:54:52|AMQP Connection 10.5.108.226:5672|com.qsl.rabbit.PriorityMessageTest|INFO|59|Basic.Return 返回结果:mandatory test
也可以通过 RabbitMQ
管理界面,看 Exchange
概况来确定消息是否投递过
作为拓展,给你们留两个问题
immediate
参数用于设置消息是否立即发送给消费者,默认值是 false
当 immediate
参数设置为 true
时,如果消息路由到队列时发现队列上并没有任何消费者,那么该消息不会存入队列中,当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return
返回至生产者
immediate 为 true ,消息路由到匹配的队列时
执行如下代码
你会发现报错
2024-06-01 16:16:06|AMQP Connection 10.5.108.226:5672|org.springframework.amqp.rabbit.connection.CachingConnectionFactory|ERROR|1575|Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)
这是因为从 RabbitMQ 3.0
版本开始去掉了对 immediate
参数的支持,对此官方解释如下
immediate
参数会影响镜像队列的性能,增加了代码复杂性,建议采用TTL
和DLX
替代immediate
概括来讲,mandatory
针对的是消息能否路由到至少一个队列中,否则将消息返回给生产者。immediate
针对的是消息能否立即投递给消费者,否则将消息直接返回给生产者,不用将消息存入队列而等待消费者
生产者在发送消息时,如果不设置 mandatory
参数(或设置为 false
),那么消息在未被路由的情况下会丢失;如果设置了 mandatory
(且设置成 true
),那么需要添加对应的 ReturnListener
逻辑,生产者的代码会变得复杂。如果既不想增加生产者的复杂,又不想消息丢失,那么就可以使用备份交换器(Alternate Exchange
),将未被路由的消息存储在 RabbitMQ
中,在需要的时候再去处理这些消息
实现代码如下
执行如下测试代码
消息通过 com.qsl.normal.exchange
,经路由键 123
未匹配到任何队列,此时消息就会发送给 com.qsl.normal.exchange
的备份交换器 com.qsl.alternate.exchange
,因为备份交换器的类型是 fanout
,所以消息会被路由到 com.qsl.alternate.exchange
绑定的所有队列上,目前只有一个队列 com.qsl.unrouted.queue
,所以消息最终来到 com.qsl.unrouted.queue
,消息流转如下
在 RabbitMQ
控制台看队列状况如下
备份交换器和普通的交换器没有太大的区别,为了方便使用,推荐选择 fanout
类型;你们也可以选择其他类型,比如 direct
或 topic
,但此时需要保证消息被重新路由到备份交换器的路由键和生产者发出的路由键是一样的,否则消息不能正确路由到备份交换器的队列中,消息会丢失!
关于备份交换器,以下几种特殊情况需要注意
TTL,Time to Live 的简称,字面意思生存时长,也有很多人称过期时间,个人更习惯称过期时长
RabbitMQ
有两种方法对消息设置过期时长
如果两种方法一起使用,则消息的过期时长以两者之间较小值为准(而非单纯的以消息的过期时长为准)
消息在队列中的生存时间一旦超过设置的过期时长,就会变成 死信(Dead Message)
,消费者将无法再通过正常的路由收到该消息
可以通过绑定
死信队列
来消费Dead Message
通过队列属性 x-message-ttl
可以设置消息的过期时长,单位是毫秒,示例代码如下
如果不设置 TTL
,消息不会过期;如果 TTL
设置成 0,则表示除非此时可以将消息直接投递给消费者,否则该消息直接被丢弃,这个特性是不是看起来很眼熟?回过头去看看 immediate
为 true
时的第 1 个特性
1.部分队列有消费者,有消费者的队列会立即将消息投递给消费者,没有消费者的队列会丢弃该消息
通过参数 expiration
可以单独设置每个消息的过期时长,单位也是毫秒,示例代码如下
这两种方法的过期策略是怎样的,大家思考下再往下看
对于设置队列属性 x-message-ttl
的方法,队列中的消息具有相同的过期时长,队列中已过期的消息肯定是在队列头部,RabbitMQ
只需要定期的从队头开始往队尾扫描,一旦消息过期则从队列中剔除,一旦扫描到 未过期
的消息,则本次扫描完成
对于设置参数 expiration
的方法,每个消息可以设置不同的过期时长,那么过期的消息不一定在队列头部,如果要删除队列中所有过期消息,只能扫描整个队列,此时的成本是比较高的,所以采用惰性删除,即消息即将被投递给消费者时做过期判定,如果过期则进行删除
如果既设置了队列属性 x-message-ttl
,又设置了 expiration
,那该如何判定消息是否过期了呢?
定期删除 + 惰性删除,
Redis
的过期策略是不是也是这个?
这里针对的是队列,而非队列中的消息,大家别和 消息的 TTL
搞混了
通过参数 x-expires
可以设置队列被自动删除前处于未使用状态的时长,单位是毫秒,不能设置为 0
未使用状态需要满足三点
Basic.Get
命令RabbitMQ
能保证在过期时长到达后将队列删除,但不保障及时。RabbitMQ
重启后,持久化的队列的过期时长会被重新计算
如下是创建一个过期时长为 30 分钟的队列
队列信息如下
讲 死信队列
之前,我们得先了解 DLX
,全称 Dead-Letter-Exchange
,中文翻译成 死信交换器
当消息在一个队列中变成死信之后
消息变成死信的情况包括以下3种
Basic.Reject/Basic.Nack
),并设置参数 requeue
为 false
它能被重新发送到另一个交换器中,这个交换器就是 DLX
,而绑定到 DLX
的队列就是 死信队列
DLX
也是一个正常的交换器,和一般的交换器没有区别,它可以和任何队列进行绑定,当绑定的队列中存在死信时,RabbitMQ
就会自动将这个消息重新发布到设置的 DLX
上,进而被路由到 死信队列
。死信队列
也是可以被监听的,也可以有消费者对 死信队列
中的消息进行消费处理的
所以,死信队列
可以变相的实现 immediate
为 true
时的第 2 种情况
2.全部队列都没有消费者,则将该消息返回给生产者
为什么是 变相
,因为不是直接将消息返回给生产者,而是生产者可以监听 死信队列
,使消息回到生产者;虽然结果一致,但实现方式还是有区别的
那么 immediate
为 true
的特性,就可以用 TTL + 死信队列
来替代了
通过参数 x-dead-letter-exchange
可以给队列添加 DLX
;通过参数 x-dead-letter-routing-key
可以给这个 DLX
指定路由键,如果未配置该参数,则使用原队列的路由键,实现代码如下
执行如下测试代码
消息通过交换器 com.qsl.normal.exchange
,经路由键 ttlMessage
匹配到队列 com.qsl.message.ttl.queue
中,队列设置了 x-message-ttl
为 3000 毫秒,这段时长内队列上没有消费者消费这条消息,消息过期。由于给队列设置了死信交换器 com.qsl.dlx.exchange
,消息会通过该交换器,经路由键 dlx_routing_key
匹配到队列 com.qsl.dlx.queue
中,消息最终存储在该死信队列中,消息流转如下
RabbitMQ
控制台,可以看到队列状况如下
DLX
是一个非常有用的特性,它可以处理异常情况下,消息不能够被消费者正确消费而被置入到死信队列中,保证消息不被丢失;后续分析程序可以通过消费死信队列中的消息来分析当时所遇到的异常情况,进而改善和优化系统
DLX
还有一个很重要的功能,它配合 TTL
可以实现延迟队列,具体实现请往下看
延迟队列存储的对象是延迟消息
延迟消息
指的是需要延迟消费的消息 就是当消息发送之后,并不想让消费者立即拿到消息,而是等待特定时长后,消费者才拿到消息进行消费
延迟队列的使用场景有很多,例如:
RabbitMQ
本身并没有直接支持 延迟队列
的功能,但是可以通过 DLX
和 TTL
模拟出 延迟队列
的功能,具体实现已经在上一节(死信队列
)中完成了,你们可以网上翻一翻
给大家演示 场景1
的完整示例,时间改成 1 分钟内完成支付
生产者端配置
消费者端配置
消息发送
输出日志如下
实际应用中,可以根据延迟时长给延迟队列划分多个等级,例如
目前 RabbitMQ
提供了另外的方式来实现 延迟队列
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
感兴趣的可以去看看
mandatory
针对的是消息能否路由到至少一个队列中,否则将消息返回给生产者
immediate
针对的是消息能否立即投递给消费者,否则将消息直接返回给生产者,不用将消息存入队列而等待消费者
RabbitMQ 3.0
版本开始去掉了对 immediate
参数的支持,可以用 DLX
和 TTL
来代替
x-message-ttl
和消息的参数 expiration
队列也可以设置过期时长,该时长内队列一直处于未使用状态则会被删除;通过队列参数 x-expires
来设置
DLX
)上的队列就是死信队列
DLX
能够保证异常的情况下消息不会丢失,后续通过分析死信队列中的消息,可以改善和优化系统
DLX
与 TTL
《RabbitMQ实战指南》