Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

作者头像
烟雨平生
发布于 2023-10-06 12:37:55
发布于 2023-10-06 12:37:55
3470
举报
文章被收录于专栏:数字化之路数字化之路

业务背景

一个业务实体的属性出现变更,会刷新用户域、订单域、商品域等多个域冗余的数据。变更数据涉及到的数据量大时,会比较耗时、耗内存。

如果同时变更的数据较多时,就超过当前服务的容量,JVM会频繁FullGC,继而pod重启。数据变更的功能不可用了。

解决方案1:加机器。增加系统容量 解决方案2:数据源更新与刷新其它业务域的冗余数据解耦。使用MQ来异步刷新冗余的数据实现容量的削峰填谷。

最终使用了方案2,使用目前项目中使用的消息中间件RocketMQ。原因是这个场景并不高频,可能通过控制MQ消费线程数来减少对机器资源的消耗。此处设置为2

方案2上线运行一段时间后,出现一个现象:

变更的事件消息会偶发性的丢失

现象:

1、可以找到到发送成功的日志。

2、疑似丢失的消息,在用户域、商品域找到接收消息并消费成功的日志,但在订单域中没有找到接收消息的日志。

3、订单域一直在刷新冗余数据。未消费且过期的消息,会被Rocket服务端删除。

解决办法:

1、优化数据刷新的逻辑,减少对内存的消耗。 通过翻页获取数据的方式小步快走的方式小批量获取数据、刷新数据。

2、增加RocketMQ的消费线程数。从2调整为8。

事件消息会偶发性丢失的原因分析

过期清理机制引发消息丢失: 消息按照到达服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。 但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。 RocketMQ 使用存储时长作为消息存储的依据。 在存储时长范围内的消息都会被保留,无论消息是否被消费; 超过时长限制的消息则会被清理掉。 JackieTang,公众号:的数字化之路RocketMQ系列 | 如何让消息“丢失”?

RocketMQ如何判定一个消息有没有过期呢?

要讲清楚这个问题,就不得不先聊明白消费进度管理。

消费进度原理

消息位点(Offset)

RocketMQ领域中消息是按到达服务端的先后顺序存储在指定主题[Topic]的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。 任意一个消息队列在逻辑上都是无限存储,即消息位点会从0到Long.MAX无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,具体关系如下图所示:

RocketMQ领域中定义队列中 最早一条消息的位点为最小消息位点(MinOffset); 最新一条消息的位点为最大消息位点(MaxOffset)。

虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限,RocketMQ会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

消费位点(ConsumerOffset)

RocketMQ领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。

因此,RocketMQ通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。 如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点

那么,历史消息会保存多久呢?也就是如何判定一个消息在服务端有没有过期呢? 看情况。不同的RocketMQ服务器都会不同。以阿里的云消息队列RocketMQ版为例:

  • 5.0系列实例:
    • 最短24小时。
    • 最长720小时。
  • 4.0系列实例:
    • 标准版:存储时长为3天,超过时间将自动滚动删除。
    • 企业铂金版:存储时长为3天,若您购买实例的存储空间规格不足时,云消息队列 RocketMQ 版会按存储时间滚动删除最早的消息,此时消息的存储时长可能不足3天。

队列中消息位点MinOffset、MaxOffset和每个消费者分组的消费位点ConsumerOffset的关系如下:

  • ConsumerOffset≤MaxOffset:
    • 当消费速度和生产速度一致,且全部消息都处理完成时,最大消息位点和消费位点相同,即ConsumerOffset=MaxOffset。
    • 当消费速度较慢小于生产速度时,队列中会有部分消息未消费,此时消费位点小于最大消息位点,即ConsumerOffset<MaxOffset,两者之差就是该队列中堆积的消息量。
  • ConsumerOffset≥MinOffset:正常情况下有效的消费位点ConsumerOffset必然大于等于最小消息位点MinOffset。消费位点小于最小消息位点时是无效的,相当于消费者要消费的消息已经从队列中删除了,是无法消费到的,此时服务端会将消费位点强制纠正到合法的消息位点。

