Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >MQ·将多消息合并为一条消息的发送、消费的设计与实现

MQ·将多消息合并为一条消息的发送、消费的设计与实现

作者头像
Bug开发工程师
发布于 2020-02-19 13:51:01
发布于 2020-02-19 13:51:01
4.1K1
举报
文章被收录于专栏:码农沉思录码农沉思录

以下文章来源于Java艺术,作者wujiuye

优质文章,及时送达

这是笔者最近处理一个叫异步大点击的业务问题所思考出来的方案。由于mq使用的是亚马逊的sqs服务,而sqs是按请求数消费的原因,所以才有的将多消息合并为一条消息发送的想法。

这个想法从sqs的消息批量发送以及阿里限流中间件的qps统计、netty的EventLoopGroup设计中得到启发。本篇将介绍如何将多个消息合并成一个消息发送而不影响服务的并发性能,以及由于合并后产生的大消息消费出现的消息堆积现象,开的消费者越多反而消息堆积越多的bug。

为什么要将多消息合并为一个消息发送?

前面也说了,为了节约成本。以每分钟50w的广告点击数来算,一个月将产生50*60*24*31w的点击消息,再乘以3就是每个月的sqs请求数,3代表的是发送消息、拉取消息、删除消息,按每100w请求0.4美刀的价格计算大概一个月要26784美刀。

由于sqs限制单条消息的大小最大为256k,根据业务场景估算每点击消息也不可能达到1k,,所以我将256个请求合并为一个消息发送,或者1s内未达到256个消息也合并为一个消息发送,这样每月的费用可以直接除以256,这不是一个小数目。

什么样的业务场景下才适合这么干?

将大量消息合并为一个消息后会导致消息消费失去原子性。你无法保证原本是256个消息的合并为一个消息后,这256个消息能全部消费成功或者全部消费失败,因此要求业务必须允许消息消费失败直接丢弃的情况。无论多少个成功多少个失败,都需要将整条消息从mq中删除。笔者考虑过这个问题才决定是否要这样做的,也考虑过失败重试的问题,但我觉得没必要为这种概率买单,因为一个点击在非异步的情况下,失败就是失败了。

如何将大量消息合并为一条消息发送而不影响服务的高并发性能呢?

其实不影响是不存在的,只是让影响变得微弱。经过长时间的观察,我了解该高并发服务对内存的消耗并不高,最大qps下也就消耗1.5g左右的堆内存,而netty使用的直接内存大概在2g这样,对于2核8g的机器,有足够多的内存来实现队列缓存消息。

我借签Dubbo的客户端与服务端配置多个连接时使用轮询方式使用连接,同时也借签了netty的EventLoop的设计,实现消息合并发送。我定义一个MesaageLoopGroup,一个MesaageLoopGroup可以配置有多少个MesaageLooper,而每个MesaageLooper就是一个线程,且维护一个阻塞队列,默认队列大小是102400,这个数字是我配置单个进程所能打开的最大文件句柄数。

当往MesaageLoopGroup push一个点击消息时,先用原子类自增1与MesaageLooper数组的长度取余,选出一个MesaageLooper。然后再将消息push到这个MesaageLooper的阻塞队列。

每个MesaageLooper的run方法实现的就是一个死循环,从阻塞队列中拿消息,当消息等于256时,或者阻塞超过1s就将拿到的消息合并成一个消息发送到mq。如果阻塞队列满,那么push会直接将消息发送到mq。因此,服务重启时如果使用kill 9强行结束进程,至多只会有1s的数据丢失。设置1s还有一个原因就是控制消息的实时性。

灰度上线测试一天后也证明此方案对服务的影响并不大,无论是gc还是内存占用,都看不出加了这么一层逻辑。1s的平均请求按50w计算,四台机器分担,每个服务的每秒请求数平均是2000。

为何用golang实现消费者?

然而消息的消费并不顺利。一个是因为消息消费我用了golang实现,我也是刚入门,写起代码来还感觉别扭,二是一个消息是由原本256个消息组合而成的问题。

使用golang其实是有原因的。原本计划是让消费者占用较小的内存,以实现将消费者寄生在其它服务所在的机器上,充分利用其它耗内存而cpu利用率低的服务所在的机器。同时利用docker实现快速部署,让docker 的镜像更小,不需要安装jdk什么的。还有就是利用go的协程并发处理能力吧,让消费者消费消息的速度能赶上消息的产生速度。

为入门golang买单

为了便于理解,我还是以java的线程池来说明。假设我配置的线程池线程数量是512。寄生在其它服务的机器上需要给主人点面子,不能把人家的cpu全部吃完,导致主服务不可用,所以线程的数量结合消息的消费情况综合考虑,不能超过一半的cpu使用率,而选择512这个数量。

Sqs支持一次拉取多条消息,并且有一个可见性超时的特性,当消息被消费者拉取到之后,在多长时间内未删除,下次可能还会被拉取到,或者其它消费者还能拉取到。最初我设置的可见性超时是60s。

