1. 问题背景
2. 原理分析
2.1 Executor生命周期
2.2 ExecutorAllocationManager上下游调用关系
3. 总结与反思
4. Community Feedback
用户提交Spark应用到Yarn上时,可以通过spark-submit的num-executors参数显示地指定executor个数,随后,ApplicationMaster会为这些executor申请资源,每个executor作为一个Container在Yarn上运行。Spark调度器会把Task按照合适的策略分配到executor上执行。所有任务执行完后,executor被杀死,应用结束。在job运行的过程中,无论executor是否领取到任务,都会一直占有着资源不释放。很显然,这在任务量小且显示指定大量executor的情况下会很容易造成资源浪费。
在探究Spark如何实现之前,首先思考下如果自己来解决这个问题,需要考虑哪些因素?大致的方案很容易想到:如果executor在一段时间内一直处于空闲状态,那么就可以kill该executor,释放其占用的资源。当然,一些细节及边界条件需要考虑到:
首先,先简单分析下Spark静态资源分配中Executor的生命周期,以spark-shell中的wordcount为例,执行命令如下:
# 以yarn模式执行,并指定executor个数为1
$ spark-shell --master=yarn --num-executors=1
# 提交Job1 wordcount
scala> sc.textFile("file:///etc/hosts").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();
# 提交Job2 wordcount
scala> sc.textFile("file:///etc/profile").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).count();
# Ctrl+C Kill JVM
上述的Spark应用中,以yarn模式启动spark-shell,并顺序执行两次wordcount,最后Ctrl+C退出spark-shell。此例中Executor的生命周期如下图:
static-allocation
从上图可以看出,Executor在整个应用执行过程中,其状态一直处于Busy(执行Task)或Idle(空等)。处于Idle状态的Executor造成资源浪费这个问题已经在上面提到。下面重点看下开启Spark动态资源分配功能后,Executor如何运作。
spark_dynamic_allocation_executor_lifecycle
下面分析下上图中各个步骤:
上述流程中需要重点关注的几个问题:
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutorForFullParallelism)
.toInt
}
要实现资源的动态调整,那么限定调整范围是最先考虑的事情,Spark通过下面几个参数实现:
三者的关系必须满足:minExecutors <= initialExecutors <= maxExecutors
注意:如果显示指定了num-executors参数,那么initialExecutors就是num-executor指定的值。
如果Executor中缓存了数据,那么该Executor的Idle-timeout时间就不是由executorIdleTimeout决定,而是用spark.dynamicAllocation.cachedExecutorIdleTimeout控制,默认值:Integer.MAX_VALUE。如果手动设置了该值,当这些缓存数据的Executor被kill后,我们可以通过NodeManannger的External Shuffle Server来访问这些数据。这就要求NodeManager中spark.shuffle.service.enabled必须开启。
Spark动态分配的主要逻辑由ExecutorAllocationManager类实现,首先分析下与其交互的上下游关系,如下图所示:
spark_dynamic_allocation
主要的逻辑很简单:ExecutorAllocationManager中启动一个周期性任务,监控当前Executor是否超时,如果超时就将其移除。当然Executor状态的收集主要依赖于Spark提供的SparkListener机制。周期性任务逻辑如下:
private[spark] class ExecutorAllocationManager {
// Executor that handles the scheduling task.
private val executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
def start(): Unit = {
。。。
val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
schedule()
} catch {...}
}
}
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
。。。
}
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
// 同步当前所需要的Executor数
updateAndSyncNumExecutorsTarget(now)
val executorIdsToBeRemoved = ArrayBuffer[String]()
// removeTimes是<executorId, expireTime>的映射。
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
executorIdsToBeRemoved += executorId
}
!expired
}
// 移除所有超时的Executor
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
}
}
以上就是对于Spark的动态资源分配的原理分析,相关源码可以参考Apache Spark:ExecutorAllocationManager。完整的配置参数见:Spark Configuration: Dynamic Allocation。