前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >TDMQ RocketMQ 版订阅关系一致性原理与实践

TDMQ RocketMQ 版订阅关系一致性原理与实践

作者头像
腾讯云中间件团队
发布于 2025-04-30 02:31:24
发布于 2025-04-30 02:31:24
10900
代码可运行
举报
运行总次数:0
代码可运行
导语

腾讯云 TDMQ RocketMQ 版是基于 Apache RocketMQ 打造的满足金融级高可靠的在线业务消息队列产品,凭借其高可用、高可靠等特性,被广泛应用于金融、电商,社交等高并发场景,获得了各行各业用户的广泛认可。在实际使用中, 订阅关系不一致是开发者经常容易遇到的一个问题,可能会导致消息消费异常、消息丢失等现象。

本文将深入解析订阅关系一致性的核心要点,从定义与约束机制,到底层实现原理与优化实践,再结合真实案例分享 TDMQ RocketMQ 版针对订阅关系不一致问题的解决方案,帮助开发者快速定位问题根源,构建稳定可靠的消息系统。

订阅关系定义

订阅关系是 RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置,订阅关系由消费者组动态注册到服务端,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。

通过配置订阅关系,可控制如下消费行为:

  • 消息过滤规则:用于控制消费者在消费消息时,选择主题内的哪些消息进行消费,设置消费过滤规则可以高效地过滤消费者需要的消息集合,灵活根据不同的业务场景设置不同的消息接收范围。
  • 消费状态:RocketMQ 服务端默认提供订阅关系持久化的能力,即消费者分组在服务端注册订阅关系后,当消费者离线并再次上线后,可以获取离线前的消费进度并继续消费。

在 RocketMQ 的领域模型中,订阅关系的位置和流程如下:

  1. 消息由生产者初始化并发送到 RocketMQ 服务端。
  2. 消息按照到达 RocketMQ 服务端的顺序存储到主题的指定队列中。
  3. 消费者按照指定的订阅关系从 RocketMQ 服务端中获取消息并消费。

订阅关系一致性约束

订阅关系一致性要求同一消费者组内的所有消费者实例所订阅的主题必须和过滤规则完全一致。这里涉及三个约束条件,具体来看:

  • 消费组必须一致

对于大多数分布式应用来说,一个消费组下通常会挂载多个 Consumer 实例,订阅关系一致性的约束范围就是同一个消费组下的所有消费者。

  • 订阅的主题必须一致

同一个消费组下的所有消费者订阅的主题必须一致,例如:Consumer1 订阅 TopicA 和 TopicB,Consumer2 也必须订阅 TopicA 和 TopicB,不能只订阅 TopicA、只订阅 TopicB 或订阅 TopicA 和 TopicC。

  • 过滤规则必须一致

同一个消费组下的所有消费者过滤规则必须一致,包括 Tag 的数量和 Tag 的顺序,例如:Consumer1 订阅 TopicB 且 Tag 为 Tag1||Tag2,Consumer2 订阅 TopicB 的 Tag 也必须是 Tag1||Tag2,不能只订阅 Tag1、只订阅 Tag2 或者订阅 Tag2||Tag1。

订阅关系一致的示例

下图展示了常见的两种正确的订阅关系,分别对应两种情况:

正确示例一:单 Topic 单 Tag 订阅

如图中 Group 1 的 Consumer1 和 Consumer2 都订阅 TopicA 中所有消息。

正确示例二:单 Topic 多 Tag 订阅

如图中 Group 2 的 Consumer1 和 Consumer2 都订阅 TopicA 中 Tag 为 Tag1 或 Tag2 的消息,且顺序都是 Tag1||Tag2。

订阅关系不一致的示例

下图展示了三种典型错误的订阅关系,分别对应三种情况:

错误示例一:订阅 Topic 不同

如图中 Group 1 的 Consumer1 和 Consumer2 分别订阅了不同的 Topic。

错误示例二:订阅 Topic 相同但 Tag 不同

如图中 Group 2 的 Consumer1 订阅 TopicA 的 Tag1,Consumer2 订阅 TopicA 的 Tag2。

错误示例三:订阅 Topic 和 Tag 都相同但 Tag 顺序不同

如图中 Group 3 的 Consumer1 订阅 TopicA 的 Tag1||Tag2,Consumer2 订阅 TopicA 的 Tag2||Tag1,这里虽然订阅 Tag 都相同但顺序不同,也不符合订阅一致性约束。

