前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践

Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践

作者头像
腾讯云中间件团队
发布于 2025-04-04 01:54:07
发布于 2025-04-04 01:54:07
14200
代码可运行
举报
运行总次数:0
代码可运行
导语

本文将系统阐述 Apache RocketMQ 消息过滤机制的技术架构与实践要点。首先从业务应用场景切入,解析消息过滤的核心价值;接着介绍 Apache RocketMQ 支持的两种消息过滤实现方式,帮助读者建立基础认知框架;随后深入剖析 SQL 语法过滤与标签(Tag)过滤的技术实现的核心原理以及规则限制;最后介绍腾讯云在消息过滤性能优化方面的具体实践。

消息过滤的应用场景

消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。

消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。

消息过滤功能可以应用在如下场景:

用户想使用一个 Topic,但是被多个 Group 订阅,每个 Group 只想消费其中一部分消息。

默认情况下,消费组1和消费组2 会全量消费 Topic 里面的所有消息。但如果我们想选择性的消费里面一些消息的时候,就可以使用消息过滤功能对消息进行区分过滤。

消息过滤原理介绍

目前消息过滤主要支持两种过滤方式,分别是 SQL 过滤和 Tag 过滤。其核心逻辑都是在发送消息的时候,设置一些自定义字段,然后通过消费组订阅的时候指定对应的过滤表达式,消息在服务端进行过滤后,才被消费组消费。

Tag 过滤

代码示例

在介绍原理之前,我们先直观的看一下用法,以 RocketMQ 4.x 的 SDK 为例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 创建消息生产者
DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
// 创建消息实例,设置topic和消息内容,设置Tag1
Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
// 发送消息
SendResult sendResult = producer.send(msg, 3000);
System.out.println(sendResult + ":" + new String(msg.getBody()));
producer.shutdown();

在发送消息的时候,我们可以给消息指定 Tag。而在消费组这一侧,我们可以订阅不同的 Tag,例如使用星号(*)匹配全部 Tag。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 创建消息消费者
DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
// 订阅topic 订阅所有的TAG
pushConsumer.subscribe(TOPIC_NAME, "*");
//订阅指定的TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1");
//订阅多个TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");
// 省略其他代码

核心原理

那么,这个功能在底层究竟是怎么实现的呢?我们都知道,RocketMQ 在存储消息的时候,会先把消息写入 CommitLog。CommitLog 可以看作是 RocketMQ 存储消息的一个大日志,所有消息都会先写到这里。

写完 CommitLog 之后,RocketMQ 还会把消息的相关信息再写入ConsumeQueue。ConsumeQueue 可以理解为是 CommitLog 的一个索引,它里面存储的不是完整的消息内容,而是消息的一些关键信息,方便消费者快速找到和读取消息。

具体来说,当 RocketMQ 写入一条原始消息到 CommitLog 之后,它会提取这条消息的一些重要信息,比如这条消息在 CommitLog 里的物理偏移量(Offset)、消息的大小,还有这条消息 Tag 的哈希码(Hashcode),然后把这些信息写入到 ConsumeQueue 中。这样一来,消费者就可以通过 ConsumeQueue 快速定位到 CommitLog 里对应的消息,而不需要每次都去遍历整个 CommitLog,大大提高了消息消费的效率。

在发送消息的的时候,RocketMQ 会将消息 Tag 的 Hashcode 写入到 ConsumeQueue 字段中。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
//上面这个方法
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
    if (Strings.isNullOrEmpty(tags)) {
      return 0; 
    }
    return tags.hashCode();
}

当用户在消费消息时,服务端会从 ConsumeQueue 中一条一条地检查消息,并对这些消息进行过滤。过滤的时候,服务端会用 ConsumeQueue 中存储的消息 Tag 的 Hashcode,和当前订阅组所订阅的 Tag 进行匹配。这个匹配的过程,是在 RocketMQ 的核心代码 org.apache.rocketmq.store.DefaultMessageStore#getMessage 方法里实现的。简单来说,就是服务端会根据订阅组订阅的 Tag,从 ConsumeQueue 中找出符合条件的消息,然后交给用户去消费。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
    //省略其他代码
    continue;
}

