前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ(五):揭秘高吞吐量并发消费原理

RocketMQ(五):揭秘高吞吐量并发消费原理

原创
作者头像
菜菜的后端私房菜
发布2024-10-23 10:30:57
730
发布2024-10-23 10:30:57
举报
文章被收录于专栏:消息中间件

RocketMQ(五):揭秘高吞吐量并发消费原理

趁着1024程序员节,我们一起加足「码」力,揭秘RocketMQ中高吞吐量的并发消费原理

思维导图如下:

上篇文章已经描述过拉取消息的流程,消息被拉取到消费者后就可以开始进行消费消息

消费消息分为两种方式:并发消费、顺序消费

  1. 并发消费采用多线程进行消费,能够大大提升消费吞吐量
  2. 顺序消费会根据消息的有序性进行消费,吞吐量不如并发

消费者消费流程

ConsumeMessageConcurrentlyService 消费消息

上篇文章说到,拉取完消息后会提交消费请求,便于后续进行异步消费

代码语言:java
复制
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest

submitConsumeRequest 方法有并发、顺序两种实现,先来查看并发实现

ConsumeMessageConcurrentlyService.submitConsumeRequest

并发的实现主要会根据每次批量消费的最大数量进行构建请求并提交,如果期间失败会延时5s再提交

代码语言:java
复制
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    //每次批量消费最大数量 默认为1 (消息充足的情况下,默认每次拉取32条消息)
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    //如果拉取到的消息小于等于每次批量消费数量 则构建请求并提交
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            //失败 延迟5s再提交请求
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        //如果拉取到的消息 大于 每次批量消费数量,则分批次构建请求并提交
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

并发、顺序实现中,构建的请求ConsumeRequest 并不是共享的,而是它们的内部类,虽然同名但是实现是不同的

提交后使用消费线程池执行,在执行任务的过程中,主要会调用消费监听器进行消费消息 consumeMessage,然后通过成功/失败的情况进行处理结果processConsumeResult

(在此期间还会封装上下文,执行消费前、后的钩子方法,记录消费耗时等操作)

代码语言:java
复制
public void run() {
    //检查processQueue
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    //获取并发消息监听器 (我们设置消费者时实现的)
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    //上下文 用于执行消费过程中的钩子方法
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    //状态
    ConsumeConcurrentlyStatus status = null;
    //失败重试的情况下会更新topic
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    //保留上下文信息 执行消费前的钩子方法时使用
    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    //计时
    long beginTimestamp = System.currentTimeMillis();
    //记录是否异常
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        //每个消息记录开始时间
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        //消费消息
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        hasException = true;
    }
    //消费耗时
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    
    //计算返回类型
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    //消费后的钩子
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    //增加消费时间 用于运营统计
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    if (!processQueue.isDropped()) {
        //处理消费结果
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

processConsumeResult 根据状态处理消费结果,分为两个状态:CONSUME_SUCCESS(成功)或RECONSUME_LATER(失败重试)和两种模式:广播、集群模式

无论成功还是失败都会统计对应的数据

如果集群模式下失败,会调用 sendMessageBack 向Broker发送消息,将消息放入重试队列中,到期后进行重试;如果发送失败则延时5S重新进行消费

最终会移除ProcessorQueue中的消息并获取偏移量进行更新

代码语言:java
复制
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    //默认int最大
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            //成功的情况下 ackIndex为本次消费数量-1
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            //统计数据
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            //失败的情况下ackIndex为-1
            ackIndex = -1;
            //统计数据
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            //如果能进入循环说明本次消费失败 广播模式 只打印日志 后续会调用本地的修改偏移量 相当于舍弃/删除 不处理
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            //如果能进入循环说明本次消费失败
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                //向broker发送消息 重试消息要放入对应的重试队列中
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    //发送失败记录
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            
			//发送失败延时消费
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    //删除本次消费消息,获取偏移量
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        //更新偏移量
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

广播模式使用 LocalFileOffsetStore 更新偏移量,集群模式使用 RemoteBrokerOffsetStore 更新偏移量

它们都是在内存更新偏移量,但RemoteBrokerOffsetStore会定期向Broker进行更新消费偏移量

代码语言:java
复制
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        //mq旧偏移量
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }

        if (null != offsetOld) {
            //递增 CAS替换
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}

