
RocketMQ Message Filtering: How It Works

Author: 叔牙
Published 2022-12-18 09:17:39
Included in the column: 一个执拗的后端搬砖工 ("A Stubborn Backend Grunt")

WeChat official account: PersistentCoder

Contents

  • I. Background
  • II. Usage
  • III. How It Works: Source Code Analysis
  • IV. Summary
  • References

I. Background

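Compared with other messaging middleware, RocketMQ offers finer-grained message filtering. While Topics separate messages by business domain, a Tag (message tag) further classifies the messages within a single Topic: the producer specifies a Tag when sending a message, and a consumer subscribes by the Tags it is interested in.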

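Take e-commerce trading as an example. The journey from a customer placing an order to receiving the goods produces a series of messages, such as: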

  • Order messages
  • Payment messages
  • Logistics messages

These messages are sent to a Topic named Trade_Topic and are subscribed to by several different systems, for example:

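  • Payment system: subscribes only to payment messages.
  • Logistics system: subscribes only to logistics messages.
  • Transaction success-rate analysis system: subscribes to order and payment messages.
  • Real-time computing system: subscribes to all trade-related messages.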
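As a sketch of how these systems could express their needs, the code below assumes hypothetical tag names Order, Pay and Logistics. Each system passes a tag expression to subscribe, where || separates tags and * means everything. The parse method is a simplified stand-in for what the RocketMQ client does when it builds its subscription data (split on ||, trim, record each tag and its String.hashCode()); it is not the client's actual code.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for client-side parsing of a tag expression.
// Tag names used below (Order, Pay, Logistics) are hypothetical.
final class TagExpression {
    static final String SUB_ALL = "*";

    final Set<String> tags = new LinkedHashSet<>();
    final Set<Integer> codes = new LinkedHashSet<>(); // hash codes, later compared on the broker
    final boolean all;

    private TagExpression(boolean all) {
        this.all = all;
    }

    static TagExpression parse(String expr) {
        if (expr == null || expr.trim().isEmpty() || expr.trim().equals(SUB_ALL)) {
            return new TagExpression(true);
        }
        TagExpression e = new TagExpression(false);
        for (String part : expr.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                e.tags.add(tag);
                e.codes.add(tag.hashCode());
            }
        }
        return e;
    }

    boolean accepts(String messageTag) {
        return all || tags.contains(messageTag);
    }
}

final class TradeSubscriptions {
    // The expression each downstream system would subscribe to Trade_Topic with.
    static Map<String, String> expressions() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("payment", "Pay");
        m.put("logistics", "Logistics");
        m.put("success-rate analysis", "Order || Pay");
        m.put("real-time computing", "*");
        return m;
    }
}
```

With the real client, the analysis system would call consumer.subscribe("Trade_Topic", "Order || Pay").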

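Anyone who has used RocketMQ will have noticed that it supports two filtering modes, Tag and SQL. A tag can be thought of as a subtype of a topic, grouping the messages that share some finer-grained attribute; SQL filtering uses an expression evaluated against values carried in the message's properties (not its body).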

This article focuses on how tagged messages are sent and consumed; where SQL filtering comes up along the way, it gets a brief analysis.

II. Usage

Sending a message with a tag:

Message msg = new Message("Topic",
    "TagA",
    "key-",
    "body".getBytes(RemotingHelper.DEFAULT_CHARSET) 
);
SendResult sendResult = producer.send(msg);

Subscribing to and consuming tagged messages:

consumer.subscribe("Topic","TagA");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    log.info("receive msg={}", JSON.toJSONString(msgs));
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

For SQL filtering, change the subscription to:

consumer.subscribe("Topic",MessageSelector.bySql("name = 'xxx'"));
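SQL92 filtering matches against message properties rather than the body, so the producer must attach the property, e.g. msg.putUserProperty("name", "xxx"), and the broker must run with enablePropertyFilter=true. RocketMQ compiles such expressions with its own SQL92 engine; the hypothetical ToySqlFilter below understands only a single key = 'value' comparison and is meant only to show that the filter reads properties.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy evaluator supporting only expressions of the form  key = 'value'.
// It is NOT RocketMQ's SQL92 engine; it only illustrates matching on properties.
final class ToySqlFilter {
    private static final Pattern EQ = Pattern.compile("\\s*(\\w+)\\s*=\\s*'([^']*)'\\s*");

    private final String key;
    private final String value;

    ToySqlFilter(String expression) {
        Matcher m = EQ.matcher(expression);
        if (!m.matches()) {
            throw new IllegalArgumentException("unsupported expression: " + expression);
        }
        this.key = m.group(1);
        this.value = m.group(2);
    }

    // The message body is never inspected; only its user properties are.
    boolean matches(Map<String, String> properties) {
        return value.equals(properties.get(key));
    }
}
```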

III. How It Works: Source Code Analysis

1. Sending and storing tagged messages

When a tagged message is constructed, Message calls an internal method that stores the tag in its properties:

public void setTags(String tags) {
    this.putProperty("TAGS", tags);
}
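To illustrate that a tag is nothing more than a message property, here is a stripped-down, hypothetical SimpleMessage. It keeps tags and keys in a String-to-String map under the names TAGS and KEYS, as the real Message does, but omits everything else (flag, waitStoreMsgOK, transaction fields and so on).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stripped-down Message: tags and keys are just entries in a
// String -> String property map, stored under "TAGS" and "KEYS".
final class SimpleMessage {
    static final String PROPERTY_TAGS = "TAGS";
    static final String PROPERTY_KEYS = "KEYS";

    private final String topic;
    private final Map<String, String> properties = new HashMap<>();
    private final byte[] body;

    SimpleMessage(String topic, String tags, String keys, byte[] body) {
        this.topic = topic;
        this.body = body;
        if (tags != null && !tags.isEmpty()) {
            setTags(tags);
        }
        if (keys != null && !keys.isEmpty()) {
            properties.put(PROPERTY_KEYS, keys);
        }
    }

    void setTags(String tags) { properties.put(PROPERTY_TAGS, tags); }
    String getTags() { return properties.get(PROPERTY_TAGS); }
    String getTopic() { return topic; }
    byte[] getBody() { return body; }
    Map<String, String> getProperties() { return properties; }
}
```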

Sending a message goes through the client-side implementation DefaultMQProducerImpl#sendKernelImpl:

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody();
        try {
            // ... omitted
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setFlag(msg.getFlag());
            // serialize the properties into the header (the tag is among them)
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            SendResult sendResult = null;
            switch (communicationMode) {
                // ... other communication modes omitted
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            return sendResult;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

With some code omitted: the properties are read from the Message and placed in the request header, and then the send logic is invoked.
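The properties map travels to the broker as a single string in SendMessageRequestHeader. As far as we understand MessageDecoder, each entry is written as the name, char 1 (NAME_VALUE_SEPARATOR), the value, and char 2 (PROPERTY_SEPARATOR). The hypothetical PropertyCodec below sketches that format and its inverse; it is an illustration, not the library's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the property string format carried in the request header:
// name <char 1> value <char 2> name <char 1> value <char 2> ...
final class PropertyCodec {
    static final char NAME_VALUE_SEPARATOR = 1;
    static final char PROPERTY_SEPARATOR = 2;

    static String encode(Map<String, String> props) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getValue() == null) {
                continue; // entries without a value are skipped
            }
            sb.append(e.getKey()).append(NAME_VALUE_SEPARATOR)
              .append(e.getValue()).append(PROPERTY_SEPARATOR);
        }
        return sb.toString();
    }

    static Map<String, String> decode(String s) {
        Map<String, String> props = new LinkedHashMap<>();
        if (s == null || s.isEmpty()) {
            return props;
        }
        for (String item : s.split(String.valueOf(PROPERTY_SEPARATOR))) {
            int idx = item.indexOf(NAME_VALUE_SEPARATOR);
            if (idx > 0) {
                props.put(item.substring(0, idx), item.substring(idx + 1));
            }
        }
        return props;
    }
}
```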

private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

Here MQClientAPIImpl uses the Netty-based RemotingClient to send the message to the broker synchronously.

When the message reaches the broker, it is handled by SendMessageProcessor#processRequest, which calls sendMessage:

public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
    // ... omitted
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    msgInner.setBody(body);
    MessageAccessor.setProperties(msgInner, oriProps);
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
        // ... omitted
    } else {
        PutMessageResult putMessageResult = null;
        if (sendTransactionPrepareMessage) {
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
        sendMessageCallback.onComplete(sendMessageContext, response);
        return response;
    }
}
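On the broker, tagsString2tagsCode turns the tag string into the numeric tagsCode that ends up in the ConsumeQueue. For plain tags this is, to our knowledge, simply String.hashCode() widened to long, with 0 when the message has no tag (the TopicFilterType argument does not change the result). A minimal sketch under that assumption, with a hypothetical class name:

```java
// Sketch of how a tag string becomes the tagsCode stored in the ConsumeQueue:
// String.hashCode() widened to long, or 0 if the message carries no tag.
final class TagsCode {
    static long of(String tags) {
        if (tags == null || tags.isEmpty()) {
            return 0;
        }
        return tags.hashCode(); // int widened to long; negative hashes stay negative
    }
}
```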

The message is then written through DefaultMessageStore (asyncPutMessage), which appends it to the latest CommitLog file:

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // ... omitted
    long beginTime = this.getSystemClock().now();
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    // ... omitted
    return putResultFuture;
}

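The broker's ReputMessageService then continuously dispatches the messages stored in the CommitLog to the ConsumeQueue of the corresponding topic and queue. Each ConsumeQueue entry is 20 bytes: an 8-byte CommitLog physical offset, a 4-byte message size, and an 8-byte tag hash code, which is the value used for message filtering.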
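The 20-byte entry layout can be sketched with java.nio.ByteBuffer (the class CqEntry is hypothetical). The point of the layout is that the broker can decide whether a message matches a tag subscription from the fixed-size entry alone, without reading the message from the CommitLog.

```java
import java.nio.ByteBuffer;

// Sketch of one 20-byte ConsumeQueue entry:
// [ CommitLog offset : 8 bytes ][ message size : 4 bytes ][ tag hash code : 8 bytes ]
final class CqEntry {
    static final int UNIT_SIZE = 20;

    static byte[] encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
        return buf.array();
    }

    static long offset(byte[] entry) { return ByteBuffer.wrap(entry).getLong(0); }
    static int size(byte[] entry) { return ByteBuffer.wrap(entry).getInt(8); }
    static long tagsCode(byte[] entry) { return ByteBuffer.wrap(entry).getLong(12); }
}
```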

2. Filtering tagged messages

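We again split the analysis of tag filtering into two parts: maintaining subscription (filter) relationships, and filtering the messages themselves.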

Maintaining subscription (filter) relationships

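Subscriptions are maintained as follows: after the consumer starts, it connects to the brokers and pushes its subscription metadata to them. Let's start from client startup, MQClientInstance#start: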

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
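
  • mQClientAPIImpl: starts the Netty client responsible for exchanging data with the brokers.
  • startScheduledTask: starts various scheduled tasks that periodically refresh NameServer addresses, update topic routes, send heartbeats, persist offsets, adjust the thread pool, and so on.
  • pullMessageService: starts the message-pulling service. Even in push mode, the client first pulls messages and puts them into a local queue for consumption; this point is key.
  • rebalanceService: starts the rebalance (load-balancing) service.
  • DefaultMQProducer: starts the internal message-sending service.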

First, the scheduled heartbeat logic; subscription and filter information is uploaded through heartbeats as well:

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

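This sends a heartbeat to every broker and uploads class-filter sources (not used here). Next, the implementation of sending heartbeats to the brokers: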

private void sendHeartbeatToAllBroker() {
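    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        // nothing to report: no producer and no consumer registered in this client
        return;
    }
    if (!this.brokerAddrTable.isEmpty()) {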
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }
                        try {
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                        } catch (Exception e) {
                            // ... error logging omitted
                        }
                    }
                }
            }
        }
    }
}
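The broker-selection rule in the loop above can be isolated as follows. brokerAddrTable maps brokerName to (brokerId to address), and broker id 0 (MixAll.MASTER_ID) is the master: when the client has no consumers, only masters receive heartbeats; otherwise every node does. The class name and the sample addresses in the test are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of choosing heartbeat targets from brokerAddrTable
// (brokerName -> (brokerId -> address)); broker id 0 is the master.
final class HeartbeatTargets {
    static final long MASTER_ID = 0L;

    static List<String> select(Map<String, Map<Long, String>> brokerAddrTable, boolean consumerEmpty) {
        List<String> targets = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> e : oneTable.entrySet()) {
                if (e.getValue() == null) {
                    continue;
                }
                // clients without consumers only heartbeat masters;
                // clients with consumers heartbeat every node
                if (consumerEmpty && e.getKey() != MASTER_ID) {
                    continue;
                }
                targets.add(e.getValue());
            }
        }
        return targets;
    }
}
```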

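RemotingClient then sends the subscription information to the brokers inside the heartbeat packet. The flow of sending subscription data to the broker is shown in the flowchart below: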

Now let's see how the broker processes and stores subscription information. At startup the broker registers many processors:

/**
 * ClientManageProcessor
 */
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

This registers ClientManageProcessor; a consumer's heartbeat is handled by its processRequest method:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.HEART_BEAT:
            return this.heartBeat(ctx, request);
        case RequestCode.UNREGISTER_CLIENT:
            return this.unregisterClient(ctx, request);
        case RequestCode.CHECK_CLIENT_CONFIG:
            return this.checkClientConfig(ctx, request);
        default:
            break;
    }
    return null;
}

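ClientManageProcessor#heartBeat registers the consumer with ConsumerManager, which keeps each group's SubscriptionData in a ConsumerGroupInfo, and the subscription list is finally handed to ConsumerFilterManager#register. Note that in the RocketMQ source this method returns early for TAG-type expressions, so ConsumerFilterManager actually holds SQL92 filter data (with a Bloom filter), while tag subscriptions are read from ConsumerGroupInfo at pull time: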

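public boolean register(final String topic, final String consumerGroup, final String expression,
    final String type, final long clientVersion) {
    if (ExpressionType.isTagType(type)) {
        return false; // TAG subscriptions are not stored in ConsumerFilterManager
    }
    if (expression == null || expression.length() == 0) {
        return false;
    }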
    FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
    if (filterDataMapByTopic == null) {
        FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
        FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
        filterDataMapByTopic = prev != null ? prev : temp;
    }
    BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);
    return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}
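A simplified, hypothetical model of the topic → consumer group → filter data map that register maintains is sketched below. It uses ConcurrentHashMap.computeIfAbsent in place of the get/putIfAbsent sequence, keeps only a few FilterData fields (no Bloom filter, no born/dead times), skips TAG-type expressions (null, empty or "TAG") as the real method does, and, purely as an assumption of this sketch, ignores a heartbeat whose clientVersion is not newer than the stored one.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of the storage: topic -> consumer group -> filter data.
final class FilterRegistry {
    static final class FilterData {
        final String expression;
        final String expressionType;
        final long clientVersion;

        FilterData(String expression, String expressionType, long clientVersion) {
            this.expression = expression;
            this.expressionType = expressionType;
            this.clientVersion = clientVersion;
        }
    }

    private final ConcurrentMap<String, ConcurrentMap<String, FilterData>> byTopic = new ConcurrentHashMap<>();

    boolean register(String topic, String group, String expression, String type, long clientVersion) {
        boolean tagType = type == null || type.isEmpty() || "TAG".equals(type);
        if (tagType || expression == null || expression.isEmpty()) {
            return false; // tag subscriptions are not kept here
        }
        ConcurrentMap<String, FilterData> byGroup =
            byTopic.computeIfAbsent(topic, t -> new ConcurrentHashMap<>());
        boolean[] updated = {false};
        byGroup.compute(group, (g, old) -> {
            if (old != null && old.clientVersion >= clientVersion) {
                return old; // sketch assumption: stale or duplicate heartbeats are ignored
            }
            updated[0] = true;
            return new FilterData(expression, type, clientVersion);
        });
        return updated[0];
    }

    FilterData get(String topic, String group) {
        ConcurrentMap<String, FilterData> byGroup = byTopic.get(topic);
        return byGroup == null ? null : byGroup.get(group);
    }
}
```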

The stored structure is as follows:

{
  "filterDataByTopic":{
    "Topic":{
      "topic": String,
      "groupFilterData": {
        "consumerGroup":{
          "consumerGroup" : String,
          "topic":  String,
          "expression": String,
          "expressionType": String,
          "bornTime": long,
          "deadTime": long,
          "bloomFilterData":{
            "bitPos":  int[],
            "bitNum": int
          },
          "clientVersion": long
        }
      },
      ....
    },
    ....
  }
}

The flow by which the broker receives heartbeats and stores subscription information is as follows:

Message filtering

As analyzed earlier, the consumer starts PullMessageService at startup. It runs in its own thread, and its main loop looks like this:

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
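The idea that push consumption is built on pull can be modeled in a few lines. In the hypothetical ToyPullService below, a list stands in for one broker queue and a Consumer stands in for the MessageListener; each runOnce call is one iteration of the run() loop: take a request, pull a batch, hand it to the listener, and queue a request for the next offset. Long polling, ProcessQueue flow control and offset commits are all left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Toy model of "push built on pull". All names are hypothetical.
final class ToyPullService {
    static final class PullRequest {
        final long nextOffset;
        PullRequest(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private final List<String> brokerQueue;        // stands in for one broker message queue
    private final int batchSize;
    private final Consumer<List<String>> listener; // stands in for the MessageListener

    ToyPullService(List<String> brokerQueue, int batchSize, Consumer<List<String>> listener) {
        this.brokerQueue = brokerQueue;
        this.batchSize = batchSize;
        this.listener = listener;
        pullRequestQueue.add(new PullRequest(0));
    }

    // One iteration of the run() loop; returns false when nothing new was pulled.
    boolean runOnce() {
        PullRequest req;
        try {
            req = pullRequestQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        int from = (int) req.nextOffset;
        int to = Math.min(from + batchSize, brokerQueue.size());
        if (from >= to) {
            pullRequestQueue.add(req); // the real client would retry (long polling)
            return false;
        }
        listener.accept(new ArrayList<>(brokerQueue.subList(from, to)));
        pullRequestQueue.add(new PullRequest(to)); // schedule the next pull
        return true;
    }
}
```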

It then calls DefaultMQPushConsumerImpl#pullMessage:

public void pullMessage(final PullRequest pullRequest) {
    // ... omitted
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
                        }
                        break;
                    // ... other statuses omitted
                    default:
                        break;
                }
            }
        }
    };
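    String subExpression = null;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null && this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
        // postSubscriptionWhenPull is false by default, so subExpression usually stays null
        // and the broker filters with the subscription it received through heartbeats
        subExpression = sd.getSubString();
    }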
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        // ... omitted: log the error and retry the pull request later
    }
}

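The method first defines the callback for messages pulled from the broker, then calls PullAPIWrapper#pullKernelImpl to pull asynchronously. When the response arrives the callback runs: processPullResult decodes the messages and, if the subscription lists specific tags, filters them again by tag string (the broker only compared hash codes); the messages are then put into the local ProcessQueue, and consumeMessageService.submitConsumeRequest triggers local consumption, which eventually calls our MessageListener.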
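Because the broker only compares tag hash codes, two different tags with the same hash code (for example "Aa" and "BB", both 2112 in Java) can both reach the client. A simplified sketch of the client's second pass, with a hypothetical Msg type: it keeps only messages whose tag string is actually subscribed, and an empty tag set corresponds to a * subscription, for which nothing is re-checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified client-side second pass over pulled messages.
final class ClientTagRecheck {
    static final class Msg {
        final String tags;
        final String body;
        Msg(String tags, String body) { this.tags = tags; this.body = body; }
    }

    static List<Msg> filterAgain(List<Msg> pulled, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return pulled; // "*" subscription: nothing to re-check
        }
        List<Msg> kept = new ArrayList<>(pulled.size());
        for (Msg m : pulled) {
            if (m.tags != null && subscribedTags.contains(m.tags)) {
                kept.add(m);
            }
        }
        return kept;
    }
}
```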

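At this point the pull request has been sent to the broker, so let's see how the broker filters messages. The pull request code is PULL_MESSAGE; at startup the broker registers a processor per request code, and PULL_MESSAGE maps to PullMessageProcessor: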

/**
 * PullMessageProcessor
 */
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

When a consumer sends a pull request, PullMessageProcessor#processRequest is invoked:

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    // ... omitted
    MessageFilter messageFilter;
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    } else {
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
            this.brokerController.getConsumerFilterManager());
    }
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    // ... omitted
}
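Conceptually, getMessage walks the ConsumeQueue from the requested offset and asks the filter about each entry before touching the CommitLog. The toy ConsumeQueueScan below (hypothetical names, an in-memory list instead of mapped files) shows that shape: rejected entries are skipped but still advance the next begin offset, and at most maxMsgNums matches are collected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Toy ConsumeQueue scan: filter on tagsCode, "read" the CommitLog only for matches.
final class ConsumeQueueScan {
    static final class Entry {
        final long commitLogOffset;
        final int size;
        final long tagsCode;
        Entry(long commitLogOffset, int size, long tagsCode) {
            this.commitLogOffset = commitLogOffset;
            this.size = size;
            this.tagsCode = tagsCode;
        }
    }

    static final class Result {
        final List<Long> matchedCommitLogOffsets = new ArrayList<>();
        long nextBeginOffset;
    }

    static Result scan(List<Entry> queue, int queueOffset, int maxMsgNums, LongPredicate filter) {
        Result r = new Result();
        int i = queueOffset;
        for (; i < queue.size() && r.matchedCommitLogOffsets.size() < maxMsgNums; i++) {
            Entry e = queue.get(i);
            if (!filter.test(e.tagsCode)) {
                continue; // rejected using the 20-byte entry alone
            }
            r.matchedCommitLogOffsets.add(e.commitLogOffset); // real code reads the message here
        }
        r.nextBeginOffset = i; // skipped entries are consumed as well
        return r;
    }
}
```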

It builds a message filter and fetches messages. Messages are read from the ConsumeQueue and filtered there, by calling ExpressionMessageFilter#isMatchedByConsumeQueue:

@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    // by tags code.
    if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
        if (tagsCode == null) {
            return true;
        }
        if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
            return true;
        }
        return subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }
    // ... SQL92 branch (Bloom filter and compiled expression checks) omitted
    return true;
}

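If the entry has no tagsCode, or the subscription is *, the method returns true immediately; otherwise it checks whether the codeSet (the hash codes of the subscribed tags) contains the message's tagsCode. A match returns true; anything else is filtered out.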
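The tag branch above can be reproduced as a standalone method to show its main limitation: the broker sees only the tagsCode, so a tag whose String.hashCode() collides with a subscribed tag is let through. "Aa" and "BB" both hash to 2112 in Java, which makes them a convenient example. The class name is hypothetical.

```java
import java.util.Set;

// Sketch of the tag branch of isMatchedByConsumeQueue: the broker only has the
// tagsCode from the ConsumeQueue entry, never the tag string itself.
final class BrokerTagMatch {
    static boolean isMatched(Long tagsCode, String subString, Set<Integer> codeSet) {
        if (tagsCode == null) {
            return true;
        }
        if ("*".equals(subString)) {
            return true;
        }
        return codeSet.contains(tagsCode.intValue());
    }
}
```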

The flow is roughly as shown in the figure below:

IV. Summary

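  • A producer sends a tagged message; the broker first stores it in the CommitLog, and ReputMessageService then dispatches it to the topic's ConsumeQueue, where each entry reserves 8 bytes for the tag's hash code.
  • At startup the consumer sends its subscriptions to the brokers via heartbeats; the broker keeps them in ConsumerManager, and SQL92 filters are additionally stored in ConsumerFilterManager.
  • Whether in push or pull mode, the consumer essentially pulls messages from the broker; push mode simply pulls messages into a local queue and then triggers the local consumption logic.
  • Filtering happens on the broker: as messages are read from the ConsumeQueue, the filter compares the stored tag hash code, and only matching messages are returned to the consumer, which re-checks the tag string to rule out hash collisions.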

A few questions to think about; feel free to leave your answers in the comments or send a private message:

1. Why is message filtering implemented on the broker rather than on the consumer?

2. How is SQL92 expression filtering implemented?

3. Why is consumption built on pull regardless of whether the consumer uses pull or push mode, and what motivates that design?

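This article is part of the Tencent Cloud self-media syndication program and is shared from the PersistentCoder WeChat official account.
Originally published: 2022-09-02. For infringement concerns, contact cloudcommunity@tencent.com for removal.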

