上回讲到,Master 的 main 方法中,创建了 RpcEnv 和 Master 的 Endpoint,紧接着就开始执行 Endpoint 的生命周期方法 onStart() 方法,今天就从这里开始。
首先创建了 Master 的 Ui,也就是我们在浏览器上看到的 Master 信息:
紧接着,定时给自己发送 CheckForWorkerTimeOut 消息:
在 Master 类中搜索 case CheckForWorkerTimeOut,可看到如下逻辑:
然后,开始创建持久化引擎和选举代理
什么是 持久化引擎,如果 Master 需要主备,并且使用 Zookeeper 作为主备信息的存储,则需要创建一个读写 Zookeeper 的组件,就称之为持久化引擎。当前这是一种抽象,具体实现可以有多种方式,可以是 Zookeeper,也可以本地文件系统,也可以是自定义的。
new 了这个对象,我们去看它的构造方法:
new ZooKeeperPersistenceEngine(conf, serializer)
首先从配置中获取存储到 Zookeeper 的主目录:
private val workingDir = conf.get(ZOOKEEPER_DIRECTORY).getOrElse("/spark") + "/master_status"
然后创建了一个 Zookeeper 的客户端(这个类就不往下点了,往下就是用 Curator 框架创建了一个 Zookeeper 客户端):
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
然后初始化了工作目录
SparkCuratorUtil.mkdir(zk, workingDir)
持久化引擎提供了读写 Zookeeper 的方法
这个方法底层就是 Curator 框架的 Api :
那么这就是持久化引擎的逻辑了,也没什么。
选举代理,就是提供了一种组件,来实现主备 Master 的选举,它使用了 Curator 框架提供的 LeaderLatch 来实现的
在 ZooKeeperLeaderElectionAgent 的构造器中,调用了 start() 方法
start() 方法中
先拿到了一个 zk 对象,然后创建了一个分布式锁:LeaderLatch,并且注册了一个监听,最后启动。
有个值得注意的点是,如果哪个节点选举成功了,监听就会回调 isLeader() 方法,没有注册成功,则回调 notLeader 方法。
在 isLeader() 方法中,调用了这个方法:
updateLeadershipStatus(true)
这个方法中,做了两件事,一个是更改了当前节点的 status 状态为 Leader
status = LeadershipStatus.LEADER
第二件事是做了选举之后的一些逻辑,点进去可以看到,给自己发送了一个 ElectedLeader 消息:
self.send(ElectedLeader)
那我们需要在 Master 类中搜索 case ElectedLeader. 看一下是如何处理这个消息的
这里需要考虑一种情况,比如上一个 Alive 状态的 Master 刚挂了,当前 Standby 的 Master 选举成为主节点,那需要从 Zookeeper 中恢复集群的一些数据到自己的内存中。所以,需要先从 Zookeeper 中拿到所有 Application、Driver、Worker 信息:
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
然后开始执行 beginRecovery() 方法,这个方法就是把从 Zookeeper 的各种数据,放到 Master 的各种内存里面:
数据都恢复完成之后,给自己发送一个 CompleteRecovery 消息
CompleteRecovery 消息主要是最后检查一下刚刚恢复出来数据的准确性,此处不细看。
至此,Master 就启动完成了。
本次我们主要阅读了 Master 启动时,如何初始化持久化引擎以及选举代理,选举成功之后,都做了些什么事情,对于 Master 启动过程有了初步的了解。