Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊rocketmq的DLedgerRoleChangeHandler

聊聊rocketmq的DLedgerRoleChangeHandler

原创
作者头像
code4it
修改于 2020-01-02 04:07:35
修改于 2020-01-02 04:07:35
42500
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下rocketmq的DLedgerRoleChangeHandler

DLedgerRoleChangeHandler

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLegerRoleChangeHandler_"));
    private BrokerController brokerController;
    private DefaultMessageStore messageStore;
    private DLedgerCommitLog dLedgerCommitLog;
    private DLedgerServer dLegerServer;
    public DLedgerRoleChangeHandler(BrokerController brokerController, DefaultMessageStore messageStore) {
        this.brokerController = brokerController;
        this.messageStore = messageStore;
        this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
        this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
    }
​
    @Override public void handle(long term, MemberState.Role role) {
        Runnable runnable = new Runnable() {
            @Override public void run() {
                long start = System.currentTimeMillis();
                try {
                    boolean succ = true;
                    log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                    switch (role) {
                        case CANDIDATE:
                            if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                                brokerController.changeToSlave(dLedgerCommitLog.getId());
                            }
                            break;
                        case FOLLOWER:
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                            break;
                        case LEADER:
                            while (true) {
                                if (!dLegerServer.getMemberState().isLeader()) {
                                    succ = false;
                                    break;
                                }
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                    break;
                                }
                                if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                    && messageStore.dispatchBehindBytes() == 0) {
                                    break;
                                }
                                Thread.sleep(100);
                            }
                            if (succ) {
                                messageStore.recoverTopicQueueTable();
                                brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                            }
                            break;
                        default:
                            break;
                    }
                    log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
                } catch (Throwable t) {
                    log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
                }
            }
        };
        executorService.submit(runnable);
    }
​
    @Override public void startup() {}
​
    @Override public void shutdown() {
        executorService.shutdown();
    }
}
  • DLedgerRoleChangeHandler实现了DLedgerLeaderElector.RoleChangeHandler接口,其handle方法会往executorService提交一个runnable;其shutdown方法会执行executorService.shutdown();runnable方法会根据MemberState.Role做不同处理,在role为CANDIDATE且messageStore.getMessageStoreConfig().getBrokerRole()不为SLAVE的时候会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为FOLLOWER时会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为LEADER时执行messageStore.recoverTopicQueueTable()及brokerController.changeToMaster(BrokerRole.SYNC_MASTER)

changeToSlave

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class BrokerController {
    
    //......public void changeToSlave(int brokerId) {
        log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);//change the role
        brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
        messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);//handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
        }//handle the transactional service
        try {
            this.shutdownProcessorByHa();
        } catch (Throwable t) {
            log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
        }//handle the slave synchronise
        handleSlaveSynchronize(BrokerRole.SLAVE);try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {}
        log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
    }//......
}
  • changeToSlave方法主要执行messageStoreConfig.setBrokerRole(BrokerRole.SLAVE),然后执行shutdownProcessorByHa()、handleSlaveSynchronize(BrokerRole.SLAVE)以及registerBrokerAll(true, true, brokerConfig.isForceRegister())

changeToMaster

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class BrokerController {
    
    //......public void changeToMaster(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            return;
        }
        log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());//handle the slave synchronise
        handleSlaveSynchronize(role);//handle the scheduled service
        try {
            this.messageStore.handleScheduleMessageService(role);
        } catch (Throwable t) {
            log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
        }//handle the transactional service
        try {
            this.startProcessorByHa(BrokerRole.SYNC_MASTER);
        } catch (Throwable t) {
            log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
        }//if the operations above are totally successful, we change to master
        brokerConfig.setBrokerId(0); //TO DO check
        messageStoreConfig.setBrokerRole(role);try {
            this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
        } catch (Throwable ignored) {}
        log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
    }//......
}
  • changeToMaster方法执行handleSlaveSynchronize、startProcessorByHa(BrokerRole.SYNC_MASTER)以及registerBrokerAll(true, true, brokerConfig.isForceRegister())

小结

