Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Spark系列——作业原理详解

Spark系列——作业原理详解

作者头像
solve
发布于 2019-10-30 04:49:16
发布于 2019-10-30 04:49:16
4710
举报
文章被收录于专栏:大数据技术栈大数据技术栈

前言

本篇文章主要是从作业提交到最后获取到作业结果,从源码的角度,但是不涉及源码进行的分析.其目的是读完本篇文章,你将对作业的基本流程有个清晰的认识。

1.任务提交过程

  • 首先,我们知道,一个action算子是触发一个job生成的地方,当遇见action算子,会执行sparkcontext的runjob方法,最后会交给dagSchedule的submitjob,这里会创建一个jobwaiter对象,并发送一个JobSubmitted消息进行作业任务的执行,同时 waiter.awaitResult()会等待作业执行结果的返回:成功或者失败。到这里,我们对于作业应该有个基本的认识了,那么接下来我们再来深入一点,这个作业submit之后发生了什么呢?

2.划 分 调 度 阶 段

  • spark是资源调度是粗粒度的,我们这里不讨论资源申请,当我们提交一个任务之后(此时资源应该都是在集群中申请好了),Spark首先会对我们的作业任务划分调度阶段,而这个调度阶段的划分是由 DAGScheduler 负责的,其调度是基于stage的,那么下面我们看看stage是怎么划分的。
  • 一个application中的rdd集合相互依赖形成了一个依赖树,DAGScheduler 通过其 getParentStages 方法会从最后一个finalrdd开始,判断依赖树中是否有shuffle,如果没有,就生成一个stage,如果有,调用getAncestorShuffleDepend,使用广度优先遍历整个依赖树,当遇到shuffle dependencies的时候,就会通过newOrUsedShuffleStag生成一个个stage,并划分为两个调度阶段,这样一个job也就被划分成了一个或者多个stage了。
  • 到这里我们的作业已经被划分成了一个个stage了,接下来就看看stage是怎么被提交的吧。。。

3.提 交 调 度 阶 段

  • 前面我们提到了JobSubmitted消息,那么这个消息实际上会触发 DAGScheduler 的 handleJobSubmitted 方法,首先该方法会在生成 finalStage 的同时建立起所有调度阶段的依赖关系(至于怎么建立的,我们后面慢慢深入),然后通过 fmalStage 生成一个作业实例ActiveJob,然后在submitStage(finalStage)开始提交作业。
  • 在作业提交调度阶段开始时,在 submitStage 方法中调用 getMissingParentStages 方法获取finalStage 父调度阶段,如果不存在父调度阶段,则使用 submitMissingTasks(stage) 方法提交执行;如果存在父调度阶段,则把该调度阶段存放到 waitingStages 列表中,同时递归调用 submitStage,直到找到没有父stage的stage调用 submitMissingTasks(stage)作业一次调度的入口,这样一次调度任务就发送到Excutor开始执行了。
  • 当Excutor的task执行完成时发通知消息CompleteEvent,会调用到DAGschedule的handleTaskCompletion更新状态,并且判断该task所属的stage是否所有任务都已经完成,如果完成,则扫描等待运行调度阶段列表,检查它们的父调度阶段是否存在未完成,如果不存在则表明该调度阶段准备就绪,生成实例并提交运行。(至于其中失败重试的机制不做讨论)
  • 到此,stage提交的基本情况我们已经了解,但是对于一个了解spark的人来说,我们熟悉的task还没有出现,接下来,我们就来看看stage的task的执行流程吧。

