Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊rocketmq的ExpressionForRetryMessageFilter

聊聊rocketmq的ExpressionForRetryMessageFilter

作者头像
code4it
发布于 2019-12-27 09:43:27
发布于 2019-12-27 09:43:27
29500
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下rocketmq的ExpressionForRetryMessageFilter

MessageFilter

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface MessageFilter {
    /**
     * match by tags code or filter bit map which is calculated when message received
     * and stored in consume queue ext.
     *
     * @param tagsCode tagsCode
     * @param cqExtUnit extend unit of consume queue
     */
    boolean isMatchedByConsumeQueue(final Long tagsCode,
        final ConsumeQueueExt.CqExtUnit cqExtUnit);

    /**
     * match by message content which are stored in commit log.
     * <br>{@code msgBuffer} and {@code properties} are not all null.If invoked in store,
     * {@code properties} is null;If invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null.
     *
     * @param msgBuffer message buffer in commit log, may be null if not invoked in store.
     * @param properties message properties, should decode from buffer if null by yourself.
     */
    boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
        final Map<String, String> properties);
}
  • MessageFilter定义了isMatchedByConsumeQueue、isMatchedByCommitLog方法

ExpressionMessageFilter

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ExpressionMessageFilter implements MessageFilter {

    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);

    protected final SubscriptionData subscriptionData;
    protected final ConsumerFilterData consumerFilterData;
    protected final ConsumerFilterManager consumerFilterManager;
    protected final boolean bloomDataValid;

    public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
        ConsumerFilterManager consumerFilterManager) {
        this.subscriptionData = subscriptionData;
        this.consumerFilterData = consumerFilterData;
        this.consumerFilterManager = consumerFilterManager;
        if (consumerFilterData == null) {
            bloomDataValid = false;
            return;
        }
        BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
        if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {
            bloomDataValid = true;
        } else {
            bloomDataValid = false;
        }
    }

    @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        if (null == subscriptionData) {
            return true;
        }

        if (subscriptionData.isClassFilterMode()) {
            return true;
        }

        // by tags code.
        if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {

            if (tagsCode == null) {
                return true;
            }

            if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
                return true;
            }

            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        } else {
            // no expression or no bloom
            if (consumerFilterData == null || consumerFilterData.getExpression() == null
                || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
                return true;
            }

            // message is before consumer
            if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
                log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
                return true;
            }

            byte[] filterBitMap = cqExtUnit.getFilterBitMap();
            BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
            if (filterBitMap == null || !this.bloomDataValid
                || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
                return true;
            }

            BitsArray bitsArray = null;
            try {
                bitsArray = BitsArray.create(filterBitMap);
                boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
                log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
                return ret;
            } catch (Throwable e) {
                log.error("bloom filter error, sub=" + subscriptionData
                    + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
            }
        }

        return true;
    }

    @Override
    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
        if (subscriptionData == null) {
            return true;
        }

        if (subscriptionData.isClassFilterMode()) {
            return true;
        }

        if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            return true;
        }

        ConsumerFilterData realFilterData = this.consumerFilterData;
        Map<String, String> tempProperties = properties;

        // no expression
        if (realFilterData == null || realFilterData.getExpression() == null
            || realFilterData.getCompiledExpression() == null) {
            return true;
        }

        if (tempProperties == null && msgBuffer != null) {
            tempProperties = MessageDecoder.decodeProperties(msgBuffer);
        }

        Object ret = null;
        try {
            MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

            ret = realFilterData.getCompiledExpression().evaluate(context);
        } catch (Throwable e) {
            log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
        }

        log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

        if (ret == null || !(ret instanceof Boolean)) {
            return false;
        }

        return (Boolean) ret;
    }

}
  • ExpressionMessageFilter实现了MessageFilter接口,其isMatchedByConsumeQueue方法利用了bloomFilter进行判断;isMatchedByCommitLog方法主要是通过realFilterData.getCompiledExpression().evaluate(context)来获取结果

ExpressionForRetryMessageFilter

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
    public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
        ConsumerFilterManager consumerFilterManager) {
        super(subscriptionData, consumerFilterData, consumerFilterManager);
    }

    @Override
    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
        if (subscriptionData == null) {
            return true;
        }

        if (subscriptionData.isClassFilterMode()) {
            return true;
        }

        boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);

        if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            return true;
        }

        ConsumerFilterData realFilterData = this.consumerFilterData;
        Map<String, String> tempProperties = properties;
        boolean decoded = false;
        if (isRetryTopic) {
            // retry topic, use original filter data.
            // poor performance to support retry filter.
            if (tempProperties == null && msgBuffer != null) {
                decoded = true;
                tempProperties = MessageDecoder.decodeProperties(msgBuffer);
            }
            String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
            String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
            realFilterData = this.consumerFilterManager.get(realTopic, group);
        }

        // no expression
        if (realFilterData == null || realFilterData.getExpression() == null
            || realFilterData.getCompiledExpression() == null) {
            return true;
        }

        if (!decoded && tempProperties == null && msgBuffer != null) {
            tempProperties = MessageDecoder.decodeProperties(msgBuffer);
        }

        Object ret = null;
        try {
            MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

            ret = realFilterData.getCompiledExpression().evaluate(context);
        } catch (Throwable e) {
            log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
        }

        log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

        if (ret == null || !(ret instanceof Boolean)) {
            return false;
        }

        return (Boolean) ret;
    }
}
  • ExpressionForRetryMessageFilter继承了ExpressionMessageFilter,它覆盖了isMatchedByCommitLog方法,里头会使用subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)来判断是否是isRetryTopic;对于retryTopic会使用tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC)来获取realTopic,从而根据consumerFilterManager.get(realTopic, group)获取realFilterData;最后通过realFilterData.getCompiledExpression().evaluate(context)来获取结果

