Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >基于spring event实现消息异步延时队列

基于spring event实现消息异步延时队列

作者头像
木左
发布于 2024-03-26 02:20:01
发布于 2024-03-26 02:20:01
7422
举报

一、业务场景

最近做个小项目,基本单体应用就能满足要求。项目虽然小,但是需求可一点都不少,真是麻雀虽小,要求五脏俱全。这里面有个业务场景是需要给相应的人员发送消息通知。

之前做分布式应用都是引入第三方组件mq,单独部署一个消息服务用于接收mq消息并发送对应的通知。现在单体应用也想解耦处理,但是引入组件感觉还得部署mq,多少有些浪费,想着直接内部处理下。

于是乎想到了Spring本身的event。这不也是发个消息,然后监听者收到后进行逻辑处理。但是看了下直接发的话不做任何处理会是同步方式进行处理,而我其实需要得是异步,那就再加个注解@async。这样就达到了异步处理。

二、开搞

最终整体代码结构如下

最开始确实可以达到异步处理,但是在测试的时候发现,日志打印的时候,发送消息的日志还没打印,接收消息得日志都打印出来了,如下图。

于是乎想到能不能延迟下处理,就像组件mq提供的功能一样。

有问题找百度,看看是不是有人已经解决过这个问题了,结果找了半天,就找到类似的,但是做法就是发送的时候自己搞了一个延时队列,到时间后再执行发送,感觉实现也算实现了,但是感觉和我想要的不一样。因为消息处理本身是在线程池里处理的,有个任务队列,自己再搞一个多少有点别扭。还有一种是在监听的方法里线程sleep,这种感觉也不行。我总共开两个线程,那任务堆积不是很严重,影响处理效率。

好吧,既然没有能直接拿来主义的,那就自己研究研究。既然@async本身就是交给了线程池处理,而我使用的是ThreadPoolTaskExecutor,这个线程池不支持延时队列,解决办法就只能是sleep,于是乎我换成了有延时队列的线程池ScheduledThreadPoolExecutor。

那现在就是看@async注解是怎么把任务扔到任务队列里的,找了下源码,打上断点看看。

跟着走,发现最后调用ScheduledThreadPoolExecutor的Submit方法放进去的。如下图,那看到这个方法实际调用的是schedule方法,而传参里默认是不延迟。

找到这里就好说了,我直接继承下ScheduledThreadPoolExecutor类然后用子类覆盖下图里的submit方法,然后直接固定延迟5s。如下图。

调整完测试下,结果如图:

可以看到 真的实现了。真是今个要高兴,咱是老百姓啊,去个厕所放放水,搞这么长时间,都忘记上厕所了。回来接着干。。。

没有压力就是轻松,看着实现的功能准备再试试,结果又想到,这只能固定延迟多少秒,能不能和组件mq一样,交给消息本身自己设置,我这里取到消息内容里得延迟字段,直接设置延迟。

然后就在抽象基类里加了个延时枚举,重启后把断点打在了submit上,如图。

看到这个有点晕,找找有没有我的消息内容吧,然后就找到了。

看是看到了,怎么拿出来呢,看到arg$2有点蒙,这是什么东西,咋拿出来呢。然后看他是怎么构造出来的,一找找到了

是使用Lambda表达式创建的,这咋能拿到啊,继续百度,最后找了好长时间,硬是没找到,就是取不到。正想着放弃呀,反正固定延时目前也行,但是看着这个arg$2总感觉是既熟悉又陌生,实体要是断点出来好像这个就是对象的属性名称,要不直接反射拿下值试试?那就试试吧。结果就写代码试。

结果断点下取到了。

完美解决啦。看看日志。。

解决解决,完美解决。刚要高兴,定睛一看,怎么那个tlog追踪异步不一样啊,重新试了下,还真是不一样,重启后发现是第一次创建线程的时候传入的,后续不会改了。这要是查日志追踪,全是一样的就不好了,百度了下,有人解决了直接拿来。

结果改了这个,处理消息延迟取内容报错了,是被包装了,改了下。