一开始我开启5个线程拉取消息,每次最多拉取10条消息。那么很可能同一时间内会拉取到50条消息。由于一条消息是由原本256条消息合并而成的,所以512个线程同一时间段至多只能消费2条消息,而一条消息(合并后的)的消费平均耗时是10s,也就是说一分钟内最多消费12条消息,其它38条消息在一分钟后会被其它消费者拉取到,所以就会出现大量消息重复消费的情况,久而久之,消息越积累越多。

我用golang的channel实现生产者与消费者,channel的大小可设置,当channel满时,拉取到的消息是放不进channel的,因此会将拉取线程阻塞住,只有消费者从 channel取数据才能继续放入。但阻塞的那段时间要小于消息的可见性超时,因为消息只有在开始消费时我才会将其从mq中删除。

后面的改进就是根据消费能力去调整消息的拉取线程数,以及每次拉取的消息数。还有一点要注意,为保证时刻有消息准备就绪开始消费,最好不要让消息消费完再从mq中拉取。但这也会导致另一个问题,一些消息拉取到本地后,由于channel已满,放不进,而其它空闲消费节点又拉不到,导致消息被消费到的时间延长。这就需要作出取舍。

祝大家在2020年工作顺路,家庭幸福,合家团圆

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
1 条评论
热度
最新
这是单机场景吧,如果是分布式场景,会有其他问题。比如:集群聚合出的数据可能是单机的好几倍,导致下游出现问题
这是单机场景吧,如果是分布式场景,会有其他问题。比如:集群聚合出的数据可能是单机的好几倍,导致下游出现问题
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
MQ那点破事!消息丢失、重复消费、消费顺序、堆积、事务、高可用....
为了便于大家查找问题,了解全貌,整理个目录,我们可以快速全局了解关于消息队列,面试官一般会问哪些问题。
微观技术
2021/10/11
1.5K0
1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?
并且,索性咱就直接把这个坑填得满满的,直接盘点RocketMQ支持的11种消息类型以及背后的实现原理
三友的java日记
2023/11/09
7130
1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?
深入了解消息队列:揭示消息队列的概念、原理以及应用场景
消息队列 (Message Queue, MQ) 用于实现服务之间的异步通信、服务解耦、流量控制、发布订阅、高并发缓冲等。
Lion 莱恩呀
2025/04/19
3380
深入了解消息队列:揭示消息队列的概念、原理以及应用场景
mq要如何处理消息丢失、重复消费?
如果要你实现一个支付宝向余额宝转账的功能,比如:账户a从支付宝转出5000余额宝转入5000,该怎么做呢?
苏三说技术
2020/10/15
1.4K0
mq要如何处理消息丢失、重复消费?
RocketMQ消息为什么会被重复消费?
当我们使用RocketMQ时,RocketMQ-Dashboard是一个非常好用的图形化界面工具
Java识堂
2022/04/06
2.7K0
RocketMQ消息为什么会被重复消费?
深入理解RocketMq普通消息和顺序消息使用,原理,优化
最近一直再做一些系统上的压测,并对一些问题做了优化,从这些里面收获了一些很多好的优化经验,后续的文章都会以这方面为主。
用户5397975
2019/11/28
3.5K0
深入理解RocketMq普通消息和顺序消息使用,原理,优化
消息中间件—RocketMQ消息消费(二)(push模式实现)
摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下RocketMQ中“Pull和Push两种消费方式的简要流程”以及“Push消费方式的启动流程”(ps:如果不熟悉这几块内容的童鞋,可以自己回顾下上一篇的内容)。本文将详细介绍RocketMQ中Push消费方式下的“Pull消息的长轮询机制”和“Consumer端的负载均衡机制”这两块关键核心内容。 由于RocketMQ系列的技术分享存在一定的连续性,因此希望读者能回顾下往期RocketMQ分享的篇幅: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送 (4)消息中间件—RocketMQ消息消费(一)
用户2991389
2018/09/05
2K0
消息中间件—RocketMQ消息消费(二)(push模式实现)
rebbitMQ【rebbitMQ入门到精通】
消息中间件基于队列模型实现异步/同步传输数据 作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
高大北
2022/09/07
4480
rebbitMQ【rebbitMQ入门到精通】
RocketMQ 设计原理与最佳实践
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
Java识堂
2021/06/01
1.2K0
RocketMQ 设计原理与最佳实践
经典得不能再经典的分布式服务和消息队列面试题
原文:https://juejin.im/post/5ccd8d606fb9a032136fda86
Bug开发工程师
2019/05/17
9070
经典得不能再经典的分布式服务和消息队列面试题
聊聊 RocketMQ 4.X 消费逻辑
RocketMQ 是笔者非常喜欢的消息队列,4.9.X 版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。
勇哥java实战
2023/06/05
1K0
RocketMQ之消费者启动与消费流程
RocketMQ是由阿里巴巴开源的分布式消息中间件,支持顺序消息、定时消息、自定义过滤器、负载均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。NameServer充当名字路由服务,整体架构图如下所示:
2020labs小助手
2022/07/12
1.1K0
消息中间件—RocketMQ消息消费(三)(消息消费重试)
摘要:如果Consumer端消费消息失败,那么RocketMQ是如何对失败的异常情况进行处理? 前面两篇RocketMQ消息消费(一)/(二)篇,主要从Push/Pull两种消费模式的简要流程、长轮询机制和Consumer端负载均衡这几点内容出发,介绍了RocketMQ消息消费的正常流程和细节内容,本篇内容将主要介绍Consumer端消费失败的异常流程。 这里先回顾往期RocketMQ技术分享的篇幅: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送 (4)消息中间件—RocketMQ消息消费(一) (5)消息中间件—RocketMQ消息消费(二)(push模式实现)
用户2991389
2018/09/05
3.8K0
消息中间件—RocketMQ消息消费(三)(消息消费重试)
面渣逆袭:RocketMQ二十三问
选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度。目前常用的几个中间件,ActiveMQ作为“老古董”,市面上用的已经不多,其它几种:
三分恶
2022/05/11
1.3K0
面渣逆袭:RocketMQ二十三问
消息中间件—RocketMQ消息消费(一)
文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送”两部分进行了详细的阐述,本文将主要从消息消费为切入点简要地介绍下“RocketMQ中Pull和Push的两种消费方式”、“RocketMQ中消费者(Push模式)的启动流程”和“RocketMQ中Pull和Push两种消费方式的简要流程”。在阅读本篇之前希望读者能够先仔细阅读下关于RocketMQ分布式消息队列的前几篇文章: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送
用户2991389
2018/09/05
2K0
消息中间件—RocketMQ消息消费(一)
消息队列(MQ)之生产者-消费者 | 一文搞定
消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。
狼王编程
2021/06/01
1.1K0
消息队列(MQ)之生产者-消费者 | 一文搞定
消息队列消息丢失和消息重复发送的处理策略
来源:https://www.jianshu.com/p/533fc6fc0963 分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用,同时网络环境也是不稳定的,造成了我们多个机器之间的数据同步问题,这就是典型的分布式事务问题。 在分布式事务中事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是要保证不同节点之间的数据一致性
程序猿DD
2022/07/04
1.9K0
消息队列消息丢失和消息重复发送的处理策略
RocketMQ(四):消费前如何拉取消息?(长轮询机制)
上篇文章从Broker接收消息开始,到消息持久化到各种文件结束,分析完消息在Broker持久化的流程与原理
菜菜的后端私房菜
2024/09/26
7770
【MQ我可以讲一个小时】
引入消息中间件也会带来很多问题,先说说消息丢失,生产者往消息队列发送消息,消息队列往消费者发送消息,会有丢消息的可能,消息队列也有可能丢消息,通常MQ存盘时都会先写入操作系统的缓存页中,然后再由操作系统异步的将消息写入硬盘,这个中间有个时间差,就可能会造成消息丢失,如果服务挂了,缓存中还没有来得及写入硬盘的消息就会发生消息丢失。不同的消息中间件对于消息丢失也有不同的解决方案,先说说最容易丢失消息的kafka吧。生产者发消息给Kafka Broker:消息写入Leader后,Follower是主动与Leader进行同步,然后发ack告诉生产者收到消息了,这个过程kafka提供了一个参数,request.required.acks属性来确认消息的生产,0表示不进行消息接收是否成功的确认,发生网络抖动消息丢了,生产者不校验ACK自然就不知道丢了。1表示当Leader接收成功时确认,只要Leader存活就可以保证不丢失,保证了吞吐量,但是如果leader挂了,恰好选了一个没有ACK的follower,那也丢了。-1或者all表示Leader和Follower都接收成功时确认,可以最大限度保证消息不丢失,但是吞吐量低,降低了kafka的性能。一般在不涉及金额的情况下,均衡考虑可以使用1,保证消息的发送和性能的一个平衡。Kafka Broker 消息同步和持久化:Kafka通过多分区多副本机制,可以最大限度保证数据不会丢失,如果数据已经写入系统缓存中,但是还没来得及刷入磁盘,这个时候机器宕机,或者没电了,那就丢消息了,当然这种情况很极端。Kafka Broker 将消息传递给消费者:如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时消费者直接宕机了,未处理完的数据丢失了,下次也消费不到了。所以为了避免这种情况,需要将配置改为,先消费处理数据,然后手动提交,这样消息处理失败,也不会提交成功,没有丢消息。
Java廖志伟
2022/03/07
4860
【MQ我可以讲一个小时】
消息队列| RocketMQ 核心原理
a. fork下来,起一个demo,上一个测试环境,遇到问题再去社区提问或找些实践文章;
heidsoft
2019/06/15
3.6K0
推荐阅读
相关推荐
MQ那点破事!消息丢失、重复消费、消费顺序、堆积、事务、高可用....
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档