前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ 在使用上的一些排坑和优化

RocketMQ 在使用上的一些排坑和优化

作者头像
挖坑的张师傅
发布于 2022-05-13 08:23:22
发布于 2022-05-13 08:23:22
1.3K00
代码可运行
举报
文章被收录于专栏:张师傅的博客张师傅的博客
运行总次数:0
代码可运行

RocketMQ 在我们的项目中使用非常广泛,在使用的过程中,也遇到了很多的问题。比如没有多环境的隔离,在多个版本同时开发送测的情况下,互相干扰严重。RocketMQ 的投递可能会失败,导致丢失消息。另外开源版本的 RocketMQ 不支持任意时间精度的延时消息,仅支持特定的 level。在使用的过程中,我们做了一些针对性的优化,整理出了这篇文章。

通过阅读这篇文章,你会了解到这些知识

  • RocketMQ 多环境隔离方案尝试
  • 基于 RocksDB 的消息“可靠”投递方案
  • 基于 RocksDB 和 RocketMQ 实现任意延时的时延消息

RocketMQ 多环境隔离

因为我们有很多功能需求会并行开发和送测,开发和测试的环境各有三四套之多,假设现在我们有三个版本在同时开发,对于同一个 topic,dev1 开发环境产生的消息可能会被 dev3 开发环境消费,这两个环境消费端的代码可能不一致,造成没有办法完成这部分功能的测试,这种情况下,开发人员苦不堪言,经常需要去下线掉其它环境的消费端才能继续进行开发测试,如下图所示。

为了解决这个问题,一开始是想在 topic 上下功夫,通过修改 Producer 端,让每个环境的 topic 统一加一个环境后缀,这样 topic_ABC 在 dev1 环境就会变为 topic_ABC_dev1。这种方式理论上也可以解决,只是需要创建较多 topic,代价比较高,改动量大。

后面采用的方案是给每个环境分配独立的 RocketMQ 队列来实现,下面为了讲述的简单起见,这里只给每个环境分配一个队列,如下所示。

通过环境变量的区分

  • 在生产端:dev1 环境投递到 RocketMQ 第 0 号队列,dev2 环境投递到第 1 号队列,后面以此类推
  • 在消费端:dev1 环境只拉取 RocketMQ 第 0 号队列的消息,dev2 环境只第 1 号队列的消息,后面以此类推

生产端实现

RocketMQ 的消息投递提供了 MessageQueueSelector 接口可以自定义消息队列选择器,指定消息要投递的 queue,它的定义如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

其中 mqs 参数是当前 topic 的所有可用队列,返回值是此次要投递的 queue。它有下面这个几个实现类:

  • SelectMessageQueueByHash:使用 msg 参数的 hashcode 的绝对值与 queue 大小取模
  • SelectMessageQueueByRandom:调用 Random.nextInt 方法获取一个 0~mqs.size()-1 区间的随机数
  • SelectMessageQueueByMachineRoom:实现为空

对于我们的场景,这里简化处理,根据环境的编号直接映射 queue,生产端的示例代码如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DefaultMQProducer producer = // ...;

final int envIndex = getEnvIndex();
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(envIndex-1);
    }
}, envIndex);

这样 dev1 环境映射到第 0 个队列,dev3 环境映射到第 2 个队列。

消费端实现

对于消费端,RocketMQ 定义了 AllocateMessageQueueStrategy 策略接口,可以自己实现当前消费者可以消费哪些 queue 队列。AllocateMessageQueueStrategy 接口的定义如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id
     *
     * @param consumerGroup 当前 consumer group
     * @param currentCID 当前 consumer id
     * @param mqAll 当前 topic 的所有 queue 列表
     * @param cidAll 当前 consumer group 下所有的 consumer id set 集合
     * @return 根据策略给当前 consumer 分配的 queue 列表
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * 策略算法名
     */
    String getName();
}