4.提 交 任 务

  • 前面我们说到提交 stage 的方法 submitStage ,该方法内部会调用到 DAGScheduler 的 submitMissingTasks 方法对每个stage 的 task 进行提交,其task生成规则如下:首先根据每个 stage 最后一个rdd的 Partition 个数拆分对应个数的 task ,这些 task 组成一个任务集 taskset 提交到 TaskScheduler 进行处理。对于 ResultStage (作业中最后的stage)生 成 ResultTask , 对 于 ShuffleMapStage 生成 ShuffleMapTask 。
  • 当 TaskScheduler 收到发送过来的任务集时,在 submitTasks 方法中(在 TaskSchedulerlmpl类中进行实现)构建一个 TaskSetManager 的实例,用于管理这个任务集的生命周期,并通过schedulableBuilder的addTaskSetManager放入系统的调度池中。然后调用 SchedulerBackend的 reviveOffers ,向 DriverEndPoint 终端点发送ReviveOffers消息,调用SchedulerBackend的makeOffers 方法,首先会获取集群中可用的 Executor ,并通过TaskSchedulerlmpl的resourceOffers 按照就近原则对进行资源的分配,并划分 PROCESS _ LOCAL、 NODE LOCAL、 NO PREF 、 RACK_LOCAL和 ANY 五个等级。然后进行launchtask 操作,把分配好资源的 task 一个个发送到 Worker 节点上的 CoarseGrainedExecutorBackend ,然后通过其内部的Executor 来执行任务。
  • 至此,我们的task算是正式提交到excutor准备执行了。

5.执 行 任 务

  • 当 CoarseGrainedExecutorBackend(excutor的守护进程) 接收到 LaunchTask 消息时,会调用 Executor 的 launchTask方法进行处理。在 Executor 的 launchTask 方法中,初始化一个 TaskRunner 来封装任务,它用于管理任务运行时的细节,再把 TaskRumier 对象放入到 ThreadPool (线程池)中去执行。在 TaskRunner 的 run 方法里,首先会对发送过来的 Task 本身以及它所依赖的 Jar 等文件的反序列,然后对反序列化的任务调用 Task 的 runTask 方法。由于 Task 本身是一个抽象类,具体的 runTask 方法是由它的两个子类 ShuffleMapTask 和 RedultTask 来实现的。
  • 对 于 ShuffleMapTask 而言,它的计算结果会写到 BlockManager 之中,最终返回给DAGScheduler 的是一个 MapStatus 对象。该对象中管理了 ShuffleMapTask 的运算结果存储到BlockManager 里的相关存储信息,而不是计算结果本身,这些存储信息将会成为下一阶段的任务需要获得的输入数据时的依据。
  • 对于 ResultTask 的 runTask 方法而言,它最终返回的是 func 函数的计算结果,这里猜测应该是rdd action算子的结果了。
  • 至此,task计算结束,下面我们看看计算的结果是怎么处理的。

6.获 取 执 行 结 果

  • 首先对于 Executor 的计算结果,会根据结果的大小有不同的策略。 (1) 生成结果大小大于1GB结果直接丢弃,该配置项可以通过 spark . driver.maxResultSize进行设置。 (2) 生成结果大小在[128 MB -200 KB,1 GB] : 如果生成的结果大于等于(128 MB -200 KB )时,会把该结果以taskld 为编号存入到 BlockManager 中,然后把该编号通过 Netty 发送给 Driver终端点,该阈值是 Netty 框架传输的最大值 spark . akka . frameSize (默认为128 MB )和 Netty 的预留空间 reservedSizeBytes (200 KB ) 差值。 (3) 生成结果大小在(0 , 128 MB -200 KB):通过 Netty 直接发送到 Driver 终端点。
  • 任务执行完毕后, ExecutorBackend 会将任务的执行结果发送给 DriverEndPoint 终端点。该终端点会转给 TaskSchedulerlmpl 的 statusUpdate 方法进行处理,并在该方法中获取结果 result ,对于不同的任务状态有不同的处理。 (1) 如果类型是 TaskState . FINISHED ,那么调用 TaskResultGetter 的 enqueueSuccessfulTask方法进行处理。 enqueueSuccessfulTask 方法的逻辑比较简单,如果是 IndirectTaskResult ,那么需要通过 sparkEnv . blockManager . getRemoteBytes ( blockld )来获取结果: ; 如果是DirectTaskResult ,那么结果就无需远程获取了。 ( 2 ) 如果类型是 TaskState.FAILED 或者 TaskState.KILLED 或者 TaskState.LOST ,调用TaskResultGetter 的 enqueueFailedTask 进行处理。对 于 TaskState.LOST,还需要将其所在的Executor 标记为 failed ,并且根据更新后的 Executor 重新调度。
  • 然后将获取的结果通过TaskSchedulerlmpl 的 handleSuccessfulTask进行处理,最后发送一个completionevnet消息最终调用DAGScheduler 的 handleTaskCompletion 方法。 (1) 对于shufflemaptask任务的结果 其实质是一个MapStatus,将其 注册到MapOutputTrackerMaster 中,下游的 stage 需要数据由其MapOutputTrackerWorker向MapOutputTrackerMaster 查找,到此也就完成ShuffleMapTask 的处理。 (2) 如果任务是 ResultTask , 判断该作业是否完成,如果完成,则标记该作业已经完成,清除作业依赖的资源并发送消息给系统监听总线告知作业执行完毕。

