东哥今天来聊聊在 RocketMQ 上遇到的坑,这次碰到的是消息堆积问题!消息疯狂堆积,一开始还挺淡定,心想不就是“几条消息”吗?
结果生产者那边嗖嗖地往里塞,消费者这边像个蜗牛,压根追不上节奏,愣是给挤成了“数据沙丁鱼罐头”。这就不简单了,得好好分析一波。
一、初探消息堆积问题
首先,消息堆积这种情况一般就两种原因:要么是消费者速度慢,要么是生产者速度太快。
这一检查发现,生产者和消费者的应用和服务器状态都挺正常,网络也没啥延迟。但既然都正常,凭什么消息还堵在这儿呢?
我立马开启排查模式,观察 RocketMQ 消费者端的各项指标,发现有个奇怪的现象:某几个消费实例的 ClientId 竟然相同!这种情况放在别的场景里可能没啥,但在 RocketMQ 中,这可绝对是个灾难源头。
二、问题深挖:ClientId 是如何引发灾难的?
接着往下分析,我怀疑是 ClientId 重复 导致了问题的爆发。为啥这么想?因为 RocketMQ 的消费者在做负载均衡时 是根据 ClientId 分配消费任务的。
多个消费者要是 ID 重复,就相当于“共享”了同一个身份,等于是几个人都拿同一张身份证,那任务分配不出问题才怪呢!
1. Docker 的网络模式暗藏玄机
在查明原因的过程中,我还发现,这个 ClientId 重复的锅,竟然得 Docker 来背。RocketMQ 消费者跑在 Docker 的 Host 模式下,导致每个消费者的 IP 地址都变成了宿主机的 IP。这一来,RocketMQ 默认生成的 ClientId 便重复了,因为它的生成规则是 IP 地址 + 进程号。同一个 IP 配合不同的进程,生成出来的 ClientId 就重复,RocketMQ 的负载均衡立马乱套了。
2. 源码里找答案
毕竟技术功底摆在这儿,自然是要从 RocketMQ 源码里找答案。打开源码,仔细分析一波,果然验证了 ClientId 的生成规则是 ip:port。这解释了为何同一个宿主机的 IP 导致多个消费者 ClientId 相同。
public String buildClientId() {
return localAddress.getHostAddress() + "@" + pid;
}
RocketMQ 负载均衡机制的确依赖于 ClientId,而这段代码生成的 ClientId 就是 ip:port 形式。多个容器在 Host 网络模式下启动,IP 地址一致,加上默认端口和进程号,导致了不同容器 ClientId 的重叠。
三、分析 RocketMQ 负载均衡机制 🤹
RocketMQ 的负载均衡策略在消费端主要通过分区均衡实现。简单来说,RocketMQ 会把所有消息队列分给每个 ClientId。如果多个 ClientId 相同的消费者存在,队列分配的机制就会把一个队列的消息分发给多个消费端,而这些消费者因为“身份重叠”就只能接收到部分消息,无法发挥全部消费能力。
代码上简单展示一下负载均衡的逻辑:
public void doRebalance() {
// 基于当前消费者的 ClientId 列表计算消息分配
for (MessageQueue mq : mqAll) {
String clientId = selectClientForQueue(mq);
allocateQueue(clientId, mq);
}
}
这里的 selectClientForQueue(mq) 就是根据 ClientId 来进行分配,一旦 ClientId 重复,这个逻辑就会出错,分配的队列资源都乱了套,导致了消费者端的消费性能大大下降。
四、解决方案出炉 🧰
既然找到原因了,解决问题的思路也就清晰了。立马采取了两项措施:
自定义设置唯一的 ClientId:在代码中为每个消费者实例添加独立的标识,确保每个 ClientId 唯一不重复,RocketMQ 的队列分配终于不再迷糊,负载均衡正常运转了。
提高消费者的消费速度:通过优化消费者端的配置,让消费处理速度提升,比如调整消息批量消费的数量参数等。这样一来,消息堆积的问题逐步得到缓解。
五、新问题:MongoDB 写入压力大报警
事情往往不是一件一件地来,而是一连串连锁反应。消息堆积问题刚解决,东哥那边的 MongoDB 又开始写入压力报警。这是因为消费速度提高了,写入操作一下子涌向 MongoDB 造成的。
解决 MongoDB 的写入压力
为了应对 MongoDB 的写入压力,东哥采取了重置消费点位的方式,先暂停部分消费,再清理掉一部分历史数据。这样 MongoDB 的写入压力瞬间减少,消费系统得以正常运转。
六、总结分析 🧠
通过这次的“堆积灾难处理大作战”,我们也能得出一些经验:
ClientId 的生成规则:RocketMQ 默认是 客户端 IP + 进程号 的组合。在 Docker 的 Host 模式下,同一宿主机的 IP 会导致 ClientId 重复问题,影响负载均衡机制。
RocketMQ 负载均衡机制的特点:消费者负载均衡高度依赖 ClientId,ClientId 一旦重复会引发队列分配错乱,直接导致消息堆积。
消息堆积的应对方法:在确保 ClientId 唯一性后,还可以适当提高消费者的消费速度,通过批量消费和其他参数调整来减轻消息堆积的情况。
系统联动性影响:消息队列堆积处理时,要谨防对后续业务系统产生连锁影响,比如这里 MongoDB 的写入压力问题,处理上也得小心翼翼。
好了,这次总算是“战胜了”消息堆积,但显然每个系统环节都有可能牵一发而动全身。
以后再遇到消息堆积问题,大家不妨优先检查 ClientId 是否唯一,接着再优化消费速度,避免重蹈覆辙了。
领取专属 10元无门槛券
私享最新 技术干货