匹配逻辑

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
    return true;
}
if (subscriptionData.isClassFilterMode()) {
    return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)||     subscriptionData.getCodeSet().contains(tagsCode.intValue());
}

可以看到,只有当订阅的 Tag 是 “*”(表示订阅所有消息),或者消息的 Tag 和订阅的 Tag 匹配上的时候,消费者才能消费到这条消息。这里您可能会有疑问,为啥要用 CodeSet 呢?其实是因为这里会把用户订阅组订阅的 Tag 进行拆分,然后把这些拆分后的 Tag 放到 CodeSet 里。这样在匹配的时候,就可以快速判断消息的 Tag 是否在 CodeSet 中,从而决定这条消息能不能被消费。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
org.apache.rocketmq.remoting.protocol.filter.FilterAPI#buildSubscriptionData(java.lang.String, java.lang.String)
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
    Arrays.stream(tags).map(String::trim).filter(tag -> !tag.isEmpty()).forEach(tag -> {
        subscriptionData.getTagsSet().add(tag);
        subscriptionData.getCodeSet().add(tag.hashCode());
    });
} else {
    throw new Exception("subString split error");
}

然后,CodeSet 里面存的就是多个 Hashcode 。

这里您可能又会有疑问:要是在服务端通过哈希码来匹配,那万一两个不同的 Tag 经过哈希运算之后,得到的哈希码是一样的,这样不就匹配错了吗?没错,这种情况确实有可能发生。

为了避免这种问题,客户端还会再做一层过滤,使用真正的 Tag 字符串再过滤一次,这样就能保证最终消费到的消息一定是符合订阅要求的,不会出现因为哈希冲突而导致的匹配错误。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
    msgListFilterAgain = new ArrayList<>(msgList.size());
    for (MessageExt msg : msgList) {
        if (msg.getTags() != null) {
            if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                msgListFilterAgain.add(msg);
            }
        }
    }
}

综上所述,我们就得到了这样一个 Tag 过滤的工作流程。

规则限制

  • 发送消息只能设置一个 Tag。
  • 多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费。
  • 多个 Tag 的顺序也要保持一致,否则会导致订阅关系不一致,例如 Tag1||Tag2 和 Tag2||Tag1 就是不同的。

SQL 过滤

从上面可以看到,Tag 过滤比较简单,通过在 ConsumeQueue 直接进行匹配,效率比较高,但是能支持的消息过滤比较简单,如果想通过消息的某个扩展字段来进行匹配,做一些更复杂的逻辑,就需要使用 SQL 过滤了。

代码示例

在发送方我们可以设置一下 putUserProperty,来扩展字段。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void main(String[] args) throws Exception {
  // 创建消息生产者
  DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
  // 创建消息实例,设置topic和消息内容,设置自定义属性
  Message msg = new Message(TOPIC_NAME, "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
  msg.putUserProperty("key1","value1");
  // 发送消息
  SendResult sendResult = producer.send(msg, 3000);
  System.out.println(sendResult + ":" + new String(msg.getBody()));
  producer.shutdown();
}

当消费组去消费消息的时候,它可以用 SQL 表达式来精准地筛选消息。例如我们可以设定条件,像 “Key1 必须等于 Value1”,或者设置更复杂的条件,用 “AND” 和 “OR” 这些逻辑运算符把多个条件组合起来。SQL 过滤能按照设置的条件,精确地过滤出符合条件的消息。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void main(String[] args) throws Exception {
  // 创建消息消费者
  DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
  //订阅所有消息
  pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));
  // 订阅topic 订阅单个key的sql
  pushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));
  //订阅多个属性
  pushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL  AND key1='value1' AND key2='value2'"));
  // 省略其他代码
}

核心原理

