Spark version used: 1.3+
1. SparkSubmit
Submit the job with spark-submit (the trailing jar path is a placeholder for the application jar):
bin/spark-submit --class xx.WordCount --master spark://ip:7077 --executor-memory 2g --total-executor-cores 4 /path/to/wordcount.jar
2. WordCount
The application's main method creates the SparkContext with new SparkContext(conf); a minimal example follows.
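For reference, a minimal WordCount driver of the kind submitted above might look like this (app name and input/output paths are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // constructing the SparkContext is what triggers all of the driver-side
    // initialization traced in the following steps
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    val counts = sc.textFile(args(0))     // e.g. an HDFS path passed on the command line
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile(args(1))

    sc.stop()
  }
}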
3. SparkContext createSparkEnv
This method creates an ActorSystem (indirectly, while building the driver-side SparkEnv; see the next step).
4. SparkContext
Create the driver's runtime environment. Note that numDriverCores is the number of cores used for computation in local mode; for non-local modes it is 0.
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
This creates a SparkEnv for the driver.
conf: SparkConf, a copy of the SparkConf
listenerBus: uses the listener pattern to maintain and dispatch the various events (a small listener sketch follows the signature below)
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None) ...
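As a small illustration of that listener pattern (user-facing code rather than SparkEnv internals; the listener class here is a made-up example):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

// hypothetical listener: the LiveListenerBus delivers posted events to every registered SparkListener
class LoggingListener extends SparkListener {
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
    println(s"stage ${stage.stageInfo.stageId} completed")
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

// registered on the bus through the SparkContext:
// sc.addSparkListener(new LoggingListener)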
5. SparkEnv createDriverEnv
createDriverEnv ultimately calls the create method to build the SparkEnv.
create()
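In outline (a simplified sketch, remaining arguments elided), createDriverEnv reads the driver's host and port from the conf and delegates to create with isDriver = true:

// inside SparkEnv.createDriverEnv (sketch only)
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port,
  isDriver = true, isLocal = isLocal, listenerBus = listenerBus, ...)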
6. SparkEnv createActorSystem()
The AkkaUtils utility class is used to create the ActorSystem.
AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
7. AkkaUtils startServiceOnPort()
Tries to start the service (here the ActorSystem) on the given port, retrying on successive ports if the bind fails.
8. AkkaUtils doCreateActorSystem()
Builds the Akka configuration and actually instantiates the ActorSystem.
9. AkkaUtils
Create the ActorSystem:
val actorSystem = ActorSystem(name, akkaConf)
ActorSystem(name, akkaConf) calls the companion object's apply() factory method.
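To make the Akka part concrete, this is roughly what it boils down to (a heavily simplified sketch with a made-up method name; the real doCreateActorSystem reads many more settings, such as timeouts and frame sizes, from the SparkConf, and startServiceOnPort wraps it with the port-retry logic):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// sketch: build a remoting-enabled Akka config for the given host/port and
// instantiate the ActorSystem through the companion object's apply()
def createActorSystemSketch(name: String, host: String, port: Int): ActorSystem = {
  val akkaConf = ConfigFactory.parseString(
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = $port
       |""".stripMargin)
  ActorSystem(name, akkaConf)   // the ActorSystem apply() call from step 9
}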
10. SparkContext createTaskScheduler()
Creates the matching TaskScheduler according to the master URL specified when the job was submitted.
11. SparkContext new()
Standalone mode (a spark:// master URL): create a TaskSchedulerImpl.
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
12. SparkContext new()
Create a SparkDeploySchedulerBackend (the scheduler backend for standalone mode).
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
13. SparkContext initialize
Call initialize to wire the backend into the scheduler (it also builds the scheduling pool).
scheduler.initialize(backend)
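Putting steps 11-13 together, the standalone branch of createTaskScheduler looks roughly like this (simplified; the other master-URL branches are omitted):

case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val masterUrls = sparkUrl.split(",").map("spark://" + _)   // a comma-separated list of masters is supported
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  // the scheduler is then handed back to the SparkContext constructor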
14. SparkContext new()
Create a DAGScheduler, which will later be used to split the DAG into stages.
dagScheduler = new DAGScheduler(this)
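As a quick illustration with the WordCount RDD from step 2: the shuffle introduced by reduceByKey is where the DAGScheduler will cut the job into two stages, and the lineage (with the shuffle boundary) can be inspected via toDebugString:

// assumes `counts` is the reduceByKey result from the WordCount sketch in step 2
println(counts.toDebugString)   // the ShuffledRDD line marks the stage boundary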
15. SparkContext start()
Start the taskScheduler.
taskScheduler.start()
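In outline, start() delegates to the backend (a simplified sketch of TaskSchedulerImpl.start; the speculation-check details are elided):

// sketch of TaskSchedulerImpl.start()
override def start() {
  backend.start()   // SparkDeploySchedulerBackend starts an AppClient and registers the application with the Master
  // when spark.speculation is enabled, a timer that periodically looks for
  // speculatable tasks is also started here
}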