RocketMQ 内置提供了下面这些分配算法

  • AllocateMessageQueueAveragely:平均分配算法
  • AllocateMessageQueueAveragelyByCircle:按照 queue 队列组成的环形逐个分配
  • AllocateMachineRoomNearby:基于机房临近原则算法
  • AllocateMessageQueueByMachineRoom:基于机房分配算法
  • AllocateMessageQueueConsistentHash:基于一致性 hash 算法,将 consumer 消费者作为 Node 节点 hash 到一个虚拟环上
  • AllocateMessageQueueByConfig:基于配置分配算法,没有什么作用,可以作为 example 扩展

对于我们的场景,这里简化处理,根据环境的编号直接映射 queue,消费端的代码如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MQConstant.MQ_CONSUMER_GROUP_NAME, null,
        new AllocateMessageQueueStrategy() {
            @Override
            public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
                List<MessageQueue> list = new ArrayList<>();
                list.add(mqAll.get(envIndex-1));
                return list;
            }

            @Override
            public String getName() {
                return "env-based";
            }
        });

利弊分析

这种方式的实现非常简单,客户端改动量非常小,不用修改 topic。如果你的环境数量比较固定,可以修改上面的策略,让一个环境可以使用固定的多个 queue,只要保证多个环境不使用同一个 queue 接口即可。如果开发测试环境的消息数量不多,用一个队列也问题不大。线上生产环境多机房、多环境也可以用类似的思路去实现。

到这里多环境隔离的介绍就告一段落。

消息丢失之伤

RocketMQ 本身是一个服务端,当然就会有服务不可用、服务繁忙等问题,尤其是我们的公司所有的业务共用一个 RocketMQ,时不时会出现 "system busy , start flow control for a while" 等投递异常问题。

为了解决投递可靠性的问题,一开始是想在投递异常的时候将消息写入到数据库等持久化存储中,然后有一个定时任务去补偿消费。这种方案看起来是比较完美的,但是当 RocketMQ 整体不可用,大量的消息都投递失败时,数据库的瞬间写入压力会非常大,这种方案没有被采用。

后面想到了使用 RocksDB 来曲线救国,

主角 RocksDB

RocksDB 是 Facebook 基于 Google Jeff Dean 写的 LevelDB 改进的一种嵌入式 key-value 存储系统,做了许多优化,性能相对 LevelDB 有了很大的提升,大名鼎鼎的 TiDB 底层的存储引擎就是使用的 RocksDB。

RocksDB 是一个基于 LSM 树的存储引擎,LSM 是 Log-structured merge-tree 的缩写,关于 RocksDB 的底层原理,这篇文章不展开说明,有机会我会详细写一下。

基于 RocksDB 的重试机制

核心的逻辑是投递失败以后,将消息写入到本地 RocksDB 存储中,然后有一个线程去轮询是否有消息,如果有则进行重试,如果再次投递失败会重新将消息写入到 RocksDB,过程如下图所示。

rocksdb retry

在实现上,写入 RocksDB 的 key 采用了如下的格式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
expireTime:retryCount:typeName:uuid

其中 expireTime 的生成逻辑为当前时间戳(到秒)+ 投递延迟时间,代码如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val RETRY_TIME_STEP_ARRAY = arrayOf(
    3, 5, 30, 60, 120, 300, 480, 600, 900, 1800
)

val expire = System.currentTimeMillis() / 1000 + (RETRY_TIME_STEP_ARRAY.getOrNull(retryCount) ?: 10)

当消息投递到 MQ 失败时,将其写入到 RocksDB,这部分代码如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private fun insert(msg: ByteArray, retryCount: Int, typeName: String) {
    val key = genKey(retryCount, typeName)
    rocksDB.put(mqRetryCFHandler, WRITE_OPTIONS_SYNC, key.toByteArray(), msg)
}

master 线程负责轮询 RocksDB,如果有记录将其查出来放入一个 blockingQueue 中,master 线程核心逻辑如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private var lastSeekTime: Long = 0 // 单调递增的值,初始值为当前时间戳(到秒)

