Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ 消费者启动流程

RocketMQ 消费者启动流程

作者头像
王小明_HIT
发布于 2022-04-26 13:24:03
发布于 2022-04-26 13:24:03
83300
代码可运行
举报
文章被收录于专栏:程序员奇点程序员奇点
运行总次数:0
代码可运行

MQ 使用场景: 应用解耦、消峰填谷,消息分发。

Consumer

RocketMQ Consumer 分为 Pull Consumer 和 Push Consumer ,其实就是推拉消费者。

  • Pull Consumer
  • Push Consumer

DefaultMQPushConsumer

DefaultMQPushCOnsumerImpl 通过 start 方法启动

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer

DefaultMQPushConsumer#start 启动代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

先简化说一下启动流程:

启动步骤如下, 启动准备,从nameserver 获取 topic 路由信息,检查 Consumer 配置,向每个 broker 发心跳,触发一次 rebalance.

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public synchronized void start() throws MQClientException {
  // 1. 启动准备工作
  switch (this.serviceState) {...}
  // 2. 从 NameServer 更新 topic 路由信息
  this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  // 3. 检查 consumer 配置
        this.mQClientFactory.checkClientInBroker();
  // 4. 向每个broker 发送心跳
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  // 5. 立即触发一次 reblance
        this.mQClientFactory.rebalanceImmediately();
        }

启动准备工作中有 CREATE_JUST状态判断。CREATE_JUST 的意思是 Service just created,not start 创建服务,没有启动。

start 启动核心代码在 DefaultMQPushConsumerImpl#start

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

// 1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置
                this.checkConfig();
// 2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC
                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
// 3. 获取MQ 实例,先从缓存中国取,没有则创建
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 4. 设置 重负载的消费组
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                //设置消费模式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // 设置负载策略
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 实例化消息拉取的包装类并注册消息过滤的消息类
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 选择对应的 offset 实现类,并加载 offset 集群,广播消息的 offset 从消费者本地获取,集群模式从 Brocker 维护。
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
// 启动消息消费者服务
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();
// 8. 启动 MQ ClinentInstance
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
//9. 从nameserver拉取 topic 订阅消息
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  //10. 向 broker 校验客户端 
        this.mQClientFactory.checkClientInBroker();
    // 11. 给所有的 broker发送心跳。
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 12. 负载队列
        this.mQClientFactory.rebalanceImmediately();
    }

咱们细化下启动过程

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置
2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC
3. 获取MQ实例,先从缓存中取,没有则创建
4. 设置重负载的消费组、消费模式、负载策略等
5. 实例化消息拉取的包装类并注册消息过滤的钩子
选择对应的OffsetStore实现类,并加载offset。广播消息从本地获取offset (Consumer本地维护),6. 集群消息从远程获取 (broker端维护)
7. 启动消息消费服务
7.1 并发消费:15分钟执行一次,过期消息(15分钟还未被消费的消息)一次最多清理16条过期消息,将过期消息发回给broker
7.2 顺序消费:20s定时锁定当前实例消费的所有队列,上锁成功将ProcessQueue的lock属性设置为true
8. 启动MQClientInstance
8.1 启动请求和响应的通道
8.2 开启定时任务
8.2.1 定时30s拉取最新的broker和topic的路由信息
8.2.2.定时30s向broker发送心跳包
8.2.3 定时5s持久化consumer的offset
8.2.4 定时1分钟,动态调整线程池线程数量
8.3 启动消息拉取服务
8.4 每20s重新负载一次
9. 从nameserver拉取topic的订阅信息
10. 向broker校验客户端
11. 给所有的broker的master发送心跳包 、上传filterClass的源文件给FilterServer
12. 立即负载队列

内容比较多,主要关心如下几点:

拷贝订阅消息

构建主体订阅消息 SubscriptionData 并加入到 RebalanceImpl 的订阅消息中,订阅关系主要来自两个:

  1. 通过调用 DefaltMQPushConsumerImpl#subscribe 方法
  2. 订阅主题消息,RocketMQ 消息重试以消费组为单位,而不是主题,消息重试主题为 %RETRY%+消费组. 消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

初始化消息进度

