前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >MQ·将多消息合并为一条消息的发送、消费的设计与实现

MQ·将多消息合并为一条消息的发送、消费的设计与实现

作者头像
Bug开发工程师
发布于 2020-02-19 13:51:01
发布于 2020-02-19 13:51:01
4.1K01
代码可运行
举报
文章被收录于专栏:码农沉思录码农沉思录
运行总次数:1
代码可运行

以下文章来源于Java艺术,作者wujiuye

优质文章,及时送达

这是笔者最近处理一个叫异步大点击的业务问题所思考出来的方案。由于mq使用的是亚马逊的sqs服务,而sqs是按请求数消费的原因,所以才有的将多消息合并为一条消息发送的想法。

这个想法从sqs的消息批量发送以及阿里限流中间件的qps统计、netty的EventLoopGroup设计中得到启发。本篇将介绍如何将多个消息合并成一个消息发送而不影响服务的并发性能,以及由于合并后产生的大消息消费出现的消息堆积现象,开的消费者越多反而消息堆积越多的bug。

为什么要将多消息合并为一个消息发送?

前面也说了,为了节约成本。以每分钟50w的广告点击数来算,一个月将产生50*60*24*31w的点击消息,再乘以3就是每个月的sqs请求数,3代表的是发送消息、拉取消息、删除消息,按每100w请求0.4美刀的价格计算大概一个月要26784美刀。

由于sqs限制单条消息的大小最大为256k,根据业务场景估算每点击消息也不可能达到1k,,所以我将256个请求合并为一个消息发送,或者1s内未达到256个消息也合并为一个消息发送,这样每月的费用可以直接除以256,这不是一个小数目。

什么样的业务场景下才适合这么干?

将大量消息合并为一个消息后会导致消息消费失去原子性。你无法保证原本是256个消息的合并为一个消息后,这256个消息能全部消费成功或者全部消费失败,因此要求业务必须允许消息消费失败直接丢弃的情况。无论多少个成功多少个失败,都需要将整条消息从mq中删除。笔者考虑过这个问题才决定是否要这样做的,也考虑过失败重试的问题,但我觉得没必要为这种概率买单,因为一个点击在非异步的情况下,失败就是失败了。

如何将大量消息合并为一条消息发送而不影响服务的高并发性能呢?

其实不影响是不存在的,只是让影响变得微弱。经过长时间的观察,我了解该高并发服务对内存的消耗并不高,最大qps下也就消耗1.5g左右的堆内存,而netty使用的直接内存大概在2g这样,对于2核8g的机器,有足够多的内存来实现队列缓存消息。

我借签Dubbo的客户端与服务端配置多个连接时使用轮询方式使用连接,同时也借签了netty的EventLoop的设计,实现消息合并发送。我定义一个MesaageLoopGroup,一个MesaageLoopGroup可以配置有多少个MesaageLooper,而每个MesaageLooper就是一个线程,且维护一个阻塞队列,默认队列大小是102400,这个数字是我配置单个进程所能打开的最大文件句柄数。

当往MesaageLoopGroup push一个点击消息时,先用原子类自增1与MesaageLooper数组的长度取余,选出一个MesaageLooper。然后再将消息push到这个MesaageLooper的阻塞队列。

每个MesaageLooper的run方法实现的就是一个死循环,从阻塞队列中拿消息,当消息等于256时,或者阻塞超过1s就将拿到的消息合并成一个消息发送到mq。如果阻塞队列满,那么push会直接将消息发送到mq。因此,服务重启时如果使用kill 9强行结束进程,至多只会有1s的数据丢失。设置1s还有一个原因就是控制消息的实时性。

灰度上线测试一天后也证明此方案对服务的影响并不大,无论是gc还是内存占用,都看不出加了这么一层逻辑。1s的平均请求按50w计算,四台机器分担,每个服务的每秒请求数平均是2000。

为何用golang实现消费者?

然而消息的消费并不顺利。一个是因为消息消费我用了golang实现,我也是刚入门,写起代码来还感觉别扭,二是一个消息是由原本256个消息组合而成的问题。

使用golang其实是有原因的。原本计划是让消费者占用较小的内存,以实现将消费者寄生在其它服务所在的机器上,充分利用其它耗内存而cpu利用率低的服务所在的机器。同时利用docker实现快速部署,让docker 的镜像更小,不需要安装jdk什么的。还有就是利用go的协程并发处理能力吧,让消费者消费消息的速度能赶上消息的产生速度。

为入门golang买单

为了便于理解,我还是以java的线程池来说明。假设我配置的线程池线程数量是512。寄生在其它服务的机器上需要给主人点面子,不能把人家的cpu全部吃完,导致主服务不可用,所以线程的数量结合消息的消费情况综合考虑,不能超过一半的cpu使用率,而选择512这个数量。