从前面的介绍可以知道,ConsumeQueue 里并没有存储用于 SQL 表达式匹配的相关信息。所以要是想根据 SQL 表达式来匹配消息,就只能把 CommitLog 里的消息读取出来,然后进行运算处理。实现这部分功能的代码,同样是在 org.apache.rocketmq.store.DefaultMessageStore#getMessage 这个方法里。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {                                              // 省略部分代码
  continue;
}
//匹配逻辑。          
if (tempProperties == null && msgBuffer != null) {
 tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {
     MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
     ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
    log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}

可以看到,这里是把所有消息的属性字段都放到 MessageEvaluationContext 里面,然后根据用户写的 SQL 表达式来对消息进行过滤。这里用到的 getCompiledExpression() 方法,其实就是通过 SqlFilter 把用户写的 SQL 表达式编译成一个 BooleanExpression,最终计算结果就是匹配或者不匹配。

如果您想了解这个功能的完整 SQL 实现细节,可以去深入研究 Rocketmq-filter 模块。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public Expression compile(final String expr) throws MQFilterException {
    return SelectorParser.parse(expr);
}

和 Tag 的过滤方式不太一样,BooleanExpression 需要读取真实的消息内容,并且是基于消息的实际字符串去做匹配。这种方式的好处是客户端不需要额外做什么配置,但缺点是性能相对会差一些,因为每次都要读取消息内容来匹配。

为了提升性能,社区想了个办法,就是在 ConsumeQueue 里增加一个扩展字段。不过要使用这个功能,需要先打开 enableConsumeQueueExt 这个开关。打开之后,就可以利用布隆过滤器(Bloom Filter)来做优化了。

布隆过滤器的原理大概是这样的:它会根据消息的属性生成一个序列化的布隆过滤器数据。在过滤的时候,如果布隆过滤器判断消息不符合条件,那这条消息肯定是不符合的,就可以直接过滤掉;如果布隆过滤器判断消息符合条件,那还需要进一步做精确匹配。

综上所述,我们就得到了这样一个 SQL 过滤的工作流程。

规则限制

由于 SQL 属性过滤是生产者定义消息属性,消费者设置 SQL 过滤条件,计算之后,可能有不同的结果,因此服务端的处理方式如下:

  • 异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。
  • 空值情况处理:如果过滤条件的表达式计算值为 null 或不是布尔类型(true 和 false),则消息默认被过滤,不会被投递给消费者。例如发送消息时不存在某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为 null。
  • 类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。

虽然这种方式是灵活的,但是在消息头中还是不建议设置太多的值,因为总的消息头部属性有大小限制(32k),内置的已经占用了不少。超长之后,可能导致消息发送或者消费异常。

两种过滤方式的对比总结

过滤方式

TAG过滤

SQL过滤

是否支持多个过滤条件

性能

处理方

客户端+服务端

服务端

易用性

一般

使用建议

合理划分主题和 Tag 标签。

从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的 Tag 标签及属性进行筛选。关于拆分方式的选择,应遵循以下原则:

  • 消息类型是否一致:不同类型的消息,如顺序消息和普通消息需要使用不同的主题进行拆分,无法通过 Tag 标签进行分类。
  • 业务域是否相同:不同业务域和部门的消息应该拆分不同的主题。例如物流消息和支付消息应该使用两个不同的主题;同样是一个主题内的物流消息,普通物流消息和加急物流消息则可以通过不同的 Tag 进行区分。
  • 消息量级和重要性是否一致:如果消息的量级规模存在巨大差异,或者说消息的链路重要程度存在差异,则应该使用不同的主题进行隔离拆分。

腾讯云消息过滤轨迹展示

从上述消息过滤的原理介绍可以发现,如果消息被过滤掉了,用户收不到这条消息,和消息本身没有被消费的情况看起来是一样的。这时候用户可能会有一些疑惑:在 RocketMQ 的管理控制台(dashboard)上,消息显示是“已消费”状态,可实际上自己并没收到。

在腾讯云 TDMQ RocketMQ 版上,我们针对过滤条件的查询进行了优化。通过这个优化,能够区分展示消息过滤和真正被消费两种情况的消息轨迹展示。这样一来,用户就能很直观地看到消息到底是被过滤掉了,还是真正被消费了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-04-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云中间件 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验