总结

当我们提交一个job,首先会被 DAGScheduler 通过宽窄依赖解析成一个个 stage,然后按顺序以 taskset 的形式提交 stage 给 TaskScheduler ,TaskScheduler 将 taskset 构建成 TaskSetManager 对象管理,并按照调度系统给定的策略向 Executor 提交任务,Executor 将接受的到 task 以 taskrunner 的方式执行计算出结果,并储存到 BlockManager ,然后向 TaskScheduler 返回一个记录了结果信息的MapStatus对象,并注册到 driver 端的 MapOutputTrackerMaster,然后进行下一轮的 stage 调度 (如果是ResultTask执行结果,那么数据是我们算子决定了他最后会落地在哪的)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.04.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark内部原理之运行原理
在大数据领域,只有深挖数据科学领域,走在学术前沿,才能在底层算法和模型方面走在前面,从而占据领先地位。
smartsi
2019/08/08
1.3K0
Spark底层原理详细解析(深度好文,建议收藏)
Apache Spark是用于大规模数据处理的统一分析引擎,基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量硬件之上,形成集群。
五分钟学大数据
2021/01/29
1.3K0
Spark底层原理详细解析(深度好文,建议收藏)
Spark源码系列(四)图解作业生命周期
这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! 我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是Spar
岑玉海
2018/02/28
9810
Spark源码系列(四)图解作业生命周期
Spark源码系列(六)Shuffle的过程解析
Spark大会上,所有的演讲嘉宾都认为shuffle是最影响性能的地方,但是又无可奈何。之前去百度面试hadoop的时候,也被问到了这个问题,直接回答了不知道。 这篇文章主要是沿着下面几个问题来开展: 1、shuffle过程的划分? 2、shuffle的中间结果如何存储? 3、shuffle的数据如何拉取过来? Shuffle过程的划分 Spark的操作模型是基于RDD的,当调用RDD的reduceByKey、groupByKey等类似的操作的时候,就需要有shuffle了。再拿出reduceByKey这个
岑玉海
2018/02/28
1.7K0
Spark源码系列(六)Shuffle的过程解析
Spark详解05架构Architecture架构
架构 前三章从 job 的角度介绍了用户写的 program 如何一步步地被分解和执行。这一章主要从架构的角度来讨论 master,worker,driver 和 executor 之间怎么协调来完成
Albert陈凯
2018/04/08
1K0
Spark详解05架构Architecture架构
Spark源码深度解析图解
  Spark的宽依赖和窄依赖是DAGScheduler将job划分为多个Stage的重要因素,每一个宽依赖都会划分一个Stage。