订阅关系不一致的影响

如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。

如下示例,这里我们启动两个 Consumer,它们都属于消费组 Group1,都订阅了主题 TopicA,但是 Consumer1 订阅的是 Tag1 的消息,Consumer2 订阅的是 Tag2 的消息。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
String topic = "TopicA";
String consumerGroup = "Group1";
FilterExpression filterExpressionTag1 = new FilterExpression("Tag1", FilterExpressionType.TAG);
PushConsumer consumer1 = provider.newPushConsumerBuilder()
    .setConsumerGroup(consumerGroup)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpressionTag1))
    .build();
FilterExpression filterExpressionTag2 = new FilterExpression("Tag2", FilterExpressionType.TAG);
PushConsumer consumer2 = provider.newPushConsumerBuilder()
    .setConsumerGroup(consumerGroup)
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpressionTag2))
    .build();

这种情况下两个客户端分别会有什么表现呢?

  • Consumer1 消费者无法消费 Tag 值为 Tag1 的消息,因为 Consumer1 消费者在拉取消息时,服务端该消费组的订阅信息中 Tag 值为 Tag2,经过服务端过滤后,Consumer1 消费者拉取到的消息的 Tag 值都是 Tag2 , 但消费者在收到消息后也会进行过滤,这部分消息也都被过滤掉了。
  • Consumer2 消费者只能消费部分 Tag 值为 Tag2 的消息,因为只有部分队列分配给了 Consumer2。

但是在服务端同一个消费组内的各个消费者客户端的订阅信息会相互被覆盖,所以这种消费状态非常混乱,上面示例中 Consumer1 和 Consumer2 的消费情况可能也会发生切换。

订阅关系一致性原理

通过上边的示例我们可以看到,订阅关系不一致时,客户端消费逻辑是不确定的,那么这个现象是如何形成的呢?让我们通过源码来一探究竟。

注:以下涉及源码均出自 Apache RocketMQ 社区 release-5.2.0 分支。

客户端上报订阅关系

消费者启动后,会定时向所有 Broker 发送心跳包,携带订阅关系信息。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public synchronized void start() throws MQClientException {
        this.mQClientFactory.checkClientInBroker();
        if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
            this.mQClientFactory.rebalanceImmediately();
        }
    }

org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class HeartbeatData extends RemotingSerializable {
    // 消费者客户端ID
    private String clientID;
    private Set<ConsumerData> consumerDataSet = new HashSet<>();
}
public class ConsumerData {
    // 消费组ID
    private String groupName;
    // 订阅关系
    private Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
}
public class SubscriptionData implements Comparable<SubscriptionData> {
    // 订阅主题
    private String topic;
    // 过滤的 Tag 列表
    private Set<String> tagsSet = new HashSet<>();
    // 上报的时间戳(版本号)
    private long subVersion = System.currentTimeMillis();
    // 过滤类型
    private String expressionType = ExpressionType.TAG;
}

可以看到心跳包中包含了当前消费者客户端的 ID 和客户端消费信息,其中消费信息包含该消费者属于哪个消费组,还有该消费者的订阅关系列表,每个订阅关系中表明了它订阅的是哪个主题、哪种订阅类型、用来过滤消息的 Tag 列表和当前的时间戳。

服务端处理订阅关系

Broker 在接收到心跳后,会更新本地的订阅关系表。

org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor#registerConsumer

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public boolean registerConsumer(...) {
        // 获取或创建消费组信息
        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
        if (null == consumerGroupInfo) {
            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
            consumerGroupInfo = prev != null ? prev : tmp;
        }
        // 更新订阅关系
        consumerGroupInfo.updateSubscription(subList);
    }
public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;
        for (SubscriptionData sub : subList) {
            // 根据 Topic 查对应的订阅关系
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            // 订阅关系不存在,更新本订阅关系
            if (old == null) {
                this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            // 如果已存在则用新的覆盖旧的
            } else if (sub.getSubVersion() > old.getSubVersion()) {
                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }
        return updated;
    }

这里可以看到 Broker 在更新订阅关系时,同一个消费组下订阅的同一个主题的订阅关系是直接使用最新上报的关系,那么不同客户端上报的订阅关系不一致时服务端报错的订阅关系就会一直被相互覆盖,只会以最新上报的订阅关系为准。

根据订阅关系过滤消息

而消费者客户端在拉取消息时,Broker 会使用已保存的订阅关系来进行过滤。

