微信公众号:PersistentCoder
一、背景二、使用三、原理源码分析四、总结参考
RocketMQ消息中间件相比于其他消息中间件提供了更细粒度的消息过滤,相比于Topic做业务维度的区分,Tag,即消息标签,用于对某个Topic下的消息进行进一步分类。消息队列RocketMQ版的生产者在发送消息时,指定消息的Tag,消费者需根据已经指定的Tag来进行订阅。
以电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以如下消息为例:
这些消息会发送到名称为Trade_Topic的Topic中,被各个不同的系统所订阅,以如下系统为例:
使用过RocketMQ的小伙伴会注意到该消息组件支持Tag和Sql两种过滤模式。tag可以理解为topic的子类型,具有某一类型细分属性的集合,sql过滤模式是使用表达式实现通过消息内容的值进行过滤。
我们本篇重点围绕tag消息的发送和消费原理展开介绍,中间涉及到sql过滤的地方会简单做分析。
发送带tag的消息:
Message msg = new Message("topic",
"TagA",
"key-",
"body".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
订阅并消费带tag的消息:
consumer.subscribe("Topic","TagA");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
log.info("receive msg={}", JSON.toJSONString(msgs));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
如果是使用sql过滤,订阅的时候把订阅关系改为:
consumer.subscribe("Topic",MessageSelector.bySql("name = 'xxx'"));
构造tag消息的时候会调用Message内部方法,把tag放到内部属性中:
public void setTags(String tags) {
this.putProperty("TAGS", tags);
}
发送消息会调用到client的内部实现DefaultMQProducer的sendKernelImpl方法:
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//省略...
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setFlag(msg.getFlag());
//设置属性信息(tag也在里边)
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
SendResult sendResult = null;
switch (communicationMode) {
//省略...
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
return sendResult;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
省略掉部分代码,中间会从Messenge中获取属性信息放入请求头,然后调用发送逻辑。
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
这里会调用netty客户端与broker服务端的通信能力,将消息内容发送到broker。
消息发送到broker服务端的时候会经过SendMessageProcessor#processRequest处理,会调用sendMessage方法:
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader,
final TopicQueueMappingContext mappingContext,
final SendMessageCallback sendMessageCallback) throws RemotingCommandException {
//省略...
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
msgInner.setBody(body);
MessageAccessor.setProperties(msgInner, oriProps);
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
//省略...
} else {
PutMessageResult putMessageResult = null;
if (sendTransactionPrepareMessage) {
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext);
sendMessageCallback.onComplete(sendMessageContext, response);
return response;
}
}
然后调用DefaultMessageStore的asyncPutMessage方法将消息数据追加到commitLog最新文件中:
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
//省略...
long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
//省略...
return putResultFuture;
}
然后通过broker的ReputMessageService将commitLog中存储的消息不停的分发到对应topic的consumerQueue消费队列中,consumerQueue中一个 entry 占 20 字节,8字节 commitLog 物理偏移,4字节消息大小,8字节 tag 的 hashCode 值(消息过滤就会用到它)。
tag消息的过滤我们也分成两块分析,分别是订阅(过滤)关系维护和消息过滤。
订阅(过滤)关系维护
订阅关系维护是在consumer启动时和broker建立连接把自己的订阅信息元数据推送到broker端,我们从consumer的启动开始分析:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
我们先看一下定时发送心跳逻辑,订阅和过滤信息的上传也是由发送心跳实现:
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}
分别调用了发送心跳到broker和上传类过滤器(暂未使用),继续看一下发送心跳到broker实现:
private void sendHeartbeatToAllBroker() {
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
} catch (Exception e) {
}
}
}
}
}
}
}
然后使用RemotingClient将订阅信息通过心跳包的方式发送到broker,发送订阅数据到broker流程图如下:
然后我们看一下broker如何处理和存储订阅信息的。broker启动时注册很多processor:
/**
* ClientManageProcessor
*/
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
这里注册了ClientManageProcessor,consumer发送心跳时会经由其processRequest处理:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(ctx, request);
default:
break;
}
return null;
}
最终会调用ConsumerFilterManager的register将订阅信息存储起来:
public boolean register(final String topic, final String consumerGroup, final String expression,
final String type, final long clientVersion) {
FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
if (filterDataMapByTopic == null) {
FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
filterDataMapByTopic = prev != null ? prev : temp;
}
BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);
return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}
存储结构如下:
{
"filterDataByTopic":{
"Topic":{
"topic": String,
"groupFilterData": {
"consumerGroup":{
"consumerGroup" : String,
"topic": String,
"expression": String,
"expressionType": String,
"bornTime": long,
"deadTime": long,
"bloomFilterData":{
"bitPos": int[],
"bitNum": int
},
"clientVersion": long
}
},
....
},
....
}
}
broker接收心跳并存储订阅信息流程如下:
消息过滤
前边有分析到,Consume启动的时候会启动PullMessageService拉消息服务,它是一个独立线程,看一下业务实现:
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
接着调用DefaultMQPushConsumerImpl#pullMessage:
public void pullMessage(final PullRequest pullRequest) {
//省略...
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
}
break;
//省略
default:
break;
}
}
}
};
String subExpression = null;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {}
}
先定义了从broker拉到消息回调逻辑,然后调用PullApiWrapper#pullKernelImpl拉取消息并触发回调逻辑,将远程拉到的消息放入本地队列提供消费,consumeMessageService.submitConsumeRequest会触发本地消费逻辑,最终会调用到我们定义的MessageListener。
此时拉取消息的请求已经发送给broker,我们看一下broker端如何过滤消息的,拉取消息发送的请求code是PULL_MESSAGE,我们看到broker启动时会根据请求码注册不同的processor,PULL_MESSAGE对应的是PullMessageProcessor:
/**
* PullMessageProcessor
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
consumer发送拉取消息请求时会调用PullMessageProcessor的processRequest方法:
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
//省略...
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
//省略...
}
构造消息过滤器并获取消息,消息是从consumerQueue队列过滤并获取,过滤时会调用ExpressionMessageFilter的isMatchedByConsumeQueue方法:
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
}
如果tag过滤时*,那么直接返回true,否则检查consumer的tag是否和消息的tag匹配,匹配返回true否则过滤掉。
流程大致如下图:
另外有几个问题读者可以自己思考下,答案可以写到评论区或者私信:
1.为什么消息过滤逻辑在broker实现,而没有在consumer实现?
2.SQL92表达式过滤如何实现?
3.为什么无论是pull还是push模式,对于consumer来说都是基于pull来实现,为什么要这么做?
本文分享自 PersistentCoder 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!