腾讯云 TDMQ RocketMQ 版是基于 Apache RocketMQ 打造的满足金融级高可靠的在线业务消息队列产品,凭借其高可用、高可靠等特性,被广泛应用于金融、电商,社交等高并发场景,获得了各行各业用户的广泛认可。在实际使用中, 订阅关系不一致是开发者经常容易遇到的一个问题,可能会导致消息消费异常、消息丢失等现象。
本文将深入解析订阅关系一致性的核心要点,从定义与约束机制,到底层实现原理与优化实践,再结合真实案例分享 TDMQ RocketMQ 版针对订阅关系不一致问题的解决方案,帮助开发者快速定位问题根源,构建稳定可靠的消息系统。
订阅关系定义
订阅关系是 RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置,订阅关系由消费者组动态注册到服务端,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
通过配置订阅关系,可控制如下消费行为:
在 RocketMQ 的领域模型中,订阅关系的位置和流程如下:
订阅关系一致性约束
订阅关系一致性要求同一消费者组内的所有消费者实例所订阅的主题必须和过滤规则完全一致。这里涉及三个约束条件,具体来看:
对于大多数分布式应用来说,一个消费组下通常会挂载多个 Consumer 实例,订阅关系一致性的约束范围就是同一个消费组下的所有消费者。
同一个消费组下的所有消费者订阅的主题必须一致,例如:Consumer1 订阅 TopicA 和 TopicB,Consumer2 也必须订阅 TopicA 和 TopicB,不能只订阅 TopicA、只订阅 TopicB 或订阅 TopicA 和 TopicC。
同一个消费组下的所有消费者过滤规则必须一致,包括 Tag 的数量和 Tag 的顺序,例如:Consumer1 订阅 TopicB 且 Tag 为 Tag1||Tag2,Consumer2 订阅 TopicB 的 Tag 也必须是 Tag1||Tag2,不能只订阅 Tag1、只订阅 Tag2 或者订阅 Tag2||Tag1。
订阅关系一致的示例
下图展示了常见的两种正确的订阅关系,分别对应两种情况:
正确示例一:单 Topic 单 Tag 订阅
如图中 Group 1 的 Consumer1 和 Consumer2 都订阅 TopicA 中所有消息。
正确示例二:单 Topic 多 Tag 订阅
如图中 Group 2 的 Consumer1 和 Consumer2 都订阅 TopicA 中 Tag 为 Tag1 或 Tag2 的消息,且顺序都是 Tag1||Tag2。
订阅关系不一致的示例
下图展示了三种典型错误的订阅关系,分别对应三种情况:
错误示例一:订阅 Topic 不同
如图中 Group 1 的 Consumer1 和 Consumer2 分别订阅了不同的 Topic。
错误示例二:订阅 Topic 相同但 Tag 不同
如图中 Group 2 的 Consumer1 订阅 TopicA 的 Tag1,Consumer2 订阅 TopicA 的 Tag2。
错误示例三:订阅 Topic 和 Tag 都相同但 Tag 顺序不同
如图中 Group 3 的 Consumer1 订阅 TopicA 的 Tag1||Tag2,Consumer2 订阅 TopicA 的 Tag2||Tag1,这里虽然订阅 Tag 都相同但顺序不同,也不符合订阅一致性约束。
订阅关系不一致的影响
如果订阅关系不一致,可能导致消息消费逻辑混乱,消息被重复消费或遗漏。
如下示例,这里我们启动两个 Consumer,它们都属于消费组 Group1,都订阅了主题 TopicA,但是 Consumer1 订阅的是 Tag1 的消息,Consumer2 订阅的是 Tag2 的消息。
String topic = "TopicA";
String consumerGroup = "Group1";
FilterExpression filterExpressionTag1 = new FilterExpression("Tag1", FilterExpressionType.TAG);
PushConsumer consumer1 = provider.newPushConsumerBuilder()
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpressionTag1))
.build();
FilterExpression filterExpressionTag2 = new FilterExpression("Tag2", FilterExpressionType.TAG);
PushConsumer consumer2 = provider.newPushConsumerBuilder()
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpressionTag2))
.build();
这种情况下两个客户端分别会有什么表现呢?
但是在服务端同一个消费组内的各个消费者客户端的订阅信息会相互被覆盖,所以这种消费状态非常混乱,上面示例中 Consumer1 和 Consumer2 的消费情况可能也会发生切换。
订阅关系一致性原理
通过上边的示例我们可以看到,订阅关系不一致时,客户端消费逻辑是不确定的,那么这个现象是如何形成的呢?让我们通过源码来一探究竟。
注:以下涉及源码均出自 Apache RocketMQ 社区 release-5.2.0 分支。
客户端上报订阅关系
消费者启动后,会定时向所有 Broker 发送心跳包,携带订阅关系信息。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
this.mQClientFactory.checkClientInBroker();
if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
this.mQClientFactory.rebalanceImmediately();
}
}
org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData
public class HeartbeatData extends RemotingSerializable {
// 消费者客户端ID
private String clientID;
private Set<ConsumerData> consumerDataSet = new HashSet<>();
}
public class ConsumerData {
// 消费组ID
private String groupName;
// 订阅关系
private Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
}
public class SubscriptionData implements Comparable<SubscriptionData> {
// 订阅主题
private String topic;
// 过滤的 Tag 列表
private Set<String> tagsSet = new HashSet<>();
// 上报的时间戳(版本号)
private long subVersion = System.currentTimeMillis();
// 过滤类型
private String expressionType = ExpressionType.TAG;
}
可以看到心跳包中包含了当前消费者客户端的 ID 和客户端消费信息,其中消费信息包含该消费者属于哪个消费组,还有该消费者的订阅关系列表,每个订阅关系中表明了它订阅的是哪个主题、哪种订阅类型、用来过滤消息的 Tag 列表和当前的时间戳。
服务端处理订阅关系
Broker 在接收到心跳后,会更新本地的订阅关系表。
org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor#registerConsumer
public boolean registerConsumer(...) {
// 获取或创建消费组信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新订阅关系
consumerGroupInfo.updateSubscription(subList);
}
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
for (SubscriptionData sub : subList) {
// 根据 Topic 查对应的订阅关系
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
// 订阅关系不存在,更新本订阅关系
if (old == null) {
this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
// 如果已存在则用新的覆盖旧的
} else if (sub.getSubVersion() > old.getSubVersion()) {
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
return updated;
}
这里可以看到 Broker 在更新订阅关系时,同一个消费组下订阅的同一个主题的订阅关系是直接使用最新上报的关系,那么不同客户端上报的订阅关系不一致时服务端报错的订阅关系就会一直被相互覆盖,只会以最新上报的订阅关系为准。
根据订阅关系过滤消息
而消费者客户端在拉取消息时,Broker 会使用已保存的订阅关系来进行过滤。
org.apache.rocketmq.store.DefaultMessageStore#getMessage
@Overridepublic
boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
// 匹配逻辑
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
那么当 Broker 上的订阅关系一致在变化时,过滤消息的结果就可能是不符合预期的。详细实现原理参考:《Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践》。
腾讯云优化实践
订阅关系不一致会直接导致消息消费异常,需快速定位并修复。TDMQ RocketMQ 版控制台提供可视化检测能力,无需人工逐个排查日志或配置,通过控制台 3 步即可完成问题的发现、定位、修复,降低运维复杂度。
1、一键检测
自动对比消费组内所有客户端的订阅配置,高亮显示出不一致的订阅关系。
2、精准定位
点击不一致详情,直接关联到具体客户端实例,可快速判断出非预期的订阅关系所在的实例。
3、闭环验证
修订后实时同步订阅关系一致性状态,确保消费组订阅关系符合预期。
常见问题
哪些典型场景会出现订阅关系不一致?
总结
订阅关系一致性是 RocketMQ 消息系统消费行为正确的核心保障。本文通过订阅关系定义、一致性约束机制、原理分析与腾讯云优化实践,系统阐述了订阅关系不一致的潜在风险与解决方案:
通过本文的介绍与案例分析,相信读者对 TDMQ RocketMQ 版的订阅关系一致性机制有了更深入的理解,并能够在实际项目中灵活应用这一机制,保障消息 100% 按预期路由,避免因配置偏差导致业务消息漏消费。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有