顺序模式消费消息流程类似但加锁较为复杂,后文再详细说明

总结一下并发消费流程:

  1. 拉取到消息后,回调中还会提交消费请求submitConsumerRequest
  2. 根据最大消费消息数量,将本次拉取的消息进行分批次构建请求ConsumerRequest并提交到线程池执行
  3. 执行ConsumerRequest任务主要调用消息监听器进行消费消息(这里的逻辑是我们实现如何消费消息的,并返回状态),并通过返回的状态处理消费结果
  4. 集群模式下消费失败会向Broker发送重试请求,如果发送失败会延时再次提交消费请求进行重新消费
  5. 如果消费成功,从ProcessorQueue中移除消息并更新内存中Broker的消费偏移量(此时还没有向Broker提交更新消费偏移量的请求)
定时更新消费偏移量

并发消费消息只是修改内存中Broker的消费偏移量

真正更新消费偏移量的是MQClientInstance启动时的定时任务每10s调用persistAllConsumerOffset向Broker更新当前节点所有消费者的消费偏移量

代码语言:java
复制
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

persistAllConsumerOffset会遍历消费者进行持久化消费偏移量

代码语言:java
复制
private void persistAllConsumerOffset() {
    //遍历消费者
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        //持久化消费偏移量
        impl.persistConsumerOffset();
    }
}

DefaultMQPushConsumerImpl.persistConsumerOffset

persistConsumerOffset会根据再平衡组件RebalanceImpl获取当前消费者负责消费的队列,再调用this.offsetStore.persistAll进行后续持久化

(再平衡组件通过负载算法决定消费者负责消费哪些队列,后续文章再讲解再平衡机制)

代码语言:java
复制
public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        //获取负责消费的队列
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        //不破坏原容器
        mqs.addAll(allocateMq);
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}

RemoteBrokerOffsetStore.persistAll

遍历每个队列调用updateConsumeOffsetToBroker向Broker更新消费偏移量,如果有队列未使用则从offsetTable中移除

代码语言:java
复制
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
	
    //遍历消费偏移量表 Key为队列 Value为偏移量
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        if (offset != null) {
            if (mqs.contains(mq)) {
                try {
                    //向Broker进行更新消费偏移量
                    this.updateConsumeOffsetToBroker(mq, offset.get());
                    log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                        this.groupName,
                        this.mQClientFactory.getClientId(),
                        mq,
                        offset.get());
                } catch (Exception e) {
                    log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                }
            } else {
                //记录未使用的队列
                unusedMQ.add(mq);
            }
        }
    }

    //未使用的队列进行移除
    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}

RemoteBrokerOffsetStore.updateConsumeOffsetToBroker

定时任务调用的,第三个参数isOneway默认为true,这就说明是异步发送请求,无需关心是否发送/响应成功

在更新Broker前还需要获取Broker信息(本地内存未获取到就从NameServer获取,再存入本地内存)、封装请求,再通过RPC请求Broker

代码语言:java
复制
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    //获取Broker信息    
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
    }

    if (findBrokerResult != null) {
        //封装请求
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            //异步
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            //同步阻塞
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

总的来说,定时更新就是遍历消费者的每个队列,向Broker提交异步更新每个队列消费偏移量的请求

定时向Broker更新消费偏移量
定时向Broker更新消费偏移量

梳理完消费者消费消息的流程,再来看下Broker在该流程中需要处理的两个请求:消费失败的请求和消费成功更新消费偏移量的请求

Broker处理流程

Broker处理更新消费偏移量请求

更新消费偏移量的请求码为UPDATE_CONSUMER_OFFSET,上篇文章在讲拉取消息时向Broker读取消费偏移量的请求码为QUERY_CONSUMER_OFFSET

处理读写消费偏移量请求的都是相同组件ConsumerManageProcessor

读写消费偏移量实际上都是对Broker内存管理偏移量的ConsumerOffsetManager进行读写它的双层MapoffsetTable,其中Key为Topic@消费者组Value则为队列ID与消费偏移量的映射

代码语言:java
复制
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
    //根据Key获取 队列和消费偏移量映射
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null == map) {
        //没有就初始化更新完放入
        map = new ConcurrentHashMap<Integer, Long>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        //内存更新
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
        }
    }
}

