Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ 消费者启动流程

RocketMQ 消费者启动流程

作者头像
王小明_HIT
发布于 2022-04-26 13:24:03
发布于 2022-04-26 13:24:03
83340
代码可运行
举报
文章被收录于专栏:程序员奇点程序员奇点
运行总次数:0
代码可运行

MQ 使用场景: 应用解耦、消峰填谷,消息分发。

Consumer

RocketMQ Consumer 分为 Pull Consumer 和 Push Consumer ,其实就是推拉消费者。

  • Pull Consumer
  • Push Consumer

DefaultMQPushConsumer

DefaultMQPushCOnsumerImpl 通过 start 方法启动

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer

DefaultMQPushConsumer#start 启动代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public void start() throws MQClientException {
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        this.defaultMQPushConsumerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

先简化说一下启动流程:

启动步骤如下, 启动准备,从nameserver 获取 topic 路由信息,检查 Consumer 配置,向每个 broker 发心跳,触发一次 rebalance.

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public synchronized void start() throws MQClientException {
  // 1. 启动准备工作
  switch (this.serviceState) {...}
  // 2. 从 NameServer 更新 topic 路由信息
  this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  // 3. 检查 consumer 配置
        this.mQClientFactory.checkClientInBroker();
  // 4. 向每个broker 发送心跳
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  // 5. 立即触发一次 reblance
        this.mQClientFactory.rebalanceImmediately();
        }

启动准备工作中有 CREATE_JUST状态判断。CREATE_JUST 的意思是 Service just created,not start 创建服务,没有启动。

start 启动核心代码在 DefaultMQPushConsumerImpl#start

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;

// 1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置
                this.checkConfig();
// 2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC
                this.copySubscription();

                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
// 3. 获取MQ 实例,先从缓存中国取,没有则创建
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 4. 设置 重负载的消费组
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                //设置消费模式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // 设置负载策略
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 实例化消息拉取的包装类并注册消息过滤的消息类
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 选择对应的 offset 实现类,并加载 offset 集群,广播消息的 offset 从消费者本地获取,集群模式从 Brocker 维护。
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
// 启动消息消费者服务
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();
// 8. 启动 MQ ClinentInstance
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
//9. 从nameserver拉取 topic 订阅消息
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
  //10. 向 broker 校验客户端 
        this.mQClientFactory.checkClientInBroker();
    // 11. 给所有的 broker发送心跳。
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 12. 负载队列
        this.mQClientFactory.rebalanceImmediately();
    }

咱们细化下启动过程

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1. 检查必要的参数consumerGroup、消费模式、consumerFromWhere、负载策略等配置
2. 拷贝订阅信息,监听重投队列%RETRY%TOPIC
3. 获取MQ实例,先从缓存中取,没有则创建
4. 设置重负载的消费组、消费模式、负载策略等
5. 实例化消息拉取的包装类并注册消息过滤的钩子
选择对应的OffsetStore实现类,并加载offset。广播消息从本地获取offset (Consumer本地维护),6. 集群消息从远程获取 (broker端维护)
7. 启动消息消费服务
7.1 并发消费:15分钟执行一次,过期消息(15分钟还未被消费的消息)一次最多清理16条过期消息,将过期消息发回给broker
7.2 顺序消费:20s定时锁定当前实例消费的所有队列,上锁成功将ProcessQueue的lock属性设置为true
8. 启动MQClientInstance
8.1 启动请求和响应的通道
8.2 开启定时任务
8.2.1 定时30s拉取最新的broker和topic的路由信息
8.2.2.定时30s向broker发送心跳包
8.2.3 定时5s持久化consumer的offset
8.2.4 定时1分钟,动态调整线程池线程数量
8.3 启动消息拉取服务
8.4 每20s重新负载一次
9. 从nameserver拉取topic的订阅信息
10. 向broker校验客户端
11. 给所有的broker的master发送心跳包 、上传filterClass的源文件给FilterServer
12. 立即负载队列

内容比较多,主要关心如下几点:

拷贝订阅消息

构建主体订阅消息 SubscriptionData 并加入到 RebalanceImpl 的订阅消息中,订阅关系主要来自两个:

  1. 通过调用 DefaltMQPushConsumerImpl#subscribe 方法
  2. 订阅主题消息,RocketMQ 消息重试以消费组为单位,而不是主题,消息重试主题为 %RETRY%+消费组. 消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

