前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Rabbitmq 通过死信队列实现延迟消息发送

Rabbitmq 通过死信队列实现延迟消息发送

作者头像
芥末鱿鱼
发布于 2022-05-05 06:55:24
发布于 2022-05-05 06:55:24
54100
代码可运行
举报
文章被收录于专栏:玩转 Spring Cloud玩转 Spring Cloud
运行总次数:0
代码可运行

Rabbitmq 通过死信队列实现延迟消息发送

文章目录

设置消息的过期时间(TTL)

TTL, Time to Live 的简称, 即过期时间.

两种方法设置 TTL

  1. 通过队列属性设置. 即队列中所有的消息都有相同的过期时间. 在 channel.queueDeclare 方法中加入 x-message-ttl 参数实现, 单位是毫秒
  2. 对消息本身进行单独设置. 即每条消息的 TTL 可以不同. 在 channel.basicPublish 方法中加入 expiration 属性, 单位是毫秒 总结: 如果两种方法一起使用, 则以较小的那个 TTL 为准. 消息过期后, 消费者无法再接收该消息, 就会变成死信(Dead Message). 这个特性可以实现延迟队列功能

Java 代码实现

给队列设置 TTL

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-message-ttl", 2000);// 2s 过期
        Queue queue = new Queue("queue.expiration", true, false, false, argMap);
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(
            BindingBuilder.bind(queue).to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
        return rabbitAdmin;
    }
}

@Service
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, String msg) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

给每一个消息单独设置 TTL

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;  
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        rabbitAdmin.declareExchange(new TopicExchange("exchange.expiration"));
        rabbitAdmin.declareQueue(new Queue("queue.expiration"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.expiration"))
            .to(new TopicExchange("exchange.expiration")).with("routingkey.expiration"));
        return rabbitAdmin;
    }
}

@Service
public class SendMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, String msg, Integer expirationTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
            message.getMessageProperties().setExpiration(expirationTime);
            return message;
        });
    }
}

死信队列

DLX(Dead-Letter-Exchange), 可以称之为死信交换器, 也成死信信箱. 当消息在一个队列中变成死信(dead message) 后, 会被重新发送到另外一个交换器中, 这个交换器就是 DLX. 绑定了 DLX 的队列就是死信队列. 说白了就是, 有两个队列, 一个队列上的消息设置了过期时间, 但没有消费者. 另一个队列是普通队列, 有消费者. 后者被称为死信队列. 当前一个队列消息过期后, Rabbitmq 会自动将过期消息转发到死信队列里. 然后被死信队列的消费者消费掉. 实现消息的延迟发送功能

延迟队列

延迟队列是为了存放那些延迟执行的消息,待消息过期之后消费端从队列里拿出来执行

实现方法

通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数为这个队列添加 DLX

Java 代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Configuration
public class RabbitConfig implements ApplicationContextAware {
    
    private ApplicationContext applicationContext;    
    
    @PostConstruct
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = applicationContext.getBean("rabbitAdmin", RabbitAdmin.class);
        // 死信队列
        rabbitAdmin.declareExchange(new DirectExchange("exchange.dlx"));
        rabbitAdmin.declareQueue(new Queue("queue.dlx"));
        rabbitAdmin.declareBinding(
            BindingBuilder.bind(new Queue("queue.dlx")).to(new DirectExchange("exchange.dlx")).with("routingkey.dlx"));

        // 延迟队列
        Map<String, Object> argMap = new HashMap<>(3);
        argMap.put("x-message-ttl", 2000);// 2s 过期
        argMap.put("x-dead-letter-exchange", "exchange.dlx");
        argMap.put("x-dead-letter-routing-key", "routingkey.dlx");
        rabbitAdmin.declareQueue(new Queue("queue.normal", true, false, false, argMap));
        rabbitAdmin.declareExchange(new TopicExchange("exchange.normal"));
        rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.normal", true, false, false, argMap))
            .to(new TopicExchange("exchange.normal")).with("queue.normal"));
    }
}

缺点

使用死信队列来实现消息的延迟发送. 如果是采用第一种方式, 即每个队列设置相同的过期时间, 可以很好的实现消息的延迟发送功能. 如果采用第二种方式, 给每个消息设置不同的过期时间, 由于队列先入先出的特性, 如果队列头的消息过期时间很长, 后面的消息过期时间很短, 会导致后面的消息过期后不能及时被消费掉

