A Look at popMessage in rocketmq-proxy

Original article
Author: code4it
Published: 2024-08-13 01:13:13
Column: 码匠的流水账

This article walks through how rocketmq-proxy implements popMessage.

MessagingProcessor

org/apache/rocketmq/proxy/processor/MessagingProcessor.java

public interface MessagingProcessor extends StartAndShutdown {

	//......

    CompletableFuture<PopResult> popMessage(
        ProxyContext ctx,
        QueueSelector queueSelector,
        String consumerGroup,
        String topic,
        int maxMsgNums,
        long invisibleTime,
        long pollTime,
        int initMode,
        SubscriptionData subscriptionData,
        boolean fifo,
        PopMessageResultFilter popMessageResultFilter,
        String attemptId,
        long timeoutMillis
    );

	//......
}

The MessagingProcessor interface defines the popMessage method.

DefaultMessagingProcessor

org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java

public class DefaultMessagingProcessor extends AbstractStartAndShutdown implements MessagingProcessor {

	//......

    public CompletableFuture<PopResult> popMessage(
        ProxyContext ctx,
        QueueSelector queueSelector,
        String consumerGroup,
        String topic,
        int maxMsgNums,
        long invisibleTime,
        long pollTime,
        int initMode,
        SubscriptionData subscriptionData,
        boolean fifo,
        PopMessageResultFilter popMessageResultFilter,
        String attemptId,
        long timeoutMillis
    ) {
        return this.consumerProcessor.popMessage(ctx, queueSelector, consumerGroup, topic, maxMsgNums,
            invisibleTime, pollTime, initMode, subscriptionData, fifo, popMessageResultFilter, attemptId, timeoutMillis);
    }

	//......
}

DefaultMessagingProcessor.popMessage simply delegates to consumerProcessor.popMessage.

ConsumerProcessor

org/apache/rocketmq/proxy/processor/ConsumerProcessor.java

    public CompletableFuture<PopResult> popMessage(
        ProxyContext ctx,
        QueueSelector queueSelector,
        String consumerGroup,
        String topic,
        int maxMsgNums,
        long invisibleTime,
        long pollTime,
        int initMode,
        SubscriptionData subscriptionData,
        boolean fifo,
        PopMessageResultFilter popMessageResultFilter,
        String attemptId,
        long timeoutMillis
    ) {
        CompletableFuture<PopResult> future = new CompletableFuture<>();
        try {
            AddressableMessageQueue messageQueue = queueSelector.select(ctx, this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic));
            if (messageQueue == null) {
                throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no readable queue");
            }
            return popMessage(ctx, messageQueue, consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, initMode,
                subscriptionData, fifo, popMessageResultFilter, attemptId, timeoutMillis);
        }  catch (Throwable t) {
            future.completeExceptionally(t);
        }
        return future;
    }

    public CompletableFuture<PopResult> popMessage(
        ProxyContext ctx,
        AddressableMessageQueue messageQueue,
        String consumerGroup,
        String topic,
        int maxMsgNums,
        long invisibleTime,
        long pollTime,
        int initMode,
        SubscriptionData subscriptionData,
        boolean fifo,
        PopMessageResultFilter popMessageResultFilter,
        String attemptId,
        long timeoutMillis
    ) {
        CompletableFuture<PopResult> future = new CompletableFuture<>();
        try {
            if (maxMsgNums > ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST) {
                log.warn("change maxNums from {} to {} for pop request, with info: topic:{}, group:{}",
                    maxMsgNums, ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST, topic, consumerGroup);
                maxMsgNums = ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST;
            }

            PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
            requestHeader.setConsumerGroup(consumerGroup);
            requestHeader.setTopic(topic);
            requestHeader.setQueueId(messageQueue.getQueueId());
            requestHeader.setMaxMsgNums(maxMsgNums);
            requestHeader.setInvisibleTime(invisibleTime);
            requestHeader.setPollTime(pollTime);
            requestHeader.setInitMode(initMode);
            requestHeader.setExpType(subscriptionData.getExpressionType());
            requestHeader.setExp(subscriptionData.getSubString());
            requestHeader.setOrder(fifo);
            requestHeader.setAttemptId(attemptId);

            future = this.serviceManager.getMessageService().popMessage(
                    ctx,
                    messageQueue,
                    requestHeader,
                    timeoutMillis)
                .thenApplyAsync(popResult -> {
                    if (PopStatus.FOUND.equals(popResult.getPopStatus()) &&
                        popResult.getMsgFoundList() != null &&
                        !popResult.getMsgFoundList().isEmpty() &&
                        popMessageResultFilter != null) {

                        List<MessageExt> messageExtList = new ArrayList<>();
                        for (MessageExt messageExt : popResult.getMsgFoundList()) {
                            try {
                                fillUniqIDIfNeed(messageExt);
                                String handleString = createHandle(messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getCommitLogOffset());
                                if (handleString == null) {
                                    log.error("[BUG] pop message from broker but handle is empty. requestHeader:{}, msg:{}", requestHeader, messageExt);
                                    messageExtList.add(messageExt);
                                    continue;
                                }
                                MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK, handleString);

                                PopMessageResultFilter.FilterResult filterResult =
                                    popMessageResultFilter.filterMessage(ctx, consumerGroup, subscriptionData, messageExt);
                                switch (filterResult) {
                                    case NO_MATCH:
                                        this.messagingProcessor.ackMessage(
                                            ctx,
                                            ReceiptHandle.decode(handleString),
                                            messageExt.getMsgId(),
                                            consumerGroup,
                                            topic,
                                            MessagingProcessor.DEFAULT_TIMEOUT_MILLS);
                                        break;
                                    case TO_DLQ:
                                        this.messagingProcessor.forwardMessageToDeadLetterQueue(
                                            ctx,
                                            ReceiptHandle.decode(handleString),
                                            messageExt.getMsgId(),
                                            consumerGroup,
                                            topic,
                                            MessagingProcessor.DEFAULT_TIMEOUT_MILLS);
                                        break;
                                    case MATCH:
                                    default:
                                        messageExtList.add(messageExt);
                                        break;
                                }
                            } catch (Throwable t) {
                                log.error("process filterMessage failed. requestHeader:{}, msg:{}", requestHeader, messageExt, t);
                                messageExtList.add(messageExt);
                            }
                        }
                        popResult.setMsgFoundList(messageExtList);
                    }
                    return popResult;
                }, this.executor);
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
        return FutureUtils.addExecutor(future, this.executor);
    }    

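ConsumerProcessor has two popMessage overloads. The first uses queueSelector.select to pick a messageQueue (throwing a ProxyException with "no readable queue" if none is available) and then calls the second. The second caps maxMsgNums at MAX_MSG_NUMS_FOR_POP_REQUEST (32), builds a PopMessageRequestHeader, and delegates to this.serviceManager.getMessageService().popMessage. The pulled messages are in the msgFoundList property of the returned popResult. When a popMessageResultFilter is supplied, each message is run through it: NO_MATCH messages are acked, TO_DLQ messages are forwarded to the dead letter queue, and MATCH messages (plus any whose handle is empty or whose filtering throws) stay in msgFoundList.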
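The clamp and filter steps of ConsumerProcessor.popMessage can be sketched in isolation. This is a minimal, self-contained sketch and not the proxy's code: `PopFilterSketch`, `Outcome`, the `String` messages, and the `Function`-based filter are hypothetical stand-ins for `MessageExt` and `PopMessageResultFilter`. Only the limit of 32 and the three outcomes NO_MATCH, TO_DLQ, and MATCH are taken from the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the clamp-then-filter flow; names are illustrative only.
final class PopFilterSketch {
    // Same value the article gives for ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST.
    static final int MAX_MSG_NUMS_FOR_POP_REQUEST = 32;

    enum FilterResult { MATCH, NO_MATCH, TO_DLQ }

    // Records what would be returned to the client, acked, or dead-lettered.
    static final class Outcome {
        final List<String> delivered = new ArrayList<>();
        final List<String> acked = new ArrayList<>();
        final List<String> deadLettered = new ArrayList<>();
    }

    // Requests above the limit are silently reduced rather than rejected.
    static int clamp(int maxMsgNums) {
        return Math.min(maxMsgNums, MAX_MSG_NUMS_FOR_POP_REQUEST);
    }

    static Outcome filter(List<String> popped, Function<String, FilterResult> filter) {
        Outcome outcome = new Outcome();
        for (String msg : popped) {
            try {
                switch (filter.apply(msg)) {
                    case NO_MATCH:
                        outcome.acked.add(msg);        // the proxy acks on the client's behalf
                        break;
                    case TO_DLQ:
                        outcome.deadLettered.add(msg); // the proxy forwards to the DLQ
                        break;
                    case MATCH:
                    default:
                        outcome.delivered.add(msg);
                        break;
                }
            } catch (RuntimeException e) {
                // Like the listing (which catches Throwable), a failing filter
                // does not drop the message: it is still delivered.
                outcome.delivered.add(msg);
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Outcome o = filter(Arrays.asList("tagA-1", "tagB-2", "poison-3"),
            m -> m.startsWith("tagA") ? FilterResult.MATCH
                : m.startsWith("poison") ? FilterResult.TO_DLQ : FilterResult.NO_MATCH);
        System.out.println("maxMsgNums=" + clamp(64));
        System.out.println("delivered=" + o.delivered + " acked=" + o.acked
            + " deadLettered=" + o.deadLettered);
    }
}
```

Delivering on filter failure means an error in the filter can never silently lose a message, at the cost of occasionally handing the client a message it did not subscribe to.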

MessageService

org/apache/rocketmq/proxy/service/message/MessageService.java

    CompletableFuture<PopResult> popMessage(
        ProxyContext ctx,
        AddressableMessageQueue messageQueue,
        PopMessageRequestHeader requestHeader,
        long timeoutMillis
    );

The MessageService interface defines the popMessage method. It has two implementations: LocalMessageService and ClusterMessageService.
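How one interface can hide both an in-process path and a remote path might be sketched as follows. Everything here is illustrative: `PopService`, `Mode`, `create`, and the string results are hypothetical and do not mirror the proxy's real wiring, which this article does not show.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: one interface, two transports, chosen by a mode flag.
final class MessageServiceSketch {
    interface PopService {
        CompletableFuture<String> popMessage(String topic);
    }

    // Plays the role of LocalMessageService: the work is done in the same process.
    static final class LocalPopService implements PopService {
        @Override
        public CompletableFuture<String> popMessage(String topic) {
            return CompletableFuture.completedFuture("local:" + topic);
        }
    }

    // Plays the role of ClusterMessageService: the work is handed to another party
    // (simulated here by an async task instead of a network call).
    static final class ClusterPopService implements PopService {
        @Override
        public CompletableFuture<String> popMessage(String topic) {
            return CompletableFuture.supplyAsync(() -> "remote:" + topic);
        }
    }

    enum Mode { LOCAL, CLUSTER } // assumed names, for illustration

    static PopService create(Mode mode) {
        return mode == Mode.LOCAL ? new LocalPopService() : new ClusterPopService();
    }

    public static void main(String[] args) {
        System.out.println(create(Mode.LOCAL).popMessage("TopicTest").join());
        System.out.println(create(Mode.CLUSTER).popMessage("TopicTest").join());
    }
}
```

Because both variants return a CompletableFuture, callers such as ConsumerProcessor do not need to know which transport is behind the call.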

LocalMessageService

org/apache/rocketmq/proxy/service/message/LocalMessageService.java

    public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
        PopMessageRequestHeader requestHeader, long timeoutMillis) {
        requestHeader.setBornTime(System.currentTimeMillis());
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage());
        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
        SimpleChannel channel = channelManager.createInvocationChannel(ctx);
        InvocationContext invocationContext = new InvocationContext(future);
        channel.registerInvocationContext(request.getOpaque(), invocationContext);
        ChannelHandlerContext simpleChannelHandlerContext = channel.getChannelHandlerContext();
        try {
            RemotingCommand response = brokerController.getPopMessageProcessor().processRequest(simpleChannelHandlerContext, request);
            if (response != null) {
                invocationContext.handle(response);
                channel.eraseInvocationContext(request.getOpaque());
            }
        } catch (Exception e) {
            future.completeExceptionally(e);
            channel.eraseInvocationContext(request.getOpaque());
            log.error("Failed to process popMessage command", e);
        }
        return future.thenApply(r -> {
            PopStatus popStatus;
            List<MessageExt> messageExtList = new ArrayList<>();
            switch (r.getCode()) {
                case ResponseCode.SUCCESS:
                    popStatus = PopStatus.FOUND;
                    ByteBuffer byteBuffer = ByteBuffer.wrap(r.getBody());
                    messageExtList = MessageDecoder.decodesBatch(
                        byteBuffer,
                        true,
                        false,
                        true
                    );
                    break;
                case ResponseCode.POLLING_FULL:
                    popStatus = PopStatus.POLLING_FULL;
                    break;
                case ResponseCode.POLLING_TIMEOUT:
                case ResponseCode.PULL_NOT_FOUND:
                    popStatus = PopStatus.POLLING_NOT_FOUND;
                    break;
                default:
                    throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, r.getRemark());
            }
            PopResult popResult = new PopResult(popStatus, messageExtList);
            PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) r.readCustomHeader();

            if (popStatus == PopStatus.FOUND) {
                Map<String, Long> startOffsetInfo;
                Map<String, List<Long>> msgOffsetInfo;
                Map<String, Integer> orderCountInfo;
                popResult.setInvisibleTime(responseHeader.getInvisibleTime());
                popResult.setPopTime(responseHeader.getPopTime());
                startOffsetInfo = ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
                msgOffsetInfo = ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
                orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
                // <topicMark@queueId, msg queueOffset>
                Map<String, List<Long>> sortMap = new HashMap<>(16);
                for (MessageExt messageExt : messageExtList) {
                    // Value of POP_CK is used to determine whether it is a pop retry,
                    // cause topic could be rewritten by broker.
                    String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
                        messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
                    if (!sortMap.containsKey(key)) {
                        sortMap.put(key, new ArrayList<>(4));
                    }
                    sortMap.get(key).add(messageExt.getQueueOffset());
                }
                Map<String, String> map = new HashMap<>(5);
                for (MessageExt messageExt : messageExtList) {
                    if (startOffsetInfo == null) {
                        // we should set the check point info to extraInfo field , if the command is popMsg
                        // find pop ck offset
                        String key = messageExt.getTopic() + messageExt.getQueueId();
                        if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
                            map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
                                messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId()));
                        }
                        messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
                    } else {
                        if (messageExt.getProperty(MessageConst.PROPERTY_POP_CK) == null) {
                            String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
                            int index = sortMap.get(key).indexOf(messageExt.getQueueOffset());
                            Long msgQueueOffset = msgOffsetInfo.get(key).get(index);
                            if (msgQueueOffset != messageExt.getQueueOffset()) {
                                log.warn("Queue offset [{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, messageExt);
                            }

                            messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
                                ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
                                    responseHeader.getReviveQid(), messageExt.getTopic(), messageQueue.getBrokerName(), messageExt.getQueueId(), msgQueueOffset)
                            );
                            if (requestHeader.isOrder() && orderCountInfo != null) {
                                Integer count = orderCountInfo.get(key);
                                if (count != null && count > 0) {
                                    messageExt.setReconsumeTimes(count);
                                }
                            }
                        }
                    }
                    messageExt.getProperties().computeIfAbsent(MessageConst.PROPERTY_FIRST_POP_TIME, k -> String.valueOf(responseHeader.getPopTime()));
                    messageExt.setBrokerName(messageExt.getBrokerName());
                    messageExt.setTopic(messageQueue.getTopic());
                }
            }
            return popResult;
        });
    }

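LocalMessageService.popMessage builds a RequestCode.POP_MESSAGE command and pulls messages in-process by calling brokerController.getPopMessageProcessor().processRequest directly. It then maps the response code to a PopStatus (SUCCESS to FOUND, POLLING_FULL to POLLING_FULL, POLLING_TIMEOUT and PULL_NOT_FOUND to POLLING_NOT_FOUND; anything else raises a ProxyException) and, for FOUND, fills in PROPERTY_POP_CK on each message.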
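The way LocalMessageService registers an invocation context under the request's opaque and completes the future only when a response exists can be sketched as below. This is a simplified model under stated assumptions: `InvocationRegistrySketch`, `invoke`, and `onDeferredResponse` are hypothetical names, responses are plain strings, and the deferred path (a response arriving later for a request that returned null) is my assumption about how a parked long-polling request would be answered. The listing itself only shows that a null response leaves the future pending.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of "register by opaque, complete now or later".
final class InvocationRegistrySketch {
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // The processor returns a response, or null when it has no answer yet.
    CompletableFuture<String> invoke(int opaque, Function<Integer, String> processor) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(opaque, future);
        try {
            String response = processor.apply(opaque);
            if (response != null) {
                // Answer is available immediately: complete and forget the context.
                pending.remove(opaque);
                future.complete(response);
            }
        } catch (RuntimeException e) {
            pending.remove(opaque);
            future.completeExceptionally(e);
        }
        return future;
    }

    // Assumed deferred path: a later response is matched to its future by opaque.
    void onDeferredResponse(int opaque, String response) {
        CompletableFuture<String> future = pending.remove(opaque);
        if (future != null) {
            future.complete(response);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        InvocationRegistrySketch registry = new InvocationRegistrySketch();
        CompletableFuture<String> immediate = registry.invoke(1, opaque -> "FOUND");
        CompletableFuture<String> parked = registry.invoke(2, opaque -> null);
        System.out.println("immediate done: " + immediate.isDone());
        System.out.println("parked done: " + parked.isDone());
        registry.onDeferredResponse(2, "POLLING_TIMEOUT");
        System.out.println("parked result: " + parked.join());
    }
}
```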

ClusterMessageService

org/apache/rocketmq/proxy/service/message/ClusterMessageService.java

    public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
        PopMessageRequestHeader requestHeader, long timeoutMillis) {
        return this.mqClientAPIFactory.getClient().popMessageAsync(
            messageQueue.getBrokerAddr(),
            messageQueue.getBrokerName(),
            requestHeader,
            timeoutMillis
        );
    }

ClusterMessageService.popMessage instead delegates to mqClientAPIFactory.getClient().popMessageAsync.

popMessageAsync

org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java

    public CompletableFuture<PopResult> popMessageAsync(
        String brokerAddr,
        String brokerName,
        PopMessageRequestHeader requestHeader,
        long timeoutMillis
    ) {
        CompletableFuture<PopResult> future = new CompletableFuture<>();
        try {
            this.popMessageAsync(brokerName, brokerAddr, requestHeader, timeoutMillis, new PopCallback() {
                @Override
                public void onSuccess(PopResult popResult) {
                    future.complete(popResult);
                }

                @Override
                public void onException(Throwable t) {
                    future.completeExceptionally(t);
                }
            });
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
        return future;
    }

    public void popMessageAsync(
        final String brokerName, final String addr, final PopMessageRequestHeader requestHeader,
        final long timeoutMillis, final PopCallback popCallback
    ) throws RemotingException, InterruptedException {
        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {

            }

            @Override
            public void operationSucceed(RemotingCommand response) {
                try {
                    PopResult popResult = MQClientAPIImpl.this.processPopResponse(brokerName, response, requestHeader.getTopic(), requestHeader);
                    popCallback.onSuccess(popResult);
                } catch (Exception e) {
                    popCallback.onException(e);
                }
            }
            @Override
            public void operationFail(Throwable throwable) {
                popCallback.onException(throwable);
            }
        });
    }    

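popMessageAsync likewise builds a RequestCode.POP_MESSAGE command, sends it through remotingClient.invokeAsync, and then handles the response with processPopResponse. The CompletableFuture overload wraps the callback-based one, completing the future from PopCallback.onSuccess or onException. The MQClientAPIImpl.this reference shows that the callback-based overload belongs to MQClientAPIImpl.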
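The callback-to-future bridge used by popMessageAsync is a general pattern and can be shown without RocketMQ. In this minimal sketch, `CallbackAdapterSketch`, `Callback`, and the fake `popAsync` transport are hypothetical; only the shape (complete the future from the callback, and also catch synchronous exceptions) follows the listing above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: adapting a callback-style API to CompletableFuture.
final class CallbackAdapterSketch {
    interface Callback<T> {
        void onSuccess(T result);
        void onException(Throwable t);
    }

    // Fake callback-style API. It can fail in two ways: by throwing before the
    // request is sent, or by reporting the failure through the callback.
    static void popAsync(String addr, Callback<String> callback) {
        if (addr == null || addr.isEmpty()) {
            throw new IllegalArgumentException("broker address is empty");
        }
        if (addr.startsWith("bad")) {
            callback.onException(new IllegalStateException("connect to " + addr + " failed"));
        } else {
            callback.onSuccess("popped from " + addr);
        }
    }

    // Future-style wrapper: both failure modes end up as a failed future,
    // so callers handle errors in exactly one place.
    static CompletableFuture<String> popAsync(String addr) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            popAsync(addr, new Callback<String>() {
                @Override
                public void onSuccess(String result) {
                    future.complete(result);
                }

                @Override
                public void onException(Throwable t) {
                    future.completeExceptionally(t);
                }
            });
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println(popAsync("127.0.0.1:10911").join());
        System.out.println(popAsync("bad-host:10911").isCompletedExceptionally());
        System.out.println(popAsync("").isCompletedExceptionally());
    }
}
```

Without the outer try/catch, a synchronous exception would escape to the caller instead of travelling through the future, which is why the listing wraps the call as well.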

PopMessageProcessor

org/apache/rocketmq/broker/processor/PopMessageProcessor.java

    public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final long beginTimeMills = this.brokerController.getMessageStore().now();
        request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis()));
        if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) {
            request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis()));
        }
        Channel channel = ctx.channel();

        RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
        final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
        final PopMessageRequestHeader requestHeader =
            (PopMessageRequestHeader) request.decodeCommandCustomHeader(PopMessageRequestHeader.class, true);
        StringBuilder startOffsetInfo = new StringBuilder(64);
        StringBuilder msgOffsetInfo = new StringBuilder(64);
        StringBuilder orderCountInfo = null;
        if (requestHeader.isOrder()) {
            orderCountInfo = new StringBuilder(64);
        }

        brokerController.getConsumerManager().compensateBasicConsumerInfo(requestHeader.getConsumerGroup(),
            ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);

        response.setOpaque(request.getOpaque());

        if (brokerController.getBrokerConfig().isEnablePopLog()) {
            POP_LOGGER.info("receive PopMessage request command, {}", request);
        }

        if (requestHeader.isTimeoutTooMuch()) {
            response.setCode(ResponseCode.POLLING_TIMEOUT);
            response.setRemark(String.format("the broker[%s] pop message is timeout too much",
                this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }
        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(String.format("the broker[%s] pop message is forbidden",
                this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }
        if (requestHeader.getMaxMsgNums() > 32) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("the broker[%s] pop message's num is greater than 32",
                this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }

        if (!brokerController.getMessageStore().getMessageStoreConfig().isTimerWheelEnable()) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("the broker[%s] pop message is forbidden because timerWheelEnable is false",
                this.brokerController.getBrokerConfig().getBrokerIP1()));
            return response;
        }

        TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            POP_LOGGER.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(),
                RemotingHelper.parseChannelRemoteAddr(channel));
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(),
                FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
            return response;
        }

        if (!PermName.isReadable(topicConfig.getPerm())) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the topic[" + requestHeader.getTopic() + "] peeking message is forbidden");
            return response;
        }

        if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
            String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] " +
                    "consumer:[%s]",
                requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(),
                channel.remoteAddress());
            POP_LOGGER.warn(errorInfo);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorInfo);
            return response;
        }
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(String.format("subscription group [%s] does not exist, %s",
                requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
            return response;
        }

        if (!subscriptionGroupConfig.isConsumeEnable()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
            return response;
        }

        BrokerConfig brokerConfig = brokerController.getBrokerConfig();
        SubscriptionData subscriptionData = null;
        ExpressionMessageFilter messageFilter = null;
        if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) {
            try {
                subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
                brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), subscriptionData);

                String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType());
                brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                    retryTopic, retrySubscriptionData);

                ConsumerFilterData consumerFilterData = null;
                if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    consumerFilterData = ConsumerFilterManager.build(
                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getExp(),
                        requestHeader.getExpType(), System.currentTimeMillis()
                    );
                    if (consumerFilterData == null) {
                        POP_LOGGER.warn("Parse the consumer's subscription[{}] failed, group: {}",
                            requestHeader.getExp(), requestHeader.getConsumerGroup());
                        response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                        response.setRemark("parse the consumer's subscription failed");
                        return response;
                    }
                }
                messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
                    brokerController.getConsumerFilterManager());
            } catch (Exception e) {
                POP_LOGGER.warn("Parse the consumer's subscription[{}] error, group: {}", requestHeader.getExp(),
                    requestHeader.getConsumerGroup());
                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                response.setRemark("parse the consumer's subscription failed");
                return response;
            }
        } else {
            try {
                subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
                brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                    requestHeader.getTopic(), subscriptionData);

                String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, "*", ExpressionType.TAG);
                brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                    retryTopic, retrySubscriptionData);
            } catch (Exception e) {
                POP_LOGGER.warn("Build default subscription error, group: {}", requestHeader.getConsumerGroup());
            }
        }

        int randomQ = random.nextInt(100);
        int reviveQid;
        if (requestHeader.isOrder()) {
            reviveQid = KeyBuilder.POP_ORDER_REVIVE_QUEUE;
        } else {
            reviveQid = (int) Math.abs(ckMessageNumber.getAndIncrement() % this.brokerController.getBrokerConfig().getReviveQueueNum());
        }

        GetMessageResult getMessageResult = new GetMessageResult(requestHeader.getMaxMsgNums());
        ExpressionMessageFilter finalMessageFilter = messageFilter;
        StringBuilder finalOrderCountInfo = orderCountInfo;

        // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo,
        // a single POP request could only invoke the popMsgFromQueue method once
        // for either a normal topic or a retry topic's queue. Retry topics v1 and v2 are
        // considered the same type because they share the same retry flag in previous fields.
        // Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request,
        // only one type of retry topic is able to call popMsgFromQueue.
        boolean needRetry = randomQ % 5 == 0;
        boolean needRetryV1 = false;
        if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
            needRetryV1 = randomQ % 2 == 0;
        }
        long popTime = System.currentTimeMillis();
        CompletableFuture<Long> getMessageFuture = CompletableFuture.completedFuture(0L);
        if (needRetry && !requestHeader.isOrder()) {
            if (needRetryV1) {
                String retryTopic = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup());
                getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel,
                    popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
            } else {
                String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel,
                    popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
            }
        }
        if (requestHeader.getQueueId() < 0) {
            // read all queue
            getMessageFuture = popMsgFromTopic(topicConfig, false, getMessageResult, requestHeader, reviveQid, channel,
                popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
        } else {
            int queueId = requestHeader.getQueueId();
            getMessageFuture = getMessageFuture.thenCompose(restNum ->
                popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), false,
                    getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter,
                    startOffsetInfo, msgOffsetInfo, finalOrderCountInfo));
        }
        // if not full , fetch retry again
        if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) {
            if (needRetryV1) {
                String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), requestHeader.getConsumerGroup());
                getMessageFuture = popMsgFromTopic(retryTopicV1, true, getMessageResult, requestHeader, reviveQid, channel,
                    popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
            } else {
                String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
                getMessageFuture = popMsgFromTopic(retryTopic, true, getMessageResult, requestHeader, reviveQid, channel,
                    popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture);
            }
        }

        final RemotingCommand finalResponse = response;
        SubscriptionData finalSubscriptionData = subscriptionData;
        getMessageFuture.thenApply(restNum -> {
            if (!getMessageResult.getMessageBufferList().isEmpty()) {
                finalResponse.setCode(ResponseCode.SUCCESS);
                getMessageResult.setStatus(GetMessageStatus.FOUND);
                if (restNum > 0) {
                    // all queue pop can not notify specified queue pop, and vice versa
                    popLongPollingService.notifyMessageArriving(
                        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(),
                        null, 0L, null, null);
                }
            } else {
                PollingResult pollingResult = popLongPollingService.polling(
                    ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter);
                if (PollingResult.POLLING_SUC == pollingResult) {
                    return null;
                } else if (PollingResult.POLLING_FULL == pollingResult) {
                    finalResponse.setCode(ResponseCode.POLLING_FULL);
                } else {
                    finalResponse.setCode(ResponseCode.POLLING_TIMEOUT);
                }
                getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
            }
            responseHeader.setInvisibleTime(requestHeader.getInvisibleTime());
            responseHeader.setPopTime(popTime);
            responseHeader.setReviveQid(reviveQid);
            responseHeader.setRestNum(restNum);
            responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
            responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
            if (requestHeader.isOrder() && finalOrderCountInfo != null) {
                responseHeader.setOrderCountInfo(finalOrderCountInfo.toString());
            }
            finalResponse.setRemark(getMessageResult.getStatus().name());
            switch (finalResponse.getCode()) {
                case ResponseCode.SUCCESS:
                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
                            requestHeader.getTopic(), requestHeader.getQueueId());
                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
                            requestHeader.getTopic(), requestHeader.getQueueId(),
                            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
                        finalResponse.setBody(r);
                    } else {
                        final GetMessageResult tmpGetMessageResult = getMessageResult;
                        try {
                            FileRegion fileRegion =
                                new ManyMessageTransfer(finalResponse.encodeHeader(getMessageResult.getBufferTotalSize()),
                                    getMessageResult);
                            channel.writeAndFlush(fileRegion)
                                .addListener((ChannelFutureListener) future -> {
                                    tmpGetMessageResult.release();
                                    Attributes attributes = RemotingMetricsManager.newAttributesBuilder()
                                        .put(LABEL_REQUEST_CODE, RemotingHelper.getRequestCodeDesc(request.getCode()))
                                        .put(LABEL_RESPONSE_CODE, RemotingHelper.getResponseCodeDesc(finalResponse.getCode()))
                                        .put(LABEL_RESULT, RemotingMetricsManager.getWriteAndFlushResult(future))
                                        .build();
                                    RemotingMetricsManager.rpcLatency.record(request.getProcessTimer().elapsed(TimeUnit.MILLISECONDS), attributes);
                                    if (!future.isSuccess()) {
                                        POP_LOGGER.error("Fail to transfer messages from page cache to {}",
                                            channel.remoteAddress(), future.cause());
                                    }
                                });
                        } catch (Throwable e) {
                            POP_LOGGER.error("Error occurred when transferring messages from page cache", e);
                            getMessageResult.release();
                        }

                        return null;
                    }
                    break;
                default:
                    return finalResponse;
            }
            return finalResponse;
        }).thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
        return null;
    }

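PopMessageProcessor's processRequest method handles RequestCode.POP_MESSAGE requests. It builds a GetMessageResult and then fetches messages through popMsgFromTopic (when the requested queueId is negative, meaning all queues) or popMsgFromQueue (for one specific queue).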
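The tail of processRequest decides what is sent back once the fetch chain completes. The sketch below restates that decision as a small pure function. It is an illustration only: `PopOutcomeSketch`, `Outcome`, and `decide` are hypothetical names, and the local `PollingResult` enum is a stand-in that lists only the values visible in the excerpt above.

```java
class PopOutcomeSketch {

    // Local stand-in; the real PollingResult in RocketMQ may define more values.
    enum PollingResult { POLLING_SUC, POLLING_FULL, POLLING_TIMEOUT }

    // Hypothetical summary of what the broker does with the request.
    enum Outcome { SUCCESS, HELD_FOR_LONG_POLLING, POLLING_FULL, POLLING_TIMEOUT }

    // If messages were found the response is SUCCESS and pollingResult is ignored.
    // Otherwise the long-polling result decides: the request is either parked
    // (no response is written yet) or answered with POLLING_FULL / POLLING_TIMEOUT.
    static Outcome decide(boolean foundMessages, PollingResult pollingResult) {
        if (foundMessages) {
            return Outcome.SUCCESS;
        }
        switch (pollingResult) {
            case POLLING_SUC:
                return Outcome.HELD_FOR_LONG_POLLING;
            case POLLING_FULL:
                return Outcome.POLLING_FULL;
            default:
                return Outcome.POLLING_TIMEOUT;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(false, PollingResult.POLLING_SUC));
    }
}
```

In the excerpt, the parked case corresponds to the `return null` inside `thenApply`, so nothing is written to the channel until the long-polling service wakes the request up or it times out.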

popMsgFromTopic

    private CompletableFuture<Long> popMsgFromTopic(TopicConfig topicConfig, boolean isRetry, GetMessageResult getMessageResult,
        PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, long popTime,
        ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
        StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int randomQ, CompletableFuture<Long> getMessageFuture) {
        if (topicConfig != null) {
            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
                getMessageFuture = getMessageFuture.thenCompose(restNum ->
                    popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), isRetry,
                        getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,
                        startOffsetInfo, msgOffsetInfo, orderCountInfo));
            }
        }
        return getMessageFuture;
    }

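popMsgFromTopic derives each queueId as (randomQ + i) % topicConfig.getReadQueueNums(), so it walks every read queue once starting from a random position, and chains a popMsgFromQueue call for each queue with thenCompose.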
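The rotation and the sequential chaining can be sketched as follows. This is a simplified illustration rather than the broker's code: `PopQueueOrderSketch`, `visitOrder`, `popFromQueue`, `popAll`, and the `backlog` array are hypothetical names, and the carried `Long` is treated as a running count of messages left behind in the queues, which is an assumption about what `restNum` means.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PopQueueOrderSketch {

    // Queue ids in visiting order: start at randomQ and wrap around.
    static List<Integer> visitOrder(int randomQ, int readQueueNums) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < readQueueNums; i++) {
            order.add((randomQ + i) % readQueueNums);
        }
        return order;
    }

    // Hypothetical stand-in for popMsgFromQueue: takes what still fits into
    // `collected` and adds whatever stays behind in the queue to restNum.
    static CompletableFuture<Long> popFromQueue(int queueId, long restNum, long[] backlog,
                                                int maxMsgNums, List<Integer> collected) {
        long room = maxMsgNums - collected.size();
        long taken = Math.min(room, backlog[queueId]);
        for (long n = 0; n < taken; n++) {
            collected.add(queueId); // record which queue each message came from
        }
        return CompletableFuture.completedFuture(restNum + backlog[queueId] - taken);
    }

    // Chains one stage per queue with thenCompose, so queues are read one after another.
    static CompletableFuture<Long> popAll(int randomQ, long[] backlog, int maxMsgNums,
                                          List<Integer> collected) {
        CompletableFuture<Long> future = CompletableFuture.completedFuture(0L);
        for (int queueId : visitOrder(randomQ, backlog.length)) {
            future = future.thenCompose(restNum ->
                popFromQueue(queueId, restNum, backlog, maxMsgNums, collected));
        }
        return future;
    }

    public static void main(String[] args) {
        List<Integer> collected = new ArrayList<>();
        long restNum = popAll(2, new long[] {5, 0, 3, 1}, 4, collected).join();
        System.out.println(collected + " restNum=" + restNum);
    }
}
```

Starting from a random queue spreads the load across queues, while the thenCompose chain keeps a single request from reading several queues at the same time.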

popMsgFromQueue

For ordered messages, popMsgFromQueue runs the check brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime()); if the queue has to be blocked, it returns right away.

checkBlock

org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java

    public boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime) {
        String key = buildKey(topic, group);
        ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
        if (qs == null) {
            qs = new ConcurrentHashMap<>(16);
            ConcurrentHashMap<Integer/*queueId*/, OrderInfo> old = table.putIfAbsent(key, qs);
            if (old != null) {
                qs = old;
            }
        }

        OrderInfo orderInfo = qs.get(queueId);

        if (orderInfo == null) {
            return false;
        }
        return orderInfo.needBlock(attemptId, invisibleTime);
    }

        // needBlock is a method of OrderInfo, the per-queue value stored in the table above
        @JSONField(serialize = false, deserialize = false)
        public boolean needBlock(String attemptId, long currentInvisibleTime) {
            if (offsetList == null || offsetList.isEmpty()) {
                return false;
            }
            if (this.attemptId != null && this.attemptId.equals(attemptId)) {
                return false;
            }
            int num = offsetList.size();
            int i = 0;
            if (this.invisibleTime == null || this.invisibleTime <= 0) {
                this.invisibleTime = currentInvisibleTime;
            }
            long currentTime = System.currentTimeMillis();
            for (; i < num; i++) {
                if (isNotAck(i)) {
                    long nextVisibleTime = popTime + invisibleTime;
                    if (offsetNextVisibleTime != null) {
                        Long time = offsetNextVisibleTime.get(this.getQueueOffset(i));
                        if (time != null) {
                            nextVisibleTime = time;
                        }
                    }
                    if (currentTime < nextVisibleTime) {
                        return true;
                    }
                }
            }
            return false;
        }    

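checkBlock first looks up the OrderInfo for the given topic, group, and queueId, and then calls orderInfo.needBlock(attemptId, invisibleTime). needBlock returns false when offsetList is empty or when the request carries the same attemptId as the recorded one. Otherwise it goes through the entries that have not been acked, computes nextVisibleTime for each (popTime + invisibleTime, or the per-offset value from offsetNextVisibleTime if one exists), and returns true as soon as the current time is earlier than nextVisibleTime. If every entry is acked or already visible again, it returns false.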
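The blocking rule can be tried out in isolation with a small sketch. It makes several assumptions: `OrderBlockSketch` is a hypothetical, trimmed-down stand-in for OrderInfo, acked offsets are kept in a plain `Set` instead of whatever `isNotAck` reads in the real class, the current time is passed in as `now` rather than read from `System.currentTimeMillis()`, and the fallback that fills in a missing invisibleTime is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class OrderBlockSketch {
    final long popTime;
    final long invisibleTime;
    final String attemptId;
    final List<Long> offsetList;
    final Set<Long> acked = new HashSet<>();                        // assumption: simple ack bookkeeping
    final Map<Long, Long> offsetNextVisibleTime = new HashMap<>();  // per-offset override

    OrderBlockSketch(long popTime, long invisibleTime, String attemptId, List<Long> offsetList) {
        this.popTime = popTime;
        this.invisibleTime = invisibleTime;
        this.attemptId = attemptId;
        this.offsetList = offsetList;
    }

    boolean needBlock(String attemptId, long now) {
        if (offsetList == null || offsetList.isEmpty()) {
            return false; // nothing in flight
        }
        if (this.attemptId != null && this.attemptId.equals(attemptId)) {
            return false; // the same attempt is retrying
        }
        for (Long offset : offsetList) {
            if (acked.contains(offset)) {
                continue; // acked entries never block
            }
            long nextVisibleTime = popTime + invisibleTime;
            Long override = offsetNextVisibleTime.get(offset);
            if (override != null) {
                nextVisibleTime = override;
            }
            if (now < nextVisibleTime) {
                return true; // an un-acked message is still invisible
            }
        }
        return false;
    }

    public static void main(String[] args) {
        OrderBlockSketch info = new OrderBlockSketch(1000L, 500L, "a1", Arrays.asList(10L, 11L));
        info.acked.add(10L);
        System.out.println(info.needBlock("a2", 1200L));
    }
}
```

With offset 10 acked and offset 11 still pending, a different attempt arriving at time 1200 is blocked because 1200 is earlier than 1000 + 500, whereas the same attempt id, or any attempt after time 1500, is let through.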

Summary

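ConsumerProcessor's popMessage method first selects a messageQueue through queueSelector.select and then performs the pop. Each pop request is capped at MAX_MSG_NUMS_FOR_POP_REQUEST (32) messages. After building the PopMessageRequestHeader, it delegates to this.serviceManager.getMessageService().popMessage and returns the popResult, with the fetched messages stored in the msgFoundList property. For ordered messages the broker runs the checkBlock check, which returns true when the offsetList still has un-acked entries and the current time is earlier than their nextVisibleTime.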


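Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.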