private fun loop() {
    val now = // 当前时间戳,到秒
    if (lastSeekTime > now) { // 如果时钟回拨或者还没到处理时间片,睡眠一段时间
        TimeUnit.MILLISECONDS.sleep(400)
        return
    }

    rocksDB.newIterator(mqRetryCFHandler, READ_OPTIONS).use {
        it.seek("$lastSeekTime".toByteArray()) // seek 到以 lastSeekTime 开头的 key 的地方
        while (it.isValid) {
            val value = it.value()
            blockingQueue.put(String(it.key()) to value) // 放入一个固定大小的阻塞队列中
            it.next()
        }
    }
    ++lastSeekTime
}

worker 线程负责消息的重新投递,代码如下所示。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private fun startConsume() {
    repeat(THREAD_NUM) {
        thread {
            while (true) {
                val list = drain() // 批量从 blockingQueue 中取数据
                list.forEach {
                    try {
                        val typeName = getTypeName(it.first)
                        val handler = getHandler(typeName) ?: return@forEach
                        val success = handler.handler(it.second)
                        // 如果不成功,则重新写入 RocksDB
                        if (!success) {
                            val currentRetryCount = getRetryCountFromKey(it.first) + 1
                            val maxRetryCount = handler.retryCount
                            if (currentRetryCount >= RETRY_TIME_STEP_ARRAY_SIZE || currentRetryCount >= maxRetryCount) {
                                val msgString = getStringFromBytes(it.second)
                                logger.info("send reach limit, retry count:$currentRetryCount,default count:$RETRY_TIME_STEP_ARRAY_SIZE,custom count:$maxRetryCount, msg: $msgString")
                                exceptionHandle.handler("retry $currentRetryCount fail,msg:$msgString")
                                return@forEach
                            }
                            insert(it.second, currentRetryCount, typeName)
                        }
                    } catch (ex: Throwable) {
                        exceptionHandle.handler("key: $it.first ,error: ${ex.message}")
                        Thread.sleep(30)
                    }
                }
            }
        }
    }
}

通过上面的这几步改造,在过去大半年内成功的躲过了好几次 RocketMQ 的短时间故障,消息没有丢失,全部重试成功,没有造成数据的异常。

利弊分析

这个方案的优点是很轻量化,写入读取本地 RocksDB 速度都极快,在极端场景下性能几乎没有影响。但也有一个缺点需要考虑,因为没有落地到集中式存储比如 MySQL,如果项目部署到 Docker 容器中,容器重启以后,这部分重试的数据还是会丢失。使用这种方案没有办法保证百分百不丢数据,考虑到 mq 故障发生的并不频繁,在性能和丢数据中取得一个平衡也是一种可行的措施。

基于 RocksDB 的任意延时消息设计

在做完上面的“可靠投递”方案以后,衍生出另外一个解决方案,使用 RocksDB 来实现任意时延的延时消息队列,它的设计目标有三个:

  • 支持任意时延
  • 充分利用现有的基础设施
  • 需要能无限堆积,写入查询效率要求要高

于是基于 RocksDB,我们实现了一个内部称为 Rock-DMQ 的项目,名字来源是 RocksDB for Delay MQ。它的实现原理也非常简单,如下图所示。

在投递一个延时消息时,以 topic 为 "cancel_order" 为例,整个延时消息的实现逻辑如下所示。

1、通过修改 Producer 端,实际投递到 RocketMQ 的 topic 不是这个,而是替换为了一个统一的 topic,名为 dmq_inner_topic,原始 topic 被转为 body 的一部分。

2、Rock-DMQ 项目会消费 dmq_inner_topic 这个特殊的 topic

3、消费 dmq_inner_topic 的消息后,Rock-DMQ 项目会将其写入到本地的 RocksDB 中,key 为到期时间为前缀(这一点比较重要)

4、Rock-DMQ 项目采用文中第二部分的内容相似的实现方式,隔一段时间去轮询 RocksDB ,看有无到期的消息

5、如果有到期消息,Rock-DMQ 项目将这个消息投递到 RocketMQ 中

6、订阅了这个 topic 的原有消费端就可以消费到这条消息了