Sqs支持一次拉取多条消息,并且有一个可见性超时的特性,当消息被消费者拉取到之后,在多长时间内未删除,下次可能还会被拉取到,或者其它消费者还能拉取到。最初我设置的可见性超时是60s。

一开始我开启5个线程拉取消息,每次最多拉取10条消息。那么很可能同一时间内会拉取到50条消息。由于一条消息是由原本256条消息合并而成的,所以512个线程同一时间段至多只能消费2条消息,而一条消息(合并后的)的消费平均耗时是10s,也就是说一分钟内最多消费12条消息,其它38条消息在一分钟后会被其它消费者拉取到,所以就会出现大量消息重复消费的情况,久而久之,消息越积累越多。

我用golang的channel实现生产者与消费者,channel的大小可设置,当channel满时,拉取到的消息是放不进channel的,因此会将拉取线程阻塞住,只有消费者从 channel取数据才能继续放入。但阻塞的那段时间要小于消息的可见性超时,因为消息只有在开始消费时我才会将其从mq中删除。

后面的改进就是根据消费能力去调整消息的拉取线程数,以及每次拉取的消息数。还有一点要注意,为保证时刻有消息准备就绪开始消费,最好不要让消息消费完再从mq中拉取。但这也会导致另一个问题,一些消息拉取到本地后,由于channel已满,放不进,而其它空闲消费节点又拉不到,导致消息被消费到的时间延长。这就需要作出取舍。