Broker端的更新消费偏移量也是内存级别的操作,真正持久化也是定时刷盘的任务进行的(初始化延迟10S,后续5S定时刷盘)

代码语言:java
复制
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
    try {
        BrokerController.this.consumerOffsetManager.persist();
    } catch (Throwable e) {
        log.error("schedule persist consumerOffset error.", e);
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

异步刷盘的文件为consumerOffset.json,内容为JSON格式的双层MapoffsetTable,第一层为topic@消费者组名,第二层Key为队列ID,VALUE为消费偏移量(如下JSON文件所示)

代码语言:java
复制
{
	"offsetTable":{
		"%RETRY%warn_consumer_group@warn_consumer_group":{0:2
		},
		"%RETRY%order_consumer_group@order_consumer_group":{0:0
		},
		"TopicTest@please_rename_unique_group_name_5":{0:273,1:269,2:270,3:271,4:270,5:270,6:273,7:272
		},
		"TopicTest@order_consumer_group":{0:727914,1:727908,2:727913,3:727916,4:727910,5:727911,6:727918,7:727966
		},
		"TopicTest@warn_consumer_group":{0:727914,1:727908,2:727913,3:727916,4:727910,5:727911,6:727918,7:727966
		}
	}
}

Broker使用ConsumerManagerProcessor负责处理消费相关请求,并使用管理消费偏移量的ConsumerOffsetManager根据topic、消费者组、队列id、消费偏移量等信息,对offsetTable进行更新消费偏移量,后续定时将offsetTable持久化为consumerOffset的JSON文件

Broker处理更新消费偏移量
Broker处理更新消费偏移量
Broker处理消费失败的请求

集群模式下,消费失败会向Broker发送请求码为CONSUMER_SEND_MSG_BACK的消息

处理该消息的Processor与处理生产者发送消息的相同,都是SendMessageProcessor

它会调用asyncConsumerSendMsgBack处理消费失败的请求

处理时判断消息Topic是重试还是死信,然后再调用持久化消息asyncPutMessage的流程

(最终投入消息的流程也是调用之前持久化消息说过的asyncPutMessage,只是投入前判断是放入哪个队列中)

不理解重试、死信等概念的同学可能不太懂这段源码,我们先来介绍下:

消息消费失败后会放入重试队列进行重试,其中无序消息重试消费的时间间隔会递增

当重试达到一定次数后认为“永远”无法消费,会将消息放入死信队列,放入死信队列可以让开发人员便于排查多次无法消费的原因

代码语言:java
复制
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                    RemotingCommand request) throws RemotingCommandException {
    //...

    
    //重试队列topic = %RETRY%+消费者组名
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
    int topicSysFlag = 0;
    //...
    
    
    //根据便宜量查出原来存储的消息
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    
    //...
    
	//如果重试次数超过最大重试次数 默认16次  或 延迟等级为负数 则要放入死信队列
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        //死信队列topic =  %DLQ% + 消费者组名
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0);
        //如果要放入死信 会设置延时等级为0
        msgExt.setDelayTimeLevel(0);
    } else {
        //重试的情况下 延时等级 = 3 + 消息重试次数
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        //设置延时等级
        msgExt.setDelayTimeLevel(delayLevel);
    }

    //封装消息 异步存储
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    //...

    CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return putMessageResult.thenApply(r -> {
        //...
    });
}

重试Topic用%RETRY%与消费者组名进行拼接,用于消息的重试,并且只有一个队列用于存储需要重试的消息,那么它是如何做到不同时间间隔的消息到期后就进行重试的呢?

其实该流程中不仅会用到重试队列、死信队列,还会用到延时队列

当确认使用重试而不是死信队列时,会设置延时等级msgExt.setDelayTimeLevel(delayLevel),使用死信时延时等级设置为0 msgExt.setDelayTimeLevel(0)

这个延时等级后续持久化消息前会进行判断,如果设置的延时等级大于0,说明需要使用延时队列