简单的做法时, 使用 rabbitmq 的延迟插件: Rabbitmq 通过延迟插件实现延迟队列

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Rabbitmq 通过延迟插件实现延迟队列
由于队列先入先出的特性. 通过死信队列(DLX)和给每条消息设置过期时间(TTL)来实现延迟队列, 会存在时序问题. 即排在队列头的消息过期使时间如果设置的比较长, 会导致队列后面过期时间比较短的消息, 过期了迟迟不被消费掉. 可以通过给 Rabbitmq 安装延迟插件来实现延迟队列功能
芥末鱿鱼
2022/05/05
1.2K0
RabbitMQ延迟消费和重复消费
转载自 https://blog.csdn.net/quliuwuyiz/article/details/79301054
allsmallpig
2021/02/25
2.3K0
RabbitMQ实现延时队列(死信队列)
TTL是time to live 的简称,顾名思义指的是消息的存活时间。rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身。 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒。不同队列的过期时间互相之间没有影响,即使是对于同一条消息。队列中的消息存在队列中的时间超过过期时间则成为死信。
Java_老男孩
2019/12/02
2.2K0
一文搞懂Spring-AMQP
12//设置消息发送ack,默认noneconnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
爱撒谎的男孩
2020/03/11
1.1K0
springboot与rabbitmq整合
之前学习了rabbitmq,对其基本的用法有了一定的认识,但对其深层次的使用必须依赖于具体的业务,就像编程语言一样,提供的基础的使用规范,但是却产生了那么多高性能高可用的框架,都是依据最基础的功能,在与思想的磨合下而产生,同样,如何使用好mq技术,并能实现各种复杂的业务,那么就必须在某种思维的模式或业务领域的促使下才能完成。
sucl
2019/08/07
1.3K0
一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列
初探RabbitMQ消息队列中介绍了 RabbitMQ的简单用法,顺带提及了下延迟队列的作用。所谓 延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
battcn
2018/08/03
1.3K0
一起来学SpringBoot | 第十三篇:RabbitMQ延迟队列
RabbitMQ与Spring的框架整合之Spring AMQP实战
  RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可。注意,autoStartup必须设置为true,否则Spring容器不会加载RabbitAdmin类。RabbitAdmin底层实现就是从Spring容器中获取Exchange交换机、Binding绑定、RoutingKey路由键以及Queue队列的@Bean声明。
别先生
2019/12/02
1.7K0
SpringBoot+RabbitMQ 实现延迟队列
rabbitmq 自身的一些概念,可以去网上或者书上获得。rabbitmq 延迟队列的实现原理,网上资料很多,简单盗图一张。
水货程序员
2020/04/22
6540
Springboot 整合RabbitMQ ---基于Class的开发
1 加载配置文件 package com.zjxnjz.mall.core.config; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import
用户5927264
2019/08/01
4670
RabbitMQ & 死信队列DLX & TTL+DLX实现延迟队列
Dead Letter Exchange 死信交换机(RabbitMQ叫死信队列)
收心
2022/09/23
5310
RabbitMQ & 死信队列DLX & TTL+DLX实现延迟队列
rabbitMQ结合spring-boot使用(2).md
这一章节我们会学习rabbitMQ在项目生产中一些重要的特性,如持久化,消息确认机制,消息过期等特性。只要能利用好这些特性,我们就能开发出可用性强的,功能强大的MQ系统。
六个核弹
2022/12/23
3720
整合RabbitMQ&Spring
RabbitAdmin类可以很好的操作RabbitMQ,在spring中直接进行注入即可
用户1212940
2022/04/13
2800
SpringBoot RabbitMQ
RabbitMQ的流程是:生产者将消息发送到对应交换机上,交换机再将消息转发到绑定的队列上,消费者从绑定的队列获取消息进行消费。
用户8682940
2021/12/02
5770
RabbitMQ 延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
用户9615083
2022/12/25
6420
RabbitMQ 延迟队列
浅析RabbitMQ的延迟队列
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该
小熊学Java
2023/07/16
3930
浅析RabbitMQ的延迟队列
03、RabbitMQ延迟队列(死信交换机)
天蝎座的程序媛
2023/10/17
2750
03、RabbitMQ延迟队列(死信交换机)
RabbitMQ 实现延迟队列的两种方式!
定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如
江南一点雨
2021/12/09
7580
RabbitMQ 实现延迟队列的两种方式!
RabbitMQ延迟队列设置 顶
延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
算法之名
2019/10/14
8400
RabbitMQ延迟队列设置
                                                                            顶
【RabbitMQ】一文带你搞定RabbitMQ延迟队列
在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列。相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读。
弗兰克的猫
2019/07/30
8880
【RabbitMQ】一文带你搞定RabbitMQ延迟队列
Rabbitmq延迟队列实现定时任务
开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期、订单定时关闭、微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列、基于优先级队列的JDK延迟队列、时间轮等。因为我们项目中本身就使用到了Rabbitmq,所以基于方便开发和维护的原则,我们使用了Rabbitmq延迟队列来实现定时任务,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章Spring boot集成RabbitMQ
搜云库技术团队
2019/10/18
3.5K0
相关推荐
Rabbitmq 通过延迟插件实现延迟队列
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文