DLedgerRoleChangeHandler实现了DLedgerLeaderElector.RoleChangeHandler接口,其handle方法会往executorService提交一个runnable;其shutdown方法会执行executorService.shutdown();runnable方法会根据MemberState.Role做不同处理,在role为CANDIDATE且messageStore.getMessageStoreConfig().getBrokerRole()不为SLAVE的时候会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为FOLLOWER时会执行brokerController.changeToSlave(dLedgerCommitLog.getId());在role为LEADER时执行messageStore.recoverTopicQueueTable()及brokerController.changeToMaster(BrokerRole.SYNC_MASTER)

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
源码分析 RocketMQ DLedger 多副本即主从切换实现原理
DLedger 基于 raft 协议,故天然支持主从切换,即主节点(Leader)发生故障,会重新触发选主,在集群内再选举出新的主节点。
丁威
2019/10/08
1.6K1
源码分析 RocketMQ DLedger 多副本即主从切换实现原理
RocketMq之Broker源码分析
服务器上部署的RocketMq进程一般称之为Broker,Broker会接收Producer的消息,持久化到本地,然后push给Consumer,通常使用集群部署,主从之间会有数据同步。
周同学
2020/06/16
9750
RocketMq之Broker源码分析
RocketMQ原理—2.源码设计简单分析二
现已知Broker作为一个JVM进程启动后,会由BrokerStartup这个启动组件先初始化4个配置组件,然后再通过这4个配置组件创建BrokerController这个管理控制组件,在BrokerController管控组件中会包含大量的核心功能组件和后台线程池。如下图示:
东阳马生架构
2025/04/02
620
聊聊rocketmq的SlaveSynchronize
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
code4it
2019/12/30
4610
聊聊rocketmq的SlaveSynchronize
Broker注册到NameServer源码分析
RocketMQ一个用Java写的开源项目,而且也是阿里开源的,所以想看一看设计思路以及一些细节,所以就写了这篇博客,记录一下Broker注册到Nameserver的过程以及心跳逻辑。
CBeann
2023/12/25
2090
Broker注册到NameServer源码分析
RocketMQ源码分析之路由中心
早期的rocketmq版本的路由功能是使用zookeeper实现的,后来rocketmq为了追求性能,自己实现了一个性能更高效且实现简单的路由中心NameServer,而且可以通过部署多个路由节点实现高可用,但它们之间并不能互相通信,这也就会导致在某一个时刻各个路由节点间的数据并不完全相同,但数据某个时刻不一致并不会导致消息发送不了,这也是rocketmq追求简单高效的一个做法。
张乘辉
2019/06/14
4820
RocketMQ学习-Broker-1
前面学习了name server的主要代码,这篇文章开始学习broker的源码。broker是RocketMQ的核心模块,这篇文章我们先从整体看下代码结构、RocketMQ的领域模型,然后再看下一个broker节点的启动过程以及Controller暴露的接口。
阿杜
2018/08/06
6500
RocketMQ学习-Broker-1
RocketMQ源码系列(一) NameServer 核心源码解析
NameServer 是rocketmq核心组件之一,与zookeeper一样天生具有分布式的特性,在rocketmq中担当着路由注册、发现、动态地维护broker相关信息的角色, NameServer 不提供Master-slave同步机制,但是能够保证数据的最终一致性。
IT大咖说
2021/07/19
6540
Rocketmq学习一
1.由于rocketmq需要依赖nameServer,类似于zookeeper。首先启动时,配置好NamesrvStartup的环境变量信息,也即rocketmq的ROCKEMQ_HOME与你的项目对应。接着就可以启动了。
路行的亚洲
2020/07/16
4980
RocketMQ源码解析-topic创建机制
RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建。可以通过设置broker的配置文件来禁用或者允许自动创建。默认是开启的允许自动创建
程序猿川子
2022/08/19
1.6K0
【RocketMq】RocketMq-NameServ 源码分析(Ver4.9.4)
RocketMq3.X的版本和Kafka一样是基于Zookeeper进行路由管理的,但是这意味着运维需要多部署一套Zookeeper集群,后来RocketMq选择去ZK最终出现了NameServ。NameServ作为RocketMq源码阅读的切入点非常不错,本文将会介绍Ver 4.9.4 版本的NameServ源码分析。
阿东
2022/11/29
5630
【RocketMq】RocketMq-NameServ 源码分析(Ver4.9.4)
RocketMQ路由中心NameServer
消息中间件的设计思路一般是基于主题订阅发布的机制,消息生产者(Producer)发送某一个主题到消息服务器,消息服务器负责将消息持久化存储,消息消费者(Consumer)订阅该兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉去(Pull模式),从而实现消息生产者与消息消费者解耦。为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发送到哪台消息服务器呢?如果某一台消息服务器宕机了,那么消息生产者如何在不重启服务情况下感知呢?
周杰伦本人
2022/10/25
5600
RocketMQ路由中心NameServer
RocketMQ Topic创建【源码笔记】
Topic的创建分为自动创建和通过命令行创建两种。通过broker配置参数autoCreateTopicEnable设置。 通常可以在非生产环境开启自动创建,生产环境待审批后再进行创建。
瓜农老梁
2019/08/27
4.3K0
RocketMQ 大脑 NameServer 赏析
NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现,主要功能如下:
IT技术小咖
2023/11/17
4510
RocketMQ 大脑 NameServer 赏析
RocketMQ 源码分析 —— 高可用
本文主要解析 Namesrv、Broker 如何实现高可用,Producer、Consumer 怎么与它们通信保证高可用。
芋道源码
2020/05/19
9810
RocketMQ主从如何同步消息消费进度?
如果消费者消费模式不同,也会有不同的保存方式,消费者端的消息消费进度保存到 OffsetStore 中,他有两个实现类:
张乘辉
2019/09/25
1.2K0
聊聊rocketmq的suggestPullingFromSlave
rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
code4it
2019/12/05
7070
聊聊rocketmq的suggestPullingFromSlave
聊聊rocketmq的pullFromWhichNodeTable
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
code4it
2019/12/04
4860
聊聊rocketmq的pullFromWhichNodeTable
聊聊rocketmq的SERVICE_NOT_AVAILABLE
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
code4it
2019/12/18
1.7K0
聊聊rocketmq的SERVICE_NOT_AVAILABLE
RocketMQ(三):server端处理框架及消费数据查找实现
 rocketmq作为一个高性能的消息中间件,咱们光停留在使用层面,总感觉缺点什么。虽然rocketmq的官方设计文档讲得还是比较详细的,但纸上得来终觉浅!今天我们就来亲自挖一挖rocketmq的实现细节:server端处理框架以及如果进行消费消息。
huofo
2022/03/17
5800
RocketMQ(三):server端处理框架及消费数据查找实现
相关推荐
源码分析 RocketMQ DLedger 多副本即主从切换实现原理
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档