org.apache.rocketmq.store.DefaultMessageStore#getMessage

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Overridepublic
boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    if (null == tagsCode || null == subscriptionData) {    
        return true;
    }
    if (subscriptionData.isClassFilterMode()) {    
        return true;
    }
    // 匹配逻辑
    return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)||     subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

那么当 Broker 上的订阅关系一致在变化时,过滤消息的结果就可能是不符合预期的。详细实现原理参考:《Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践》。

腾讯云优化实践

订阅关系不一致会直接导致消息消费异常,需快速定位并修复。TDMQ RocketMQ 版控制台提供可视化检测能力,无需人工逐个排查日志或配置,通过控制台 3 步即可完成问题的发现、定位、修复,降低运维复杂度。

1、一键检测

自动对比消费组内所有客户端的订阅配置,高亮显示出不一致的订阅关系。

2、精准定位

点击不一致详情,直接关联到具体客户端实例,可快速判断出非预期的订阅关系所在的实例。

3、闭环验证

修订后实时同步订阅关系一致性状态,确保消费组订阅关系符合预期。

常见问题

哪些典型场景会出现订阅关系不一致?

  • 环境未完全隔离,非生产环境和生产环境使用了相同的 Group 订阅不同的 Topic。
  • 业务代码修改了订阅关系,在灰度和版本发布过程中,新旧版本消费者共存。

总结

订阅关系一致性是 RocketMQ 消息系统消费行为正确的核心保障。本文通过订阅关系定义、一致性约束机制、原理分析与腾讯云优化实践,系统阐述了订阅关系不一致的潜在风险与解决方案:

  1. 核心约束:同一消费组内的所有消费需严格遵循主题一致、过滤规则一致(含 Tag 顺序) 的原则,任一环节的差异均可能导致漏消费消息。
  2. 腾讯云能力:依托控制台的一键检测、精准定位、闭环验证功能,开发者可快速识别异常实例并修复,将传统人工排查耗时从小时级缩短至分钟级。
  3. 最佳实践:建议通过消费组隔离、动态配置强校验、多 Topic 场景分层治理等策略,从源头规避订阅关系不一致风险。

