上一次阅读到 Master 调用 schedule() 方法,遍历 waitingApps,为每个程序决定启动多少 Executor,为每个 Executor 分配多少资源,有了这些信息之后,给 Worker 发送了一个 LaunchExecutor 消息,Worker 开始处理。
Worker 内部先判断发送消息的这个 Master 是否是 Active,如果不是就不做任何处理
然后为这个 Executor 创建本地工作文件夹,当任务执行完成后,会自动清理这个文件夹:
然后 Worker 内部创建了一个 ExecutorRunner ,把启动 Executor 这件事交给它来处理
点进去 start() 方法可以看到启动了一个线程来启动 Executor:
主要逻辑在 fetchAndRunExecutor() 方法中,这个方法中,创建了一个 ProcessBuilder,封装了一个 shell 命令,这个 shell 命令大致是:
java CoarseGrainedExecutorBackend
启动之后,给 worker 发送了一个 ExecutorStateChanged 消息
并且一直等待 Executor 退出,如果 Executor 退出后,会继续给 Worker 发送 ExecutorStateChanged 消息
CoarseGrainedExecutorBackend 指粗粒度的 Executor 的后台进程,在服务器上的进程名字就是这个,而不是 Executor。
它是一个后台服务进程,负责和 Driver、Worker 通信、启停 Executor、提交 Task。
而 Executor 做的事情就比较单纯,执行、停止 Task,返回 Task 执行结果,其他的和通信有关系的事情就交给了 Backend 来做。
也是职责分工明确的体现。
从 main 方法开始,定义了一个函数,返回值为 CoarseGrainedExecutorBackend,目的是创建一个 CoarseGrainedExecutorBackend 对象出来。
run 方法中,向 Driver 发送了一个消息,来获取 spark 的配置
然后用这个配置为 Executor 创建了SparkEnv,并且启动了 CoarseGrainedExecutorBackend
下面看下 CoarseGrainedExecutorBackend 的 onStart() 方法,给 Driver 发送了 消息,如果 Driver 的响应为 Success,则给自己发送一个 消息:
然后看 Driver 是如何处理这个消息的,来到 CoarseGrainedSchedulerBackend 这个类,搜索 case RegisterExecutor。
封装了一下 Executor 的信息,把这个信息放到自己的内存中来,就完成了处理,然后给CoarseGrainedExecutorBackend 回复一个 true 的信息
CoarseGrainedExecutorBackend 收到 true 的消息后,给自己发送了一个 RegisterExecutor 消息,看下处理:new 了一个 Executor 对象,然后给 Driver 发送一个 LaunchedExecutor 消息,表示注册 Executor 成功。
看下 Executor 的构造方法,构造方法就是大括号中所有可执行的代码,比较重要的是两段,一个是 初始化了一个提交任务的线程池:
一个是初始化了心跳发送器,每隔一段时间给 Driver 发送心跳消息。
来到 ExecutorRunner 类中,Executor 启动了之后,给 Worker 发送了 消息
看下 Worker 的处理,Worker 把这个消息发送给 Master
Master 的处理:
把这个消息也同时发送给 Driver :
然后如果有 Executor 注册上来,同时执行一下 schedule() 方法,把等待启动的 Driver 和 App 都启动和调度起来。看下 Driver 的处理(在 StandaloneAppClient 类中):
这样的话,Executor 启动流程就完了。然后 Driver 中有一个定时任务,定时把待提交的任务提交上来执行。