通过这种实现,可以实现支持任意秒数的时延消息,也比较好的复用了现有的技术组件,对 RocketMQ 本身无任何改动,在水平扩展性上也得到了比较好的支持。

核心代码在第二部分已经介绍,这里不再赘述,如有人感兴趣,后面可以考虑把完整的源码放出来。

小结

以上就是 RocketMQ 在我们这边的落地实践和填坑记录,这些方案都还在快速迭代优化中,如果你有更好的想法,可以一起沟通交流。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 张师傅的博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
Code-Breaking中的两个Python沙箱
这是发表在跳跳糖上的文章https://www.tttang.com/archive/1294/,如需转载,请联系跳跳糖。
phith0n
2023/10/29
3380
Code-Breaking中的两个Python沙箱
巅峰极客及DASCTF7月赛复现
这里的话是传入了两个参数,然后将其赋值到一个类中,进行了序列化同时用了b函数进行处理,看一下有关类的
用户9691112
2023/09/04
3070
巅峰极客及DASCTF7月赛复现
code-breaking2018
这题十分简短精悍,应该是需要找到一个在[a-z0-9_]之外的字符放置在函数前而不影响函数的调用,简单传入:
HhhM
2022/08/10
1.4K0
code-breaking2018
SEKAICTF 2022 Web Writeup
Bottle Poem Come and read poems in the bottle. No bruteforcing is required to solve this challenge
ek1ng
2023/01/02
1.2K0
SEKAICTF 2022 Web Writeup
2022蓝帽杯wep-WP
web一天白打工,这次的蓝帽几乎就是取证大爹们的主场,web题一共只有两道,一道题简单的fastjson1.2.62反序列化加一道读内存和pickle反序列化,思路都不复杂,但属实是被环境问题整麻了…不管怎么说还是记录一下吧
h0cksr
2023/05/18
3410
django 1.8 官方文档翻译: 13-9-1 如何使用会话
Django 提供对匿名会话的完全支持。其会话框架让你根据各个站点的访问者存储和访问任意数据。它在服务器端存储数据并抽象Cookie 的发送和接收。Cookie 包含会话的ID —— 不是数据本身(除非你使用基于Cookie 的后端)。
ApacheCN_飞龙
2022/11/27
1.3K0
【Python全栈100天学习笔记】Day45 Cookie和Session介绍及使用
如今,一个网站如果不通过某种方式记住你是谁以及你之前在网站的活动情况,失去的就是网站的可用性和便利性,继而很有可能导致网站用户的流式,所以记住一个用户(更专业的说法叫用户跟踪)对绝大多数Web应用来说都是必需的功能。
天道Vax的时间宝藏
2022/04/02
9170
【Python全栈100天学习笔记】Day45 Cookie和Session介绍及使用
安全研究 | 从MicroStrategy入手发现Facebook的XSS漏洞
该篇文章讲述了作者围绕Facebook采用的第三方应用MicroStrategy Web SDK,经源码分析发现存在于Facebook网站中的两个反射型漏洞(rXSS),在前一篇文章中,作者就已经在MicroStrategy身上发现了SSRF漏洞收获了$30000的奖励。这里我们继续来看看他发现rXSS漏洞的过程。
FB客服
2020/08/07
1.2K0
安全研究 | 从MicroStrategy入手发现Facebook的XSS漏洞
Shiro反序列化漏洞利用汇总
“ Apache Shiro是一个强大易用的Java安全框架,提供了认证、授权、加密和会话管理等功能。Shiro框架直观、易用,同时也能提供健壮的安全性。”
Bypass
2020/07/07
10.8K0
Shiro反序列化漏洞利用汇总
【漏洞复现】Apache Shiro 反序列化漏洞
shiro漏洞已经曝光很久了,一直没有整理思路与详细步骤,最近在学习java的反序列化,复现该漏洞来方便之后的学习
李鹏华
2024/03/12
1.9K0
【漏洞复现】Apache Shiro 反序列化漏洞
flask session 安全问题 和 python 格式化字符串漏洞
flask 是非常轻量级的 Web框架 其 session 存储在客户端中(可以通过HTTP请求头Cookie字段的session获取)
中龙技术
2022/09/28
1.1K0
Apache Shiro反序列化漏洞-Shiro-550复现总结
最近一直在整理笔记,恰好碰到实习时遇到的Shiro反序列化漏洞,本着温故而知新的思想,就照着前辈们的文章好好研究了下,整理整理笔记并发个文章。新人初次投稿,文章有啥问题的话,还望各位大佬多多包含。
FB客服
2021/10/21
2.5K0
CVE-2023-27524:Apache Superset未授权访问漏洞
Apache Superset是一个开源的数据可视化和数据探测平台,它基于Python构建,使用了一些类似于Django和Flask的Python web框架。提供了一个用户友好的界面,可以轻松地创建和共享仪表板、查询和可视化数据,也可以集成到其他应用程序中。
Timeline Sec
2023/08/22
2K0
CVE-2023-27524:Apache Superset未授权访问漏洞
渗透测试 Java架构执行漏洞检测
近期对平台安全渗透测试中遇到有JAVA+mysql架构的网站,针对此架构我们Sine安全渗透工程师整理了下具体的漏洞检测方法和防护修复方法,很多像执行框架漏洞获取到系统权限,以及跨权限写入木马后门等等操作,希望大家在渗透测试的道路中发现更多的知识和经验。
网站安全专家
2019/10/25
1.4K0
渗透测试 Java架构执行漏洞检测
强网杯 2022 Web writeup
cookie中有序列化的userfile字段来表示用户已经上传的文件,那应该要先想办法通过文件读取的功能读取到源代码,然后再考虑如何结合反序列化实现RCE。
ek1ng
2022/09/23
8380
强网杯 2022 Web writeup
Python 反序列化浅析
文章首发于跳跳糖社区https://tttang.com/archive/1782/
用户9691112
2023/05/18
8000
Django(34)Django操作session(超详细)[通俗易懂]
session: session和cookie的作用有点类似,都是为了存储用户相关的信息。不同的是,cookie是存储在本地浏览器,session是一个思路、一个概念、一个服务器存储授权信息的解决方案,不同的服务器,不同的框架,不同的语言有不同的实现。虽然实现不一样,但是他们的目的都是服务器为了方便存储数据的。session的出现,是为了解决cookie存储数据不安全的问题的。
全栈程序员站长
2022/09/20
6.7K0
Django(34)Django操作session(超详细)[通俗易懂]
Cookie、Session
无状态的意思是每次请求都是独立的,它的执行情况和结果与前面的请求和之后的请求都无直接关系,它不会受前面的请求响应情况直接影响,也不会直接影响后面的请求响应情况。
py3study
2020/01/19
1.1K0
Cookie、Session
反序列化漏洞原理剖析:从攻击到防御
在安全测试中,反序列化漏洞(Deserialization Vulnerability)因其高危害性和隐蔽性,成为近年来攻击者利用的最为频繁的漏洞类型之一,log4j2、fastjson等知名应用的漏洞中都不乏它的身影。本文将从原理、危害、利用方式及防御措施等方面,带大家详细了解这一漏洞。
星尘安全
2025/04/09
4720
反序列化漏洞原理剖析:从攻击到防御
看我如何发现价值三千美金的Facebook视频缩略图信息泄露漏洞
本文我要分享的是我的一个$3000美金Facebook漏洞发现过程。在我决定对Facebook网站进行安全测试之后,我熟读了很多相关的漏洞发现writeup,发现Facebook对有效漏洞的赏金程度还算可观,于是乎,我就给自己制订了几个相关的Facebook网站目标,看看能否在其中发现一些有意思的问题,挖掘出实质性的漏洞来。
FB客服
2018/07/30
8590
看我如何发现价值三千美金的Facebook视频缩略图信息泄露漏洞
推荐阅读
相关推荐
Code-Breaking中的两个Python沙箱
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档