初始化消息进度

其实说的就是加载 offset , 如果消息消费是集群模式,那么消费进度(offset)保存在 Broker 上;如果是广播方式,消息进度存储在消费端。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    case CLUSTERING:
        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
        break;
    default:
        break;
    }
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);

创建消费者线程服务

根据消费者是否顺序消费, 创建消费端消费线程服务。consumeMessageService 负责消息消费,内部维护的是个线程池。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制

                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

向MQClientInstance 注册消费者,并启动

向 MQClientInstance 注册消费者,并启动MQClientInstance。在一个 JVM 中所有的生产者和消费者都持有同一个 MQClientInstance, MQClientInstance, 只会启动一次。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

mQClientFactory.start();

总结

介绍了一下 MQ 消费者启动过程,校验配置,获取订阅消息,设置消费者的消费模式,负载策略。加载消息进度,启动消息消费者服务,并向MQClientInstance注册消费者Group和并启动MQClientInstance,从 nameserver 拉取 topic 订阅信息,向 Broker 发送心跳包。

参考资料

  • https://rocketmq.apache.org/dowloading/releases/
  • git clone https://github.com/apache/rocketmq.git
  • https://blog.csdn.net/qq_38082304/article/details/113483066
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
4 条评论
热度
最新
可以分享一下各类情感词典嘛???求!
可以分享一下各类情感词典嘛???求!
回复回复2举报
textprocessing库可以分享一下吗?
textprocessing库可以分享一下吗?
回复回复点赞举报
想问一下为什么最后得分结果都是0啊
想问一下为什么最后得分结果都是0啊
回复回复点赞举报
最后三个词典是什么呀请问
最后三个词典是什么呀请问
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
有赞延迟队列设计
延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列呢?
后端技术探索
2018/08/09
1K0
Spring Boot + Redis 实现延时队列,写得太好了!
来源:blog.csdn.net/qq330983778/article/details/99341671
乔戈里
2021/10/11
1.4K0
Redis 延时队列的简单实现 (基于有赞的设计)
> 公众号:[Java小咖秀](https://t.1yb.co/jwkk),网站:[javaxks.com](https://www.javaxks.com)
Java小咖秀
2021/04/01
4.3K0
Redis 延时队列的简单实现 (基于有赞的设计)
redis基于zset实现延迟队列
一、延迟队列使用场景二、zset如何实现延迟队列三、springboot基于zset实现延迟队列四、做成服务化五、使用zset实现延迟队列的缺点六、其他实现方式
叔牙
2023/08/09
3.1K0
redis基于zset实现延迟队列
基于Redis 实现一个轻量级的延迟队列
想实现一个轻量级的延迟队列,此时可以考虑基于Redis来实现,如果当前的基础设施不是阿里云Mq,开源的RocketMQ只有18个等级,1ms~2h的18个等级。当然商业版的阿里云可以实现精度的延迟。
路行的亚洲
2022/11/16
5640
基于Redis 实现一个轻量级的延迟队列
一款实用延迟队列的自研历程
一款技术产品必定有其使用场景,不然代码写的再好也没有用武之地,那么首先我们要先来了解一下,在什么情况下会用到延迟队列呢?
小程故事多
2018/12/26
9490
基于Redis实现DelayQueue延迟队列设计方案
有想进滴滴LogI开源用户群的加我个人微信: jjdlmn_ 进群(备注:进群) 群里面主要交流 kakfa、es、agent、LogI-kafka-manager、等等相关技术; 群内有专人解答你的问题 对~ 相关技术领域的解答人员都有; 你问的问题都会得到回应
石臻臻的杂货铺[同名公众号]
2021/07/14
4.5K0
redis 队列_Redis之延迟队列的实现
延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么场景下我才需要这样的队列呢?
Java架构师必看
2021/08/23
6190
Redis 如何实现延时任务队列
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
Tinywan
2024/05/11
7950
Redis 如何实现延时任务队列
Redis 延迟队列实现(基于PHP)
Redis 可以利用 zset (有序列表)来实现,将消息序列化成一个字符串作为 zset的 value; 这个消息的到期处理时间作为 score,利用多个线程轮询 zset 获取到期的任务进行处理。 多线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理; 因为有多个线程,所以需要考虑并发争抢任务,确保任务不会多次执行。
高久峰
2023/09/18
4350
Redis 延迟队列实现(基于PHP)
方案设计:基于库表分段扫描和数据Redis预热,优化分布式延迟任务触达时效性
哈哈哈,说好的不卷了,能凑活用就行了。但每次接到新需求时都手痒,想结合着上一次的架构设计和落地经验,在这一次需求上在迭代更新,或者找到完全颠覆之前的更优方案。卷完代码的那一刻总是神清气爽
小傅哥
2022/03/28
7290
方案设计:基于库表分段扫描和数据Redis预热,优化分布式延迟任务触达时效性
Redis 延迟队列实现(基于PHP)
Redis 可以利用 zset (有序列表)来实现,将消息序列化成一个字符串作为 zset的 value; 这个消息的到期处理时间作为 score,利用多个线程轮询 zset 获取到期的任务进行处理。 多线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理; 因为有多个线程,所以需要考虑并发争抢任务,确保任务不会多次执行。
陈大剩博客
2023/03/06
9460
Redis 延迟队列实现(基于PHP)
基于Redis实现延时队列服务
点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java 2021 超神之路,很肝~ 中文详细注释的开源项目 RPC 框架 Dubbo 源码解析 网络应用框架 Netty 源码解析 消息中间件 RocketMQ 源码解析 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析 作业调度中间件 Elastic-Job 源码解析 分布式事务中间件 TCC-Transaction
芋道源码
2022/03/04
4370
贼好用,冰河开源了这款精准定时任务和延时队列框架!!
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/11/25
6680
贼好用,冰河开源了这款精准定时任务和延时队列框架!!
基于redis,redisson的延迟队列实践
首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了。这个需求如果不是准时通知,而是每天定点通知就简单了。如果需要准时通知就只能上延迟队列了。使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等。
kl博主
2023/11/18
4090
如何手写一个消息队列和延迟消息队列?
第一次听到“消息队列”这个词时,不知你是不是和我反应一样,感觉很高阶很厉害的样子,其实当我们了解了消息队列之后,发现它与普通的技术类似,当我们熟悉之后,也能很快地上手并使用。
小熊学Java
2023/12/27
3150
如何手写一个消息队列和延迟消息队列?
PHP实现think-queue介绍
think-queue是ThinkPHP官方提供的一个消息队列服务,是专门支持队列服务的扩展包。think-queue消息队列适用于大并发或返回结果时间比较长且需要批量操作的第三方接口,可用于短信发送、邮件发送、APP推送。think-queue消息队列可进行发布、获取、执行、删除、重发、失败处理、延迟执行、超时控制等操作。
OwenZhang
2021/12/08
2.1K0
PHP实现think-queue介绍
Thinkphp-queue自带的队列包使用分析
当前笔记中的内容针对的是 thinkphp-queue 的 v1.1.2 版本,现在官方已经更新到了 v1.1.3 版本, 下文中提到的几个Bug在最新的master分支上均已修复。笔记中的部分内容还未更新。
程序猿的栖息地
2022/04/29
2.2K0
Thinkphp-queue自带的队列包使用分析
从 0 到 1 优雅的实现PHP多进程管理
_ | | _ __ __ _ _ __ _ _| |_ ___ | '_ \ / _` | '__| | | | __/ _ \ | | | | (_| | | | |_| | || (_) | |_| |_|\__,_|_| \__,_|\__\___/ .TIGERB.cn An object-oriented multi process manager for PHP Version: 0.1.0 业务
前端教程
2018/03/05
1.5K0
从 0 到 1 优雅的实现PHP多进程管理
从0到1优雅的实现PHP多进程管理
在我们实际的业务场景中(PHP技术栈),我们可能需要定时或者近乎实时的执行一些业务逻辑,简单的我们可以使用unix系统自带的crontab实现定时任务,但是对于一些实时性要求比较高的业务就不适用了,所以我们就需要一个常驻内存的任务管理工具,为了保证实时性,一方面我们让它一直执行任务(适当的睡眠,保证cpu不被100%占用),另一方面我们实现多进程保证并发的执行任务。
用户1093396
2020/10/29
6110
从0到1优雅的实现PHP多进程管理
推荐阅读
相关推荐
有赞延迟队列设计
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验