本文将系统阐述 Apache RocketMQ 消息过滤机制的技术架构与实践要点。首先从业务应用场景切入,解析消息过滤的核心价值;接着介绍 Apache RocketMQ 支持的两种消息过滤实现方式,帮助读者建立基础认知框架;随后深入剖析 SQL 语法过滤与标签(Tag)过滤的技术实现的核心原理以及规则限制;最后介绍腾讯云在消息过滤性能优化方面的具体实践。
消息过滤的应用场景
消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。
消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。
消息过滤功能可以应用在如下场景:
用户想使用一个 Topic,但是被多个 Group 订阅,每个 Group 只想消费其中一部分消息。
默认情况下,消费组1和消费组2 会全量消费 Topic 里面的所有消息。但如果我们想选择性的消费里面一些消息的时候,就可以使用消息过滤功能对消息进行区分过滤。
消息过滤原理介绍
目前消息过滤主要支持两种过滤方式,分别是 SQL 过滤和 Tag 过滤。其核心逻辑都是在发送消息的时候,设置一些自定义字段,然后通过消费组订阅的时候指定对应的过滤表达式,消息在服务端进行过滤后,才被消费组消费。
Tag 过滤
代码示例
在介绍原理之前,我们先直观的看一下用法,以 RocketMQ 4.x 的 SDK 为例:
// 创建消息生产者
DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
// 创建消息实例,设置topic和消息内容,设置Tag1
Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
// 发送消息
SendResult sendResult = producer.send(msg, 3000);
System.out.println(sendResult + ":" + new String(msg.getBody()));
producer.shutdown();
在发送消息的时候,我们可以给消息指定 Tag。而在消费组这一侧,我们可以订阅不同的 Tag,例如使用星号(*)匹配全部 Tag。
// 创建消息消费者
DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
// 订阅topic 订阅所有的TAG
pushConsumer.subscribe(TOPIC_NAME, "*");
//订阅指定的TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1");
//订阅多个TAG
pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");
// 省略其他代码
核心原理
那么,这个功能在底层究竟是怎么实现的呢?我们都知道,RocketMQ 在存储消息的时候,会先把消息写入 CommitLog。CommitLog 可以看作是 RocketMQ 存储消息的一个大日志,所有消息都会先写到这里。
写完 CommitLog 之后,RocketMQ 还会把消息的相关信息再写入ConsumeQueue。ConsumeQueue 可以理解为是 CommitLog 的一个索引,它里面存储的不是完整的消息内容,而是消息的一些关键信息,方便消费者快速找到和读取消息。
具体来说,当 RocketMQ 写入一条原始消息到 CommitLog 之后,它会提取这条消息的一些重要信息,比如这条消息在 CommitLog 里的物理偏移量(Offset)、消息的大小,还有这条消息 Tag 的哈希码(Hashcode),然后把这些信息写入到 ConsumeQueue 中。这样一来,消费者就可以通过 ConsumeQueue 快速定位到 CommitLog 里对应的消息,而不需要每次都去遍历整个 CommitLog,大大提高了消息消费的效率。
在发送消息的的时候,RocketMQ 会将消息 Tag 的 Hashcode 写入到 ConsumeQueue 字段中。
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
//上面这个方法
public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
if (Strings.isNullOrEmpty(tags)) {
return 0;
}
return tags.hashCode();
}
当用户在消费消息时,服务端会从 ConsumeQueue 中一条一条地检查消息,并对这些消息进行过滤。过滤的时候,服务端会用 ConsumeQueue 中存储的消息 Tag 的 Hashcode,和当前订阅组所订阅的 Tag 进行匹配。这个匹配的过程,是在 RocketMQ 的核心代码 org.apache.rocketmq.store.DefaultMessageStore#getMessage 方法里实现的。简单来说,就是服务端会根据订阅组订阅的 Tag,从 ConsumeQueue 中找出符合条件的消息,然后交给用户去消费。
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
//省略其他代码
continue;
}
匹配逻辑
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
可以看到,只有当订阅的 Tag 是 “*”(表示订阅所有消息),或者消息的 Tag 和订阅的 Tag 匹配上的时候,消费者才能消费到这条消息。这里您可能会有疑问,为啥要用 CodeSet 呢?其实是因为这里会把用户订阅组订阅的 Tag 进行拆分,然后把这些拆分后的 Tag 放到 CodeSet 里。这样在匹配的时候,就可以快速判断消息的 Tag 是否在 CodeSet 中,从而决定这条消息能不能被消费。
org.apache.rocketmq.remoting.protocol.filter.FilterAPI#buildSubscriptionData(java.lang.String, java.lang.String)
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
Arrays.stream(tags).map(String::trim).filter(tag -> !tag.isEmpty()).forEach(tag -> {
subscriptionData.getTagsSet().add(tag);
subscriptionData.getCodeSet().add(tag.hashCode());
});
} else {
throw new Exception("subString split error");
}
然后,CodeSet 里面存的就是多个 Hashcode 。
这里您可能又会有疑问:要是在服务端通过哈希码来匹配,那万一两个不同的 Tag 经过哈希运算之后,得到的哈希码是一样的,这样不就匹配错了吗?没错,这种情况确实有可能发生。
为了避免这种问题,客户端还会再做一层过滤,使用真正的 Tag 字符串再过滤一次,这样就能保证最终消费到的消息一定是符合订阅要求的,不会出现因为哈希冲突而导致的匹配错误。
org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#processPullResult
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
综上所述,我们就得到了这样一个 Tag 过滤的工作流程。
规则限制
SQL 过滤
从上面可以看到,Tag 过滤比较简单,通过在 ConsumeQueue 直接进行匹配,效率比较高,但是能支持的消息过滤比较简单,如果想通过消息的某个扩展字段来进行匹配,做一些更复杂的逻辑,就需要使用 SQL 过滤了。
代码示例
在发送方我们可以设置一下 putUserProperty,来扩展字段。
public static void main(String[] args) throws Exception {
// 创建消息生产者
DefaultMQProducer producer = ClientCreater.createProducer(GROUP_NAME);
// 创建消息实例,设置topic和消息内容,设置自定义属性
Message msg = new Message(TOPIC_NAME, "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
msg.putUserProperty("key1","value1");
// 发送消息
SendResult sendResult = producer.send(msg, 3000);
System.out.println(sendResult + ":" + new String(msg.getBody()));
producer.shutdown();
}
当消费组去消费消息的时候,它可以用 SQL 表达式来精准地筛选消息。例如我们可以设定条件,像 “Key1 必须等于 Value1”,或者设置更复杂的条件,用 “AND” 和 “OR” 这些逻辑运算符把多个条件组合起来。SQL 过滤能按照设置的条件,精确地过滤出符合条件的消息。
public static void main(String[] args) throws Exception {
// 创建消息消费者
DefaultMQPushConsumer pushConsumer = ClientCreater.createPushConsumer(GROUP_NAME);
//订阅所有消息
pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));
// 订阅topic 订阅单个key的sql
pushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));
//订阅多个属性
pushConsumer.subscribe(TOPIC_NAME,MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));
// 省略其他代码
}
核心原理
从前面的介绍可以知道,ConsumeQueue 里并没有存储用于 SQL 表达式匹配的相关信息。所以要是想根据 SQL 表达式来匹配消息,就只能把 CommitLog 里的消息读取出来,然后进行运算处理。实现这部分功能的代码,同样是在 org.apache.rocketmq.store.DefaultMessageStore#getMessage 这个方法里。
if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { // 省略部分代码
continue;
}
//匹配逻辑。
if (tempProperties == null && msgBuffer != null) {
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}
可以看到,这里是把所有消息的属性字段都放到 MessageEvaluationContext 里面,然后根据用户写的 SQL 表达式来对消息进行过滤。这里用到的 getCompiledExpression() 方法,其实就是通过 SqlFilter 把用户写的 SQL 表达式编译成一个 BooleanExpression,最终计算结果就是匹配或者不匹配。
如果您想了解这个功能的完整 SQL 实现细节,可以去深入研究 Rocketmq-filter 模块。
@Override
public Expression compile(final String expr) throws MQFilterException {
return SelectorParser.parse(expr);
}
和 Tag 的过滤方式不太一样,BooleanExpression 需要读取真实的消息内容,并且是基于消息的实际字符串去做匹配。这种方式的好处是客户端不需要额外做什么配置,但缺点是性能相对会差一些,因为每次都要读取消息内容来匹配。
为了提升性能,社区想了个办法,就是在 ConsumeQueue 里增加一个扩展字段。不过要使用这个功能,需要先打开 enableConsumeQueueExt 这个开关。打开之后,就可以利用布隆过滤器(Bloom Filter)来做优化了。
布隆过滤器的原理大概是这样的:它会根据消息的属性生成一个序列化的布隆过滤器数据。在过滤的时候,如果布隆过滤器判断消息不符合条件,那这条消息肯定是不符合的,就可以直接过滤掉;如果布隆过滤器判断消息符合条件,那还需要进一步做精确匹配。
综上所述,我们就得到了这样一个 SQL 过滤的工作流程。
规则限制
由于 SQL 属性过滤是生产者定义消息属性,消费者设置 SQL 过滤条件,计算之后,可能有不同的结果,因此服务端的处理方式如下:
虽然这种方式是灵活的,但是在消息头中还是不建议设置太多的值,因为总的消息头部属性有大小限制(32k),内置的已经占用了不少。超长之后,可能导致消息发送或者消费异常。
两种过滤方式的对比总结
过滤方式 | TAG过滤 | SQL过滤 |
---|---|---|
是否支持多个过滤条件 | 否 | 是 |
性能 | 高 | 中 |
处理方 | 客户端+服务端 | 服务端 |
易用性 | 强 | 一般 |
使用建议
合理划分主题和 Tag 标签。
从消息的过滤机制和主题的原理机制可以看出,业务消息的拆分可以基于主题进行筛选,也可以基于主题内消息的 Tag 标签及属性进行筛选。关于拆分方式的选择,应遵循以下原则:
腾讯云消息过滤轨迹展示
从上述消息过滤的原理介绍可以发现,如果消息被过滤掉了,用户收不到这条消息,和消息本身没有被消费的情况看起来是一样的。这时候用户可能会有一些疑惑:在 RocketMQ 的管理控制台(dashboard)上,消息显示是“已消费”状态,可实际上自己并没收到。
在腾讯云 TDMQ RocketMQ 版上,我们针对过滤条件的查询进行了优化。通过这个优化,能够区分展示消息过滤和真正被消费两种情况的消息轨迹展示。这样一来,用户就能很直观地看到消息到底是被过滤掉了,还是真正被消费了。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有