其实说的就是加载 offset , 如果消息消费是集群模式,那么消费进度(offset)保存在 Broker 上;如果是广播方式,消息进度存储在消费端。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
    }
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

创建消费者线程服务

根据消费者是否顺序消费, 创建消费端消费线程服务。consumeMessageService 负责消息消费,内部维护的是个线程池。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

向MQClientInstance 注册消费者,并启动

向 MQClientInstance 注册消费者,并启动MQClientInstance。在一个 JVM 中所有的生产者和消费者都持有同一个 MQClientInstance, MQClientInstance, 只会启动一次。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

mQClientFactory.start();

总结

介绍了一下 MQ 消费者启动过程,校验配置,获取订阅消息,设置消费者的消费模式,负载策略。加载消息进度,启动消息消费者服务,并向MQClientInstance注册消费者Group和并启动MQClientInstance,从 nameserver 拉取 topic 订阅信息,向 Broker 发送心跳包。

参考资料

  • https://rocketmq.apache.org/dowloading/releases/
  • git clone https://github.com/apache/rocketmq.git
  • https://blog.csdn.net/qq_38082304/article/details/113483066
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
[Java]快速构建一个CLI小工具
在现实开发的过程中,大家会发现很多开源的框架都会有着自己的一个CLI工具库来帮助开发者们通过命令行的方式快速的达到某些目的,比如常见的docker 命令。那么在这篇文章当中,主要给大家介绍一个golang的小框架,我们可以借助这个框架来快速搭建一个小的CLI工具。
宇宙无敌暴龙战士之心悦大王
2023/03/29
1.4K0
Go通过cobra快速构建命令行应用
来自jetbrains Go 语言现状调查报告 显示:在go开发者中使用go开发实用小程序的比例为31%仅次于web,go得益于跨平台、无依赖的特性,用来编写命令行或系统管理这类小程序非常不错。
你大哥
2022/06/08
5140
Go通过cobra快速构建命令行应用
Go: 常用工具库cobra的简介与实践
来自jetbrains Go 语言现状调查报告 显示:在go开发者中使用go开发实用小程序的比例为31%仅次于web,go得益于跨平台、无依赖的特性,用来编写命令行或系统管理这类小程序非常不错。
Freedom123
2024/03/29
6390
Go: 常用工具库cobra的简介与实践
Cobra 使用简要(万字带你轻松上手 Cobra 使用)
欢迎阅读本文,本文将介绍如何使用 Go 语言中的 Cobra 库快速实现一个强大的命令行客户端。命令行客户端在软件开发中扮演着重要的角色,它们提供了一种简单而直接的方式来与应用程序进行交互,使用户能够轻松地执行各种操作。而 Cobra 则是一款流行的开源库,专门用于简化命令行应用程序的开发。
繁依Fanyi
2024/03/30
4.5K0
Cobra 使用简要(万字带你轻松上手 Cobra 使用)
Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler
0. 前言 继续上一篇博客阅读 Kubernetes 源码,参照《k8s 源码阅读》首先学习 Kubernetes 的一些核心组件,首先是 kube-scheduler 本文严重参考原文:《k8s 源码阅读》之 2.2 章节:scheduler,加入部分自己阅读的体会作为自己的阅读笔记 感谢《k8s 源码阅读》的作者们辛苦编写教材,在此郑重表示感谢,望大家多多支持!~ 1. 整体设计 1.1 概述 官网描述: The Kubernetes scheduler runs as a process alo
西凉风雷
2022/11/23
3560
Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler
命令行工具开发 cobra 示例
Cobra 是 Go 语言中一个流行的库,用于创建命令行应用程序。它提供了一个易于使用的框架,帮助开发者快速构建强大且灵活的 CLI(Command-Line Interface)工具。Cobra 的主要特点包括命令层次结构、命令行标志(flags)和参数处理、自动生成帮助文档等。
孟斯特
2024/08/10
1790
命令行工具开发 cobra 示例
cobra框架初探
cobra是一个命令行程序库,可以用来编写命令行程序,非常简单易用,类似于Go标准库中的Flag包,不过它比Flag包强大很多。它提供了一个脚手架,可以生成基于cobra的应用程序框架。
数据小冰
2022/08/15
9850
cobra框架初探
Go之现代命令行框架Cobra
老实说,今天是我第一次见到现代命令行框架这个名词,在此之前,我并不知道这个东西的作用是什么。下面一起来了解一下这个东西。
f1sh
2024/07/27
2700
Go每日一库之5:cobra
cobra是一个命令行程序库,可以用来编写命令行程序。同时,它也提供了一个脚手架,
luckpunk
2023/09/13
4480
要命!我篡改了系统命令惊现事故,竟要扣我年终奖-Golang-cobra
Kubernetes、Hugo、etcd 这些知名项目都用cobra来做命令行程序。学起来!
机智的程序员小熊
2023/03/02
3150
要命!我篡改了系统命令惊现事故,竟要扣我年终奖-Golang-cobra
Cobra 库上手—自建命令行工具
Cobra 是一个流行的 Go 语言库,用于创建强大且灵活的命令行应用程序。它由 spf13 开发,设计用于与 Go 生态系统中的其他流行库(如 Viper 配置库)无缝集成。Cobra 支持多级命令结构,允许定义根命令和任意数量的子命令,还可以轻松处理全局和本地标志。它自动生成帮助和使用信息,并支持 Bash、Zsh、Fish 和 PowerShell 的命令补全。此外,Cobra 能够生成 Markdown 格式的文档,使文档维护更加便捷。通过与 Viper 集成,Cobra 能处理配置文件和环境变量,为开发者提供了强大的工具集,使创建复杂的 Client 工具变得简单高效。Cobra 广泛应用于各种 Go 项目中,提升了 Client 应用的开发体验和维护效率。
FunTester
2025/01/23
1630
Cobra 库上手—自建命令行工具
go命令行库-cobra
Cobra 是一个用于创建强大的现代 CLI 应用程序的库。几乎包含了你所需要的所有元素。
zy010101
2022/10/31
9450
cobra-强大的CLI应用程序库
Cobra是一个用于创建强大的现代CLI应用程序的库,也是一个用于生成应用程序和命令文件的程序。
happlyfox
2021/02/24
7650
cobra-强大的CLI应用程序库
【K8s源码品读】002:Phase 1 - kubectl - create的调用逻辑
我们的目标是查看kubectl create -f nginx_pod.yaml 这个命令是怎么运行的。
junedayday
2021/08/05
5400
DevOpsCamp第2期:从 《cobra - 06 持久化命令》 开始聊聊 Go语言 指针类型的使用注意事项
嗯, 在 cobra 中提供了一种叫做 Persistent 的 状态, 定向支持 函数 与 参数。
老麦
2023/02/25
2960
DevOpsCamp第2期:从 《cobra - 06 持久化命令》 开始聊聊 Go语言 指针类型的使用注意事项
怎样上手cobra
cobra是go语言中一个非常强大的命令行构建工具,我们非常熟悉的docker、k8s、etcd都是基于cobra开发的。如果你想打造自己的命令行工具,那么cobra就是你的最佳选择。
闻说社
2024/06/11
1280
怎样上手cobra
golang 源码分析(17):cobra docker
然后创建DockerCli对象,DockerCli对象在cli/cli.go里声明。
golangLeetcode
2022/08/02
5260
使用 Cobra 创建 CLI 应用
虽然现在我们使用的大多数软件都是可视化的,很容易上手,但是这并不代表 CLI(命令行)应用就没有用武之地了,特别是对于开发人员来说,还是会经常和 CLI 应用打交道。而 Golang 就非常适合用来构建 CLI 应用,下面我们就将来介绍如何在 Golang 中构建一个 CLI 应用。
我是阳明
2020/06/15
1.5K0
go cobra CLI工具库的简单入门
下载:go install github.com/spf13/cobra-cli@latest
用户8478947
2023/01/26
1.1K0
Go 每日一库之 cli
cli是一个用于构建命令行程序的库。我们之前也介绍过一个用于构建命令行程序的库cobra。在功能上来说两者差不多,cobra的优势是提供了一个脚手架,方便开发。cli非常简洁,所有的初始化操作就是创建一个cli.App结构的对象。通过为对象的字段赋值来添加相应的功能。
用户7731323
2020/09/08
1.5K0
推荐阅读
相关推荐
[Java]快速构建一个CLI小工具
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验