消费位点初始值

消费位点初始值指的是消费者分组[Group ID]首次启动消费者消费消息时,服务端保存的消费位点的初始值。

RocketMQ定义消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费

小结

结合消费进度管理和目前遇到的因为消费慢引发的消息丢失问题,我们来还原下消息丢失的原因:

事件消息发出后,由于订单域消费消息的速度低于生产,然后出现消息堆积。 订单服务上线新需求,老的RocketMQ消费客户端下线。 上线完成后,启动新的RocketMQ消费客户端。 新的RocketMQ消费者[Group ID]从RocketMQ Broker服务器拉取消息。 如果RocketMQ服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。 这些未消费且过期的消息,就会被删除。 从业务上看,这些消息是丢失了。

实际上,即使订单服务没有重新发布,也会出现消息丢失。 因为过期的消息已经从RocketMQ服务端自动滚动删除了

参考

消费进度管理:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/consumer-progress-management

心情点滴

中秋节好像像平时一样,准备家常便饭。 妈妈说:“大人过不过节都一样,小孩子过节要有一个过节的样。” 然后一块做了一顿丰富的饭,孩子吃的开心,感受到节日的不同。 同时自己也觉得一下子有了家的样子,有了家该有温度。一下子满足了对家的想象

有妈的日子,真好

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-10-01 20:18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 的数字化之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RocketMQ系列 | 如何让消息“丢失”?
RocketMQ 5.0: 云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。
烟雨平生
2023/10/01
5240
RocketMQ系列 | 如何让消息“丢失”?
消费者原理分析-RocketMQ知识体系4
前文了解了 RocketMQ消息存储的相关原理,本文将讲讲消息消费的过程及相关概念。
DougWang
2021/07/21
1.3K0
聊聊 RocketMQ 4.X 消费逻辑
RocketMQ 是笔者非常喜欢的消息队列,4.9.X 版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。
勇哥java实战
2023/06/05
1K0
RocketMQ详细介绍
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、延时消息、消息回溯等(业务性比较关注)
默 语
2024/11/22
5050
RocketMQ详细介绍
万字长文讲透 RocketMQ 的消费逻辑
RocketMQ 是笔者非常喜欢的消息队列,4.9.X 版本是目前使用最广泛的版本,但它的消费逻辑相对较重,很多同学学习起来没有头绪。
勇哥java实战
2023/09/04
1.4K0
万字长文讲透 RocketMQ 的消费逻辑
RocketMQ如何保证消息的可靠性投递?
producer向broker发送消息后,没有收到broker的ack时,rocketmq会自动重试。重试的次数可以设置,默认为2次
Java识堂
2021/05/06
3.3K1
云原生中间件RocketMQ-消费者消费模式之广播模式、偏移量offset解析
广播消费: 当使用广播消费模式时, 消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端, 保证消息至少被每台机器消费一次。
共饮一杯无
2022/11/28
1.5K0
云原生中间件RocketMQ-消费者消费模式之广播模式、偏移量offset解析
消息中间件—RocketMQ消息消费(二)(push模式实现)
摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下RocketMQ中“Pull和Push两种消费方式的简要流程”以及“Push消费方式的启动流程”(ps:如果不熟悉这几块内容的童鞋,可以自己回顾下上一篇的内容)。本文将详细介绍RocketMQ中Push消费方式下的“Pull消息的长轮询机制”和“Consumer端的负载均衡机制”这两块关键核心内容。 由于RocketMQ系列的技术分享存在一定的连续性,因此希望读者能回顾下往期RocketMQ分享的篇幅: (1)消息中间件—RocketMQ的RPC通信(一) (2)消息中间件—RocketMQ的RPC通信(二) (3)消息中间件—RocketMQ消息发送 (4)消息中间件—RocketMQ消息消费(一)
用户2991389
2018/09/05
2K0
消息中间件—RocketMQ消息消费(二)(push模式实现)
RocketMQ 简介
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
程序员果果
2021/01/06
2.7K0
RocketMQ 简介
深入分析 RocketMQ 的 Push 消费方式实现
RocketMQ 是阿里巴巴旗下一款开源的 MQ 框架,经历过双十一考验,由 Java 编程语言实现,有非常完整的生态系统。RocketMQ 作为一款纯 Java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
政采云前端团队
2023/11/09
1.5K0
深入分析 RocketMQ 的 Push 消费方式实现
3分钟白话RocketMQ系列—— 如何消费消息
前面已经介绍了 生产消息、存储消息 两大块内容,那接下来,我们白话一下RocketMQ是如何消费消息的,揭秘消息消费全过程。
阿丸笔记
2023/10/22
1.4K0
3分钟白话RocketMQ系列—— 如何消费消息
面试系列之-rocketmq消息机制
广播消费模式下,相同Consumer Group的每个Consumer实例都接收同一个Topic的全量消息。即每条消息都会被发送到Consumer Group中的每个Consumer进行消费;
用户4283147
2022/12/29
1.6K0
面试系列之-rocketmq消息机制
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
一个新的消费组订阅一个已存在的Topic主题时,消费组是从该Topic的哪条消息开始消费呢?
丁威
2019/07/23
6.8K0
RocketMQ实战:一个新的消费组初次启动时从何处开始消费呢?
RocketMq 基本扫盲
本篇是RocketMq扫盲,并不会讲到各个组件实现的细节内容,这里从整体全局的角度看关于RocketMq的整体设计。
阿东
2022/09/11
8430
RocketMq 基本扫盲
消息队列| RocketMQ 核心原理
a. fork下来,起一个demo,上一个测试环境,遇到问题再去社区提问或找些实践文章;
heidsoft
2019/06/15
3.6K0
「查缺补漏」巩固你的RocketMQ知识体系
Windows安装部署 下载 地址:[https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.5.2/rocketmq-all-4.5.2-bin-
Kerwin
2020/08/26
4270
RocketMq之Consumer原理浅析
当一个业务系统部署多台机器时,每台机器都启动了一个Consumer,并且这些Consumer都在同一个ConsumerGroup也就是消费组中,此时一个消费组中多个Consumer消费一个Topic,而一个Topic中会有多个MessageQueue。 那么就会有一个问题,比如有2个Consumer,3个MessageQueue,那么这3个MessageQueue怎么分配呢?这就涉及到Consumer的负载均衡了。 首先 Consumer 在启动时,会把自己注册给所有 Broker ,并保持心跳,让每一个 Broker 都知道消费组中有哪些 Consumer 。 然后 Consumer 在消费时,会随机链接一台 Broker ,获取消费组中的所有 Consumer 。 主要流程如下:
周同学
2020/06/16
1.9K1
RocketMQ消息存储
MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为消费,MQ会不断的尝试往消费者推送这条消息。
Java廖志伟
2022/03/07
6950
RocketMQ消息存储
大写的服,看完这篇你还不懂RocketMQ算我输
Apache RocketMQ 是一款 低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
猿天地
2020/09/22
7000
大写的服,看完这篇你还不懂RocketMQ算我输
软件架构-rocketmq之初识消息中间件
1.Provider提供方:服务提供者。2.Producer生产者:创建和发送JMS消息的客户端。3.Consumer消费者:接收JMS消息的客户端。4.Client客户端:生产或消费消息的应用&进程。5.Message消息:服务端与客户端之间的传输数据对象。6.Queue队列 :包含待读取消息的准备区域(点对点)。7.Topic主题:发布消息的分布机制(发布&订阅)。
IT架构圈
2021/10/11
6390
相关推荐
RocketMQ系列 | 如何让消息“丢失”?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档