小结

MessageFilter定义了isMatchedByConsumeQueue、isMatchedByCommitLog方法;ExpressionMessageFilter实现了MessageFilter接口,其isMatchedByConsumeQueue方法利用了bloomFilter进行判断;isMatchedByCommitLog方法主要是通过realFilterData.getCompiledExpression().evaluate(context)来获取结果;ExpressionForRetryMessageFilter继承了ExpressionMessageFilter,它覆盖了isMatchedByCommitLog方法,里头会使用subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)来判断是否是isRetryTopic

doc

  • ExpressionForRetryMessageFilter
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-12-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊rocketmq-proxy的popMessage
org/apache/rocketmq/proxy/processor/MessagingProcessor.java
code4it
2024/08/13
1240
聊聊rocketmq的ClientManageProcessor
rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
code4it
2019/12/26
5360
聊聊rocketmq的ClientManageProcessor
Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践
本文将系统阐述 Apache RocketMQ 消息过滤机制的技术架构与实践要点。首先从业务应用场景切入,解析消息过滤的核心价值;接着介绍 Apache RocketMQ 支持的两种消息过滤实现方式,帮助读者建立基础认知框架;随后深入剖析 SQL 语法过滤与标签(Tag)过滤的技术实现的核心原理以及规则限制;最后介绍腾讯云在消息过滤性能优化方面的具体实践。
腾讯云中间件团队
2025/04/04
1420
Apache RocketMQ 消息过滤的实现原理与腾讯云的使用实践
RocketMQ(三):server端处理框架及消费数据查找实现
 rocketmq作为一个高性能的消息中间件,咱们光停留在使用层面,总感觉缺点什么。虽然rocketmq的官方设计文档讲得还是比较详细的,但纸上得来终觉浅!今天我们就来亲自挖一挖rocketmq的实现细节:server端处理框架以及如果进行消费消息。
huofo
2022/03/17
5830
RocketMQ(三):server端处理框架及消费数据查找实现
聊聊rocketmq的suggestPullingFromSlave
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
code4it
2019/12/05
7070
聊聊rocketmq的suggestPullingFromSlave
聊聊rocketmq的pullFromWhichNodeTable
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
code4it
2019/12/04
4880
聊聊rocketmq的pullFromWhichNodeTable
聊聊rocketmq-proxy的popMessage
org/apache/rocketmq/proxy/processor/MessagingProcessor.java
code4it
2024/08/13
1760
聊聊rocketmq-proxy的popMessage
聊聊rocketmq的pullBatchSize
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
code4it
2019/11/19
6300
聊聊rocketmq的pullBatchSize
聊聊rocketmq的AccessValidator
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
code4it
2019/11/12
9100
面试系列之-rocketmq长轮询模式
Consumer主动从Broker获取消息,可以设置多久拉取一次、可以设置一次拉取多少条消息等参数;
用户4283147
2022/12/29
6550
面试系列之-rocketmq长轮询模式
RocketMQ(一):推拉消费模型客户端实践
现在的的互联网系统中,mq已经必备基础设施了,我们已明显感觉它的必要性与强大。然而,它的本质是啥?存储转发系统罢了!
烂猪皮
2021/01/28
1.3K0
RocketMQ(一):推拉消费模型客户端实践
RocketMQ消息过滤实现原理
RocketMQ消息中间件相比于其他消息中间件提供了更细粒度的消息过滤,相比于Topic做业务维度的区分,Tag,即消息标签,用于对某个Topic下的消息进行进一步分类。消息队列RocketMQ版的生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
叔牙
2022/12/18
6590
RocketMQ消息过滤实现原理
聊聊rocketmq的maxReconsumeTimes
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
code4it
2019/11/20
2.5K0
聊聊rocketmq的maxReconsumeTimes
聊聊rocketmq的updateTopicRouteInfoFromNameServer
本文主要研究一下rocketmq的updateTopicRouteInfoFromNameServer
code4it
2019/12/02
3K0
聊聊rocketmq的updateTopicRouteInfoFromNameServer
RocketMQ 源码分析 —— Filtersrv
Filtersrv ,负责自定义规则过滤 Consumer 从 Broker 拉取的消息。
芋道源码
2020/05/19
5570
RocketMQ 源码分析 —— Filtersrv
聊聊rocketmq的sendHeartbeatToAllBrokerWithLock
本文主要研究一下rocketmq的sendHeartbeatToAllBrokerWithLock
code4it
2019/12/07
9330
聊聊rocketmq的sendHeartbeatToAllBrokerWithLock
RocketMQ 源码分析 —— Message 拉取与消费(上)
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-first/ 「芋道源码」欢迎转载,保留摘要,谢谢!
芋道源码
2020/04/29
1.2K0
CousumeQueue中tag的作用
存在就是有意义的,那么ConsumeQueue中存消息tag的hashcode是什么目的呢? 查到的资料是用于消息的过滤,因为Consumer可以根据主题和tag消费消息
CBeann
2023/12/25
1680
CousumeQueue中tag的作用
聊聊rocketmq的QueryMessageProcessor
rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
code4it
2019/12/25
6760
聊聊rocketmq的QueryMessageProcessor
聊聊rocketmq的compressMsgBodyOverHowmuch
本文主要研究一下rocketmq的compressMsgBodyOverHowmuch
code4it
2019/11/07
7040
聊聊rocketmq的compressMsgBodyOverHowmuch
相关推荐
聊聊rocketmq-proxy的popMessage
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验