最后再试试。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 木左侃技术人生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
2 条评论
热度
最新
可以,大佬,互粉一下
可以,大佬,互粉一下
11点赞举报
公号发下,互关学习
公号发下,互关学习
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
消息队列面试解析系列(一)- 消息队列的意义
见名知义,消息队列主要就是用来发送和接收处理消息,但它的作用可不仅解决应用间通信问题。
JavaEdge
2021/02/22
1.1K0
消息队列面试解析系列(一)- 消息队列的意义
用了这么久的RabbitMQ异步编程竟然都是错的!
在注册流程中,数据写DB是主流程,但注册后给用户发优惠券或欢迎短信是分支流程,时效性也不强。
main方法
2020/12/07
6840
用了这么久的RabbitMQ异步编程竟然都是错的!
深入了解消息队列:揭示消息队列的概念、原理以及应用场景
消息队列 (Message Queue, MQ) 用于实现服务之间的异步通信、服务解耦、流量控制、发布订阅、高并发缓冲等。
Lion 莱恩呀
2025/04/19
3730
深入了解消息队列:揭示消息队列的概念、原理以及应用场景
用了这么久的RabbitMQ异步编程竟然都是错的!
优秀的项目都由同步、异步和定时任务三种处理模式相辅相成。其中当属异步编程充满坑点。
JavaEdge
2021/02/23
1.3K0
用了这么久的RabbitMQ异步编程竟然都是错的!
RocketMQ消息队列的最佳实践
tags可由应用自行设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:
JavaEdge
2021/10/18
5910
常见的并发场景
最常见、比较复杂一个场景是Web容器的线程池。Web容器使用线程池同步或者异步处理HTTP请求,同时这也可以有效的复用HTTP连接,降低资源申请的开销。通常我们认为HTTP请求时非常昂贵的,并且也是比较耗费资源和性能的,所以线程池在这里就扮演了非常重要的角色。
技术从心
2019/08/05
8780
消息队列消息延迟解决方案,跟着做就行了
前面我们讲到了使用消息队列解决了我们电商系统的各种问题,削峰填谷、异步处理以及系统间解耦合,同时也对其重复消息问题进行了详细方案讲解(你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你,秒杀系统每秒上万次下单请求,我们该怎么去设计)。那我们在消息队列的使用过程中还有没有需要注意的地方呢?
架构师修炼
2020/07/20
1.5K0
消息队列消息延迟解决方案,跟着做就行了
万字图解线程池ThreadPoolExecutor、ForkJoinPool、定时调度 STPE 使用场景和原理
J.U.C 提供的线程池:ThreadPoolExecutor 类,帮助开发人员管理线程并方便地执行并行任务。
码哥字节
2025/04/27
6.7K0
万字图解线程池ThreadPoolExecutor、ForkJoinPool、定时调度 STPE 使用场景和原理
消息队列面试解析系列(一)-消息队列(MQ)的意义
见名知义,消息队列主要就是用来发送和接收处理消息,但它的作用可不仅解决应用间通信问题。
JavaEdge
2022/11/30
1.6K0
消息队列面试解析系列(一)-消息队列(MQ)的意义
快速掌握并发编程---线程池的原理和实战
池化技术简单点来说,就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能等。
田维常
2020/11/03
2890
快速掌握并发编程---线程池的原理和实战
Mq消息队列核心问题剖析与解决
这篇文章就谈谈对mq各种问题的思考,以及不同的mq业务方案的解决,注意这篇文章为了解决在学习三大mq的一些问题,和不同mq差异导致的出现的不同的消息解决方案,这往往是很多人所忽视的,我教你!
Joseph_青椒
2023/10/06
1.3K0
Mq消息队列核心问题剖析与解决
消息队列设计1 何时需要
消息队列通常作为企业IT系统内部通信的核心手段 具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
JavaEdge
2019/01/03
5360
构建高效稳定的并发处理系统:从理论到实战的全面优化指南
在如今的互联网时代,应用程序需要处理大量并发用户请求。无论是电子商务平台上的秒杀活动,还是社交媒体的实时消息推送,系统的高并发处理能力直接影响着用户体验和企业的竞争力。在这种情况下,如何高效地管理线程资源,成为了每个开发者需要面对的重要课题。
繁依Fanyi
2024/08/14
5630
一次线上事故,我顿悟了异步的精髓
写这篇文章,笔者想和大家深入探讨该场景的架构优化方案。希望大家读完之后,可以对异步有更深刻的理解。
勇哥java实战
2022/07/10
5730
一次线上事故,我顿悟了异步的精髓
亿级流量网站架构核心技术【笔记】(二)
九、应用级缓存 A.缓存简介 1.先从缓存中读取数据,如果没有,再从慢速设备上读取实际数据并同步到缓存 2.经常读取的数据、频繁访问的数据、热点数据、I/O瓶颈数据、计算昂贵的数据、符合5分钟法则和局部性原理的数据都可以缓存 B.缓存命中率 1.缓存命中率=从缓存中读取次数/【总读取次数(从缓存中读取次数+从慢速设备上读取次数)】 C.缓存回收策略 1.基于空间,指缓存设置了存储空间 2.基于容量,指缓存设置了最大大小 3.基于时间
硬核项目经理
2019/08/06
1.3K0
亿级流量网站架构核心技术【笔记】(二)
离线数据推送问题(消息队列)
  每天晚上9点多我要起身下班的时候,抬眼看到周围还在公司的也就只有几个刚毕业的小鲜肉了,就感觉自己也好年轻啊。虽然不止感觉,其实本来也不老。但是转眼又一想,我晚饭都不吃,每天多比人家工作了好几个小时,但是水平的提高一点也不成正比啊,是不是方法不当啊。唯一成正比的,我确实把身体锻炼好了,骑自行车40公里来上班,中间不停,骑得飞快,整个路程总体还是往高处走的,到公司休息一下正常上班,一点儿感觉都没有。有一种无知叫做不知道什么事情是不可能的,想到就去做了。   直到中午和我们同事90后MM去吃饭,她最近想在网上
静儿
2018/07/02
1.4K0
我攻克的技术难题:自定义延时消息队列
消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批。
不惑
2024/01/29
3140
我攻克的技术难题:自定义延时消息队列
Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程
线上某服务 A 调用服务 B 接口完成一次交易,一次晚上的生产变更之后,系统监控发现服务 B 接口频繁超时,后续甚至返回线程池耗尽错误 Thread pool is EXHAUSTED。因为服务 B 依赖外部接口,刚开始误以为外部接口延时导致,所以临时增加服务 B dubbo 线程池线程数量。配置变更之后,重启服务,服务恢复正常。一段时间之后,服务 B 再次返回线程池耗尽错误。这次深入排查问题之后,才发现 Kafka 异步发送消息阻塞了 dubbo 线程,从而导致调用超时。
andyxh
2019/10/09
1.2K0
Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程
五种线程池的对比与使用
通过源码可以看出底层调用的是ThreadPoolExecutor方法,传入一个同步的阻塞队列实现缓存。
爱撸猫的杰
2020/01/08
1K0
你真的需要消息队列吗
企鹅号小编
2018/01/05
1.5K0
你真的需要消息队列吗
推荐阅读
相关推荐
消息队列面试解析系列(一)- 消息队列的意义
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档