挽风
2021/04/13
1.2K0
Spark源码深度解析图解
Spark详解03Job 物理执行图Job 物理执行图
Job 物理执行图 在 Overview 里我们初步介绍了 DAG 型的物理执行图,里面包含 stages 和 tasks。这一章主要解决的问题是: 给定 job 的逻辑执行图,如何生成物理执行图(也
Albert陈凯
2018/04/08
1.1K0
Spark详解03Job 物理执行图Job 物理执行图
Spark内核详解 (6) | Spark Shuffle 解析
在所有的 MapReduce 框架中, Shuffle 是连接 map 任务和 reduce 任务的桥梁. map 任务的中间输出要作为 reduce 任务的输入, 就必须经过 Shuffle, 所以 Shuffle 的性能的优劣直接决定了整个计算引擎的性能和吞吐量.
不温卜火
2020/10/28
8020
Spark内核详解 (6) | Spark Shuffle 解析
[spark] Task执行流程
在文章TaskScheduler 任务提交与调度源码解析 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[TaskDescription]],即对某个task需要在某个executor上执行的描述,仅仅是逻辑上的,还并未真正到executor上执行,本文将从源码角度解析Task是怎么被分配到executor上执行的。
UFO
2018/09/04
1.1K0
Spark内核详解 (5) | Spark的任务调度机制
在上一篇博文中我们讲解了 Spark YARN-Cluster 模式下的任务提交流程,但是我们并没有具体说明 Driver 的工作流程, Driver 线程主要是初始化 SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。
不温卜火
2020/10/28
3.9K0
Spark内核详解 (5) | Spark的任务调度机制
spark源码分析————DAGScheduler实现
DAGScheduler创建、Job提交、Stage划分、任务生成
俺也想起舞
2019/07/24
5970
深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
上一篇《深入理解Spark 2.0 (一):RDD实现及源码分析 》的5.2 Spark任务调度器我们省略过去了,这篇我们就来讲讲Spark的调度器。
小爷毛毛_卓寿杰
2019/02/13
1.1K0
深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
Spark sc.textFile(...).map(...).count() 执行完整流程
本文介绍下Spark 到底是如何运行sc.TextFile(...).map(....).count() 这种代码的,从driver端到executor端。 引子 今天正好有人在群里问到相关的问题,不过他的原始问题是: 我在RDD里面看到很多 new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)),但是我找不到context是从哪里来的 另外还有pid,iter都是哪来的呢? 如果你照着源码点进去你会
用户2936994
2018/08/27
7580
Spark中的Scheduler「建议收藏」
scheduler分成两个类型。一个是TaskScheduler与事实上现,一个是DAGScheduler。
全栈程序员站长
2022/07/07
7930
深入理解Spark 2.1 Core (四):运算结果处理和容错的原理与源码分析
在上一篇博文《深入理解Spark 2.1 Core (三):任务调度器的实现与源码分析 》TaskScheduler在发送任务给executor前的工作就全部完成了。这篇博文,我们来看看当executor计算完任务后,Spark是如何处理获取的计算结果与容错的。
小爷毛毛_卓寿杰
2019/02/13
1.1K0
深入理解Spark 2.1 Core (七):任务执行的原理与源码分析
上篇博文《深入理解Spark 2.1 Core (六):资源调度的实现与源码分析》中我们讲解了,AppClient和Executor是如何启动,如何为逻辑上与物理上的资源调度,以及分析了在Spark1.4之前逻辑上资源调度算法的bug。
小爷毛毛_卓寿杰
2019/02/13
6820
[spark] DAGScheduler 提交stage源码解析
DAGScheduler在划分完Stage后([spark] DAGScheduler划分stage源码解析 ),将会通过submitStage(finalStage)来提交stage:
UFO
2018/09/04
7310
Spark源码系列(三)作业运行过程
作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法。 def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*)
岑玉海
2018/02/28
1.3K0
Spark源码系列(三)作业运行过程
Spark 的作业执行原理
** 作业(Job)提交后由行动操作触发作业执行,根据RDD的依赖关系构建DAG图,由DAGSheduler(面向阶段的任务调度器)解析
张凝可
2019/08/22
6590
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五)
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五) 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 https://github.com/opensourceteams/spark-scala-maven-2.4.0/blob/master/md/image/example/spark-sql-dataset/worldCount/worldCount
thinktothings
2019/01/17
1.1K0
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五)
推荐阅读
相关推荐
Spark内部原理之运行原理
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档