祝大家在2020年工作顺路,家庭幸福,合家团圆

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Pig4Cloud之登陆验证(二)发放token
上一篇介绍了客户端认证处理,那是令牌颁发的前提。这篇开始,我们就来研究下令牌颁发。
一觉睡到小时候
2022/12/07
1.1K0
【长文】Spring Cloud OAuth Token 生成源码解析
内容较长,spring security oauth 整个放发过程的类都有详细说明,建议大家保存后 慢慢阅读,或者当工具书查询
冷冷
2019/05/26
2.1K0
实战!Spring Boot Security+JWT前后端分离架构登录认证!
Spring security这里就不再过多介绍了,相信大家都用过,也都恐惧过,相比Shiro而言,Spring Security更加重量级,之前的SSM项目更多企业都是用的Shiro,但是Spring Boot出来之后,整合Spring Security更加方便了,用的企业也就多了。
码猿技术专栏
2024/01/29
1.1K0
实战!Spring Boot Security+JWT前后端分离架构登录认证!
认证鉴权与API权限控制在微服务架构中的设计与实现(二)
引言: 本文系《认证鉴权与API权限控制在微服务架构中的设计与实现》系列的第二篇,本文重点讲解用户身份的认证与token发放的具体实现。本文篇幅较长,对涉及到的大部分代码进行了分析,可收藏于闲暇时间阅读,欢迎订阅本系列文章。 1. 系统概览 在上一篇 《认证鉴权与API权限控制在微服务架构中的设计与实现(一)》介绍了该项目的背景以及技术调研与最后选型,并且对于最终实现的endpoint执行结果进行展示。对系统架构虽然有提到,但是并未列出详细流程图。在笔者的应用场景中,Auth系统与网关进行结合。在网关出配置
aoho求索
2018/04/03
1.7K0
认证鉴权与API权限控制在微服务架构中的设计与实现(二)
Spring Cloud OAuth2 实现用户认证及单点登录
OAuth 2 有四种授权模式,分别是授权码模式(authorization code)、简化模式(implicit)、密码模式(resource owner password credentials)、客户端模式(client credentials),具体 OAuth2 是什么,可以参考这篇文章。(http://www.ruanyifeng.com/blog/2014/05/oauth_2_0.html)
古时的风筝
2019/10/24
2K0
Spring Cloud OAuth2 实现用户认证及单点登录
聊聊 OAuth 2.0 的 Token 续期处理
若上,当 前端拿着正确的(未过期且未使用过)refresh_token 去调用 认证中心的刷新 端点刷新时,会 触发RefreshTokenGranter, 返回新的 Token
冷冷
2020/03/23
3.4K0
从零开始的Spring Security Oauth2(二)
本文开始从源码的层面,讲解一些spring Security Oauth2的认证流程。本文较长,适合在空余时间段观看。且涉及了较多的源码,非关键性代码以…代替。 获取token 上一篇博客中我们尝试使用了password模式和client模式,有一个比较关键的endpoint:/oauth/token。从这个入口开始分析,spring security oauth2内部是如何生成token的。 首先开启debug信息: logging: level: org.springframework: DEB
程序猿DD
2018/02/01
1.1K0
从零开始的Spring Security Oauth2(二)
决定放弃 JWT 了!
JWT相信大家都有所了解,一种无状态的认证方式,因为JWT本身就能存储一些非敏感的身份信息,这种方式目前也被广泛使用,在陈某之前的Spring Cloud Gateway整合Spring Security OAuth2中使用的就是JWT。
码猿技术专栏
2023/08/10
7370
决定放弃 JWT 了!
spring Cloud微服务 security+oauth2认证授权中心自定义令牌增强,并实现登录和退出
在之前的博客我写了 SpringCloud整合spring security+ oauth2+Redis实现认证授权,本文对返回的token实现自定义增强令牌返回结果,以及对于oauth2存在Redis的数据进行解释。
共饮一杯无
2022/11/28
1.2K0
spring Cloud微服务 security+oauth2认证授权中心自定义令牌增强,并实现登录和退出
SpringCloud+SpringBoot+OAuth2+Spring Security+Redis实现的微服务统一认证授权
因为目前做了一个基于 Spring Cloud 的微服务项目,所以了解到了 OAuth2,打算整合一下 OAuth2 来实现统一授权。关于 OAuth 是一个关于授权的开放网络标准,目前的版本是 2.0,这里我就不多做介绍了。
程序员小猿
2021/11/23
1.4K0
Spring Security OAuth 个性化token
如上代码,在拼装好token对象后会调用认证服务器配置TokenEnhancer( 增强器) 来对默认的token进行增强。
冷冷
2019/02/19
9220
Spring Security---Oauth2详解
在说明OAuth2需求及使用场景之前,需要先介绍一下OAuth2授权流程中的各种角色:
大忽悠爱学习
2021/12/07
4.7K0
Spring Security---Oauth2详解
SpringCloud Alibaba微服务实战十八 - Oauth2.0 自定义授权模式
那么如何新增一个自定义的授权模式,比如像下面这样根据手机号和短信验证码进行登录呢?
JAVA日知录
2020/07/31
2.6K0
SpringCloud Alibaba微服务实战十八 - Oauth2.0 自定义授权模式
实战!Spring Boot Security+JWT前后端分离架构认证登录,居然还有人不会?
前后端分离不同于传统的web服务,无法使用session,因此我们采用JWT这种无状态机制来生成token,大致的思路如下:
爱撒谎的男孩
2023/08/28
4K0
实战!Spring Boot Security+JWT前后端分离架构认证登录,居然还有人不会?
Spring security oauth2的认证流程
AuthorizationServerConfigurerAdapterm默认方法配置
用户1499526
2019/08/20
3.1K0
Spring Security Oauth2 单点登录案例实现和执行流程剖析
OAuth是一个关于授权的开放网络标准,在全世界得到的广泛的应用,目前是2.0的版本。OAuth2在“客户端”与“服务提供商”之间,设置了一个授权层(authorization layer)。“客户端”不能直接登录“服务提供商”,只能登录授权层,以此将用户与客户端分离。“客户端”登录需要获取OAuth提供的令牌,否则将提示认证失败而导致客户端无法访问服务。关于OAuth2这里就不多作介绍了,网上资料详尽。下面我们实现一个 整合 SpringBoot 、Spring Security OAuth2 来实现单点登录功能的案例并对执行流程进行详细的剖析。
朝雨忆轻尘
2019/06/19
2.8K0
Spring Security Oauth2 单点登录案例实现和执行流程剖析
【小技巧】spring security oauth2 令牌实现多终端登录状态同步
解决不同客户端使用token,各个客户端的登录状态必须保持一致,退出状态实现一致。同上述问题类似如何解决不同租户相同用户名的人员的登录状态问题。
冷冷
2019/09/23
1.9K0
【小技巧】spring security oauth2 令牌实现多终端登录状态同步
【小技巧】spring security oauth2 令牌实现多终端登录状态同步
解决不同客户端使用token,各个客户端的登录状态必须保持一致,退出状态实现一致。同上述问题类似如何解决不同租户相同用户名的人员的登录状态问题。
冷冷
2019/09/20
2.7K0
Spring Security OAuth 2.0 发放令牌接口地址自定义
OAuth 2.0 如何获取令牌 以密码模式为例,获取 Token curl --location --request POST 'http://oauth-server/oauth/token' \ --header 'Authorization: Basic dGVzdDp0ZXN0' \ --data-urlencode 'username=admin' \ --data-urlencode 'password=123456' \ --data-urlencode 'scope=server' \ -
冷冷
2020/05/18
2K0
Spring Security OAuth 2.0  发放令牌接口地址自定义
Spring Security 系列(2) —— Spring Security OAuth2
OAuth(开放授权)是一个开放标准,允许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用。
求和小熊猫
2022/06/30
6.2K0
推荐阅读
相关推荐
Pig4Cloud之登陆验证(二)发放token
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验