每个级别的队列对应的延时时间不同,以此来实现等待一定的时间,还会将重试topic和队列id存入消息properties,延时到期后会将消息存入重试队列中,重试队列再被拉取消息进行重复消息

代码语言:java
复制
//消息设置过延时等级
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }

    //topic换成延时topic
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    //根据延时等级获取对应的延时队列 第一次延时等级设置的是3,这个方法会减1,也就是第一次会被放入延时队列id为2
    int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // Backup real topic, queueId
    //将重试的topic、队列ID存入消息的properties,后续从延时队列出来重新存储时要使用
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    //设置成延时topic和队列
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

第一次重试设置的延时等级为3,调用delayLevel2QueueId会将延时等级减1,也就是第一次进入延时的队列ID为2

延时队列ID范围为2-17,分别对应16个延时级别:

第几次重试

与上次重试的间隔时间

第几次重试

与上次重试的间隔时间

1

10秒

9

7分钟

2

30秒

10

8分钟

3

1分钟

11

9分钟

4

2分钟

12

10分钟

5

3分钟

13

20分钟

6

4分钟

14

30分钟

7

5分钟

15

1小时

8

6分钟

16

2小时

延时队列的原理与实现细节也很多,放在后文再进行解析

Broker处理消费重试请求,实际上就是判断是否超过最大重试次数,如果超过放入死信队列、未超过放入重试队列并设置延时级别

而在CommitLog追加数据前,会判断是否设置延时级别,如果设置过要更改消息topic、queueid等信息,将其投入延时队列中(这里只是写CommitLog,写延时队列的ConsumerQueue是异步重投的,前文已经说过)

等到延时后消息从延时队列出来被投入重试队列中,后续继续被拉取消费(延时队列的实现原理后文描述)

(图片原稿弄丢了,根据之前的图片贴上来补画的流程图,将就看~)

Broker处理消费重试
Broker处理消费重试

你可能会有疑问,拉取消息需要通过PullRequest,而每个PullRequest对应一个队列,那么是谁把重试队列对应的PullRequest加入拉取消息的流程呢?

这也是再平衡机制进行处理的,后续的文章再来分析再平衡机制是如何为每个消费者分配队列的

总结

提交消费请求后,会根据每次消费批处理最大消息数量进行分批次构建消费请求并提交到线程池执行任务

并发消费消息的特点是吞吐量大,使用线程池对拉取的消息进行消费,但是消费消息是无法预估执行顺序

消费消息时会使用消费者的消费监听器进行消费消息并获取返回状态,根据状态进行后续的处理(集群模式下)

如果状态为成功则删除ProcessQueue中的消息,并更新内存中记录Broker的消费偏移量,后续定时任务向Broker进行更新该消费者所有队列对应的消费偏移量

Broker更新队列的消费偏移量时,实际上也是在内存更新ConsumerOffsetManager的offsettable记录的消费偏移量,后续定时将其持久化到consumerOffset.json文件

如果状态为失败,则会向Broker发送消费重试的同步请求,如果请求超时或未发出去,则再次延时提交该消费请求,后续重新消费

Broker收到消费重试请求后,相当于又要进行持久化,只是期间会改变消息topic、队列等信息,根据重试次数判断是否超时最大重试次数,如果超时则将消息topic、队列等数据改为死信的,未超过则将消息的数据改为重试的,并设置延时级别

在CommitLog追加数据前,会判断消息是否设置过延时级别,如果设置过则又会将消息的topic、队列等数据改为延时的,并保存之前重试队列的数据,持久化消息后,异步写ComsumerQueue,相当于消息被投入延时队列中,等到延时时间结束后,消息会被投入重试队列

消费者的再平衡机制会将这个重试队列对应的PullRequest请求加入,后续再进行拉取消息并进行消费,以此达成消费重试机制

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ(五):揭秘高吞吐量并发消费原理
    • 消费者消费流程
      • ConsumeMessageConcurrentlyService 消费消息
      • 定时更新消费偏移量
    • Broker处理流程
      • Broker处理更新消费偏移量请求
      • Broker处理消费失败的请求
    • 总结
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档