通过本文的介绍与案例分析,相信读者对 TDMQ RocketMQ 版的订阅关系一致性机制有了更深入的理解,并能够在实际项目中灵活应用这一机制,保障消息 100% 按预期路由,避免因配置偏差导致业务消息漏消费。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-04-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云中间件 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RocketMQ为什么要保证订阅关系的一致性?
前段时间有个朋友向我提了一个问题,他说在搭建 RocketMQ 集群过程中遇到了关于消费订阅的问题,具体问题如下:
张乘辉
2019/07/30
2K0
聊聊rocketmq的订阅关系
org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
code4it
2023/05/08
4211
RocketMQ,同一个topic下是否可以通过不同的tag来进行订阅吗?
针对以上问题,有两个场景:使用阿里云的云服务器的RocketMQ和使用自己搭建的RocketMQ。但无论采用这两种的任何一种,都是可以在同一个topic下,通过tag来进行业务区分的。
程序新视界
2020/12/01
5.2K0
填坑笔记:RocketMQ消息订阅失败问题?
项目组使用阿里RocketMQ,对同一个消费组设置不同的tag订阅关系,出现消息丢失的问题,本文从rocketmq源码研究消息发布与订阅原理,并分析导致该问题的原因。
lyb-geek
2019/07/22
6K0
填坑笔记:RocketMQ消息订阅失败问题?
Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践
本文将系统阐述 Apache RocketMQ 消息过滤机制的技术架构与实践要点。首先从业务应用场景切入,解析消息过滤的核心价值;接着介绍 Apache RocketMQ 支持的两种消息过滤实现方式,帮助读者建立基础认知框架;随后深入剖析 SQL 语法过滤与标签(Tag)过滤的技术实现的核心原理以及规则限制;最后介绍腾讯云在消息过滤性能优化方面的具体实践。
腾讯云中间件团队
2025/04/04
1420
Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践
面试官:使用 RocketMQ 怎么进行灰度发布?
灰度发布是指在黑与白之间,平滑过渡的一种发布方式。在大流量的系统中,如果一次升级改造范围比较大,或者影响内容不太确定,一般会采用切量的方式进行升级,这样可以减少生产变更带来的影响。
jinjunzhu
2022/12/20
8580
面试官:使用 RocketMQ 怎么进行灰度发布?
RocketMQ消息丢失如何排查?
其实借助RocketMQ-Dashboard就能高效的排查,里面有很多你想象不到的功能
Java识堂
2022/04/06
2.5K0
RocketMQ消息丢失如何排查?
RocketMQ消费者没有成功消费消息的问题排查
今天下游同事反馈,有一些以取消的订单库存还原异常了,导致部分商品库存没有还原。查日志发现没有收到还原消息,但是查看发送方是可以确认消息是已经发了的,那么是什么原因导致消费者没有收到,或者收到后没有处理消息呢。最后发现这些消息的状态都是NOT_ONLINE,原因是服务挂了,重启之后便可以重新消费了。让我们看看这个调查过程。
翎野君
2023/05/12
5.1K0
RocketMQ消费者没有成功消费消息的问题排查
两个实验让我彻底弄懂了「订阅关系一致」
订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例所订阅的 Topic 、Tag 必须完全一致。
勇哥java实战
2023/10/11
1.4K0
【修正版】7张图带你轻松入门RocketMQ
RocketMQ 是阿里巴巴的分布式消息中间件,在 2012 年开源,在 2017 年成为 Apache 顶级项目。
jinjunzhu
2022/09/23
1.6K0
【修正版】7张图带你轻松入门RocketMQ
Docker 安装 RocketMQ 并结合 SpringBoot 使用实例
在之前的《浅入浅出消息队列》一文中,我们了解了消息队列的作用、优缺点和使用场景,相信你对消息队列已经有了一个大致的概念,文末给自己埋的坑说日后会写一篇实战教程,正好现在实习结束了,也许久没有写实战教程了,于是这就来填坑了。
出其东门
2020/11/03
1.5K0
Docker 安装 RocketMQ 并结合 SpringBoot 使用实例
RocketMQ之消费者启动与消费流程
RocketMQ是由阿里巴巴开源的分布式消息中间件,支持顺序消息、定时消息、自定义过滤器、负载均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。NameServer充当名字路由服务,整体架构图如下所示:
2020labs小助手
2022/07/12
1.1K0
RocketMQ系列 | 如何让消息“丢失”?
RocketMQ 5.0: 云原生“消息、事件、流”实时数据处理平台,覆盖云边端一体化数据处理场景。
烟雨平生
2023/10/01
5360
RocketMQ系列 | 如何让消息“丢失”?
RocketMQ(六):Consumer Rebalanc原理解析(运行流程、触发时机、导致的问题)
这里推荐一篇Java语法糖的文章:Java 语法糖:让开发更丝滑的“幕后操作” 文章列举常用的Java语法糖并分析优劣点,让我们的开发更加丝滑~
菜菜的后端私房菜
2024/11/11
2810
RocketMQ中msg&tag的生命周期
最近发现项目内部和外部沟通频繁使用MQ,并通过tag进行消息过滤和隔离,因此想搞清楚tag在源码中使用的地方,毕竟消息中间件这块还是有很多该学习的地方。
CBeann
2023/12/25
3250
RocketMQ中msg&tag的生命周期
RocketMQ学习-消息发布和订阅
前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。
阿杜
2018/08/06
6.1K0
RocketMQ学习-消息发布和订阅
Spring boot集成RocketMQ
之前安装好了RocketMQ,这一篇就简单记录一下Spring boot是怎么集成RocketMQ的,如果有需要安装RocketMQ的同学看这一篇,Linux在线安装RocketMQ,如果没有linux环境的同学也可以本地启动,只需要有java环境即可。
余生大大
2022/11/02
1.7K0
RabbitMQ都写了,RocketMQ怎么能落下?
最近看到了我在Github上写的rabbitmq-examples陆续被人star了,就想着写个rocketmq-examples。对rabbitmq感兴趣的小伙伴可以看我之前的文章。下面把RocketMQ的各个特性简单介绍一下,这样在用的时候心里也更有把握
Java识堂
2020/07/28
9100
RabbitMQ都写了,RocketMQ怎么能落下?
【RockerMQ】002-RocketMQ 基本概念、系统架构
消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
訾博ZiBo
2025/01/06
1430
【RockerMQ】002-RocketMQ 基本概念、系统架构
SpringBoot集成RocketMq
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。
程序员子龙
2023/11/26
5440
SpringBoot集成RocketMq
推荐阅读
相关推荐
RocketMQ为什么要保证订阅关系的一致性?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验