Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >TaskScheduler源码解读

TaskScheduler源码解读

原创
作者头像
幽鸿
发布于 2020-05-04 12:29:18
发布于 2020-05-04 12:29:18
50800
代码可运行
举报
运行总次数:0
代码可运行

在DAGScheduler中提交task方法入口:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

这里调用了taskScheduler接口,我们打开TaskScheduler trait,trait在scala里就是接口,在IDEA中查看实现的类,使用快捷键:ctrl+H,或者直接使用快捷键:ctrl + alt +B查看实现

查看submitTasks方法实现:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
   //给每一个TaskSet创建一个TaskSetManager
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    //将manager进行内存缓存,manager负责对task进行跟踪管理,包括重试
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  //这里的backend就是SparkContext创建好的SparkDeploySchedulerBackend
  //这里backend负责创建AppClient,向Master注册Application
  //参见SparkContext代码createTaskScheduler方法
  backend.reviveOffers()
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
一文搞懂Spark的Task调度器(TaskScheduler)[通俗易懂]
通过之前 DAGScheduler的介绍可以 知道, DAGScheduler 将划分的一系列 Stage (每个Stage封装一个TaskSet) , 按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。 下面来分析TaskScheduler接收到DAGScheduler的Stage任务 后, 是如何管理Stage (TaskSet) 的生命周期的。
全栈程序员站长
2022/11/07
1.2K0
[spark] TaskScheduler 任务提交与调度源码解析
在DAGScheduler划分为Stage并以TaskSet的形式提交给TaskScheduler后,再由TaskScheduler通过TaskSetMagager对taskSet的task进行调度与执行。
UFO
2018/09/04
1K0
深入理解Spark 2.1 Core (三):任务调度器的原理与源码分析
上一篇博文《深入理解Spark 2.1 Core (二):DAG调度器的实现与源码分析 》讲到了DAGScheduler.submitMissingTasks中最终调用了taskScheduler.submitTasks来提交任务。
小爷毛毛_卓寿杰
2019/02/13
9380
深入理解Spark 2.1 Core (三):任务调度器的原理与源码分析
Spark2.4.0源码分析之WorldCount 任务调度器(七)
Spark2.4.0源码分析之WorldCount 任务调度器(七) 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 https://github.com/opensourceteams/spark-scala-maven-2.4.0/blob/master/md/image/example/spark-sql-dataset/worldCount/worldCount.taskScheduler.jpg
thinktothings
2019/01/17
5890
Spark2.4.0源码分析之WorldCount 任务调度器(七)
Spark内核详解 (5) | Spark的任务调度机制
在上一篇博文中我们讲解了 Spark YARN-Cluster 模式下的任务提交流程,但是我们并没有具体说明 Driver 的工作流程, Driver 线程主要是初始化 SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。
不温卜火
2020/10/28
3.6K0
Spark内核详解 (5) | Spark的任务调度机制
[Spark源码剖析]Task的调度与执行源码剖析
一个Spark Application分为stage级别和task级别的调度,stage级别的调度已经用[DAGScheduler划分stage]和[DAGScheduler提交stage]两片文章进行源码层面的说明,本文将从源码层面剖析task是如何被调度和执行的。
codingforfun
2018/08/24
1.2K0
TaskScheduler_taskset -p
DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stage内部有一系列任务,Stage里面的任务是并行计算的,这些并行计算的任务的逻辑是完全相同的,只不过是处理的数据不同而已。DAGScheduler会以TaskSet的方式以一个DAG构造的Stage中所有的任务提交给底层调度器TaskScheduler,TaskScheduler是一个接口(做接口的好处就是跟具体的任务调度解耦合,这样Spark就可以运行在不同的资源调度模式上Standalone,yarn,mesos等)这符合面向对象中依赖抽象而不依赖具体的原则,带来了底层资源调度器的可插拔性,导致Spark可以运行在众多的资源调度器模式上。
全栈程序员站长
2022/11/10
3710
TaskScheduler_taskset -p
[spark] 数据本地化及延迟调度
Spark数据本地化即移动计算而不是移动数据,而现实又是残酷的,不是想要在数据块的地方计算就有足够的资源提供,为了让task能尽可能的以最优本地化级别(Locality Levels)来启动,Spark的延迟调度应运而生,资源不够可在该Locality Levels对应的限制时间内重试,超过限制时间后还无法启动则降低Locality Levels再尝试启动……
UFO
2018/09/04
1.2K0
Spark中的Scheduler「建议收藏」
scheduler分成两个类型。一个是TaskScheduler与事实上现,一个是DAGScheduler。
全栈程序员站长
2022/07/07
7250
深入理解Spark 2.1 Core (四):运算结果处理和容错的原理与源码分析
在上一篇博文《深入理解Spark 2.1 Core (三):任务调度器的实现与源码分析 》TaskScheduler在发送任务给executor前的工作就全部完成了。这篇博文,我们来看看当executor计算完任务后,Spark是如何处理获取的计算结果与容错的。
小爷毛毛_卓寿杰
2019/02/13
1K0
TaskScheduler详解及源码介绍
创建TaskScheduler的源代码为SparkContext.createTaskScheduler,如下所示。该方法会根据master的配置匹配部署模式,每种部署模式中都会创建两个类(TaskSchedulerImpl、SchedulerBackend)的实例,只是TaskSchedulerImpl都相同,SchedulerBackend不同。
全栈程序员站长
2022/11/07
4740
任务调度器有哪些_本地计算机上的task scheduler
TaskScheduler可以看做任务调度的客户端,负责任务的提交,并且请求集群管理器对任务调度。TaskScheduler的类UML图如下,针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合。TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Driver、Executor通信收集Executor上分配给该应用的资源使用情况。常见的任务调度模式有以下四种:
全栈程序员站长
2022/11/10
6110
任务调度器有哪些_本地计算机上的task scheduler
[spark] DAGScheduler 提交stage源码解析
DAGScheduler在划分完Stage后([spark] DAGScheduler划分stage源码解析 ),将会通过submitStage(finalStage)来提交stage:
UFO
2018/09/04
6600
Spark详解03Job 物理执行图Job 物理执行图
Job 物理执行图 在 Overview 里我们初步介绍了 DAG 型的物理执行图,里面包含 stages 和 tasks。这一章主要解决的问题是: 给定 job 的逻辑执行图,如何生成物理执行图(也
Albert陈凯
2018/04/08
1.1K0
Spark详解03Job 物理执行图Job 物理执行图
Spark源码系列(三)作业运行过程
作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法。 def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*)
岑玉海
2018/02/28
1.1K0
Spark源码系列(三)作业运行过程
深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
上一篇《深入理解Spark 2.0 (一):RDD实现及源码分析 》的5.2 Spark任务调度器我们省略过去了,这篇我们就来讲讲Spark的调度器。
小爷毛毛_卓寿杰
2019/02/13
9900
深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
[Spark源码剖析] DAGScheduler提交stage
DAGScheduler通过调用submitStage来提交stage,实现如下:
codingforfun
2018/08/24
2610
spark scheduler_scheduledthreadpool
在Spark中一个核心的是模块就是调度器(Scheduler),在spark中Scheduler有两种TaskScheduler(是低级的调度器接口),DagScheduler(是高级的调度)
全栈程序员站长
2022/11/07
3130
Spark的位置优先: TaskSetManager 的有效 Locality Levels
在Spark Application Web UI的 Stages tag 上,我们可以看到这个的表格,描述的是某个 stage 的 tasks 的一些信息,其中 Locality Level 一栏的值可以有 PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY 几个值。这篇文章将从这几个值入手,从源码角度分析 TaskSetManager 的 Locality Levels
codingforfun
2018/08/24
1.3K0
Spark的位置优先: TaskSetManager 的有效 Locality Levels
深入浅出Spark(三):Spark调度系统之“权力的游戏”
专题介绍:2009 年,Spark 诞生于加州大学伯克利分校的 AMP 实验室(the Algorithms, Machines and People lab),并于 2010 年开源。2013 年,Spark 捐献给阿帕奇软件基金会(Apache Software Foundation),并于 2014 年成为 Apache 顶级项目。如今,十年光景已过,Spark 成为了大大小小企业与研究机构的常用工具之一,依旧深受不少开发人员的喜爱。如果你是初入江湖且希望了解、学习 Spark 的“小虾米”,那么 InfoQ 与 FreeWheel 技术专家吴磊合作的专题系列文章——《深入浅出 Spark:原理详解与开发实践》一定适合你!本文系专题系列第三篇。
深度学习与Python
2020/12/07
3970
相关推荐
一文搞懂Spark的Task调度器(TaskScheduler)[通俗易懂]
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验