Spark应用程序提交时,根据RDD依赖关系形成有向无环图(DAG),然后交给DAGScheduler进行划分作业和调度阶段。这些作业之间可以没有依赖关系,对于多个作业之间的调度,Spark提供了两种调度策略:FIFO模式和FAIR模式;
所谓的调度策略就是对待调度的对象进行排序,按照优先级来进行调度。调度的排序接口如下所示,就是对两个可调度的对象进行比较;
private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
}
FIFO模式
在默认情况下,Spark的调度器以FIFO(先进先出)方式调度Job的执行。其实现类为FIFOSchedulingAlgorithm;
/**
* FIFO排序的实现,主要因素是优先级、其次是对应的Stage
* 优先级高的在前面,优先级相同,则靠前的stage优先
*/
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
//一般来说优先级越小优先级越高
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
//如果优先级相同,那么Stage靠前的优先
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
if (res < 0) {
true
} else {
false
}
}
}
在此调用策略中,由于根调度池rootPool直接包含了多个作业的任务管理器,在比较时,首先需要比较作业的优先级(根据作业编号判断,作业编号越小优先级越高),如果是同一个作业,会再比较调度阶段优先级(根据调度阶段编号判断,调度阶段编号越小优先级越高)。
每个Job被切分为多个Stage。第一个Job优先获取所有可用的资源,接下来第二个Job再获取剩余资源。以此类推,如果第一个Job并没有占用所有的资源,则第二个Job还可以继续获取剩余资源,这样多个Job可以并行运行。如果第一个Job很大,占用所有资源,则第二个Job就需要等待第一个任务执行完,释放空余资源,再申请和分配Job。如果是相同的Job不同的Stage,则优先执行较早的Stage。
FAIR模式:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
//最小共享,可以理解为执行需要的最小资源即CPU核数,其他相同时,所需最小核数小的优先调度
val minShare1 = s1.minShare
val minShare2 = s2.minShare
//运行的任务的数量
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
//是否有处于挨饿状态的任务,看可分配的核数是否少于任务数,如果资源不够用,那么处于挨饿状态
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
//最小资源占用比例,这里可以理解为偏向任务较轻的
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
//权重,任务数相同,权重高的优先
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare: Int = 0
//挨饿的优先
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
//都处于挨饿状态则,需要资源占用比小 的优先
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
//都不挨饿,则比较权重比,比例低的优先
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
//如果都一样,那么比较名字,按照字母顺序比较,不考虑长度,所以名字比较重要
s1.name < s2.name
}
}
}
在FAIR共享模式调度下,先获取两个调度的饥饿程度,饥饿程度为正在运行的任务是否小于最小任务,如果是,则表示调度处于饥饿程度。获取饥饿程度进行如下比较:
1. 如果某个调度处于饥饿状态,另外一个非饥饿状态,则先满足处于饥饿状态的调度;
2. 如果两个调度都处于饥饿状态,则比较资源比,先满足资源比小的调度;
3. 如果两个调度都处于非饥饿状态,则比较权重比,先满足权重比小的调度;
4. 以上情况均相同的情况,根据调度的名称排序;
Spark在多Job之间以轮询(round robin)方式为任务分配资源,所有的任务拥有大致相当的优先级来共享集群的资源。这就意味着当一个长任务正在执行时,短任务仍可以分配到资源,提交并执行,并且获得不错的响应时间。这样就不用像以前一样需要等待长任务执行完才可以。这种调度模式很适合多用户的场景。
具体源码可参考:https://yq.aliyun.com/articles/6041
加入技术讨论群
《大数据和云计算技术》社区群人数已经2500+,欢迎大家加下面助手微信,拉大家进群,自由交流。
喜欢钉钉的同学扫描下面二维码:
喜欢QQ群的,可以扫描下面二维码:
欢迎大家通过二维码打赏支持技术社区(英雄请留名,社区感谢您,打赏次数超过50+):
领取专属 10元无门槛券
私享最新 技术干货