This article walks through the mechanism using the Spark 1.6.0 source code.
To coordinate its components and handle internal tasks, Spark uses several different mechanisms for inter-component communication. Three parts are involved:
1. DAGSchedulerEventProcessLoop, used by the DAG scheduler.
2. SparkListener, which feeds the Spark UI.
3. The netty-based RPC flow. This article only covers the flow; a detailed walkthrough will follow in a later post.
1. How a single component handles its own messages
DAGSchedulerEventProcessLoop extends EventLoop and is a textbook producer-consumer model.
A) Producer
Messages are published by calling
DAGSchedulerEventProcessLoop.post(event: E)
B) Consumer
EventLoop maintains an internal thread that consumes messages in a loop: it takes an event with eventQueue.take() and dispatches it via onReceive(event). DAGSchedulerEventProcessLoop implements doOnReceive, which pattern-matches on the event and delegates it to the corresponding handler:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
C) Message buffering
The messages ultimately live in EventLoop's new LinkedBlockingDeque[E]().
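To make the pattern concrete, here is a minimal, self-contained sketch of an EventLoop-style producer-consumer loop. The names (MiniEventLoop, DemoJobSubmitted) are simplified stand-ins for illustration, not Spark's actual classes:

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// A stripped-down EventLoop: one queue, one daemon consumer thread.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Blocks until a producer posts an event.
          onReceive(eventQueue.take())
        }
      } catch {
        case _: InterruptedException => // interrupted by stop(); exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }

  // Producers publish by calling post, mirroring DAGSchedulerEventProcessLoop.post.
  def post(event: E): Unit = eventQueue.put(event)

  // Subclasses pattern-match here, like doOnReceive above.
  protected def onReceive(event: E): Unit
}

object MiniEventLoopDemo extends App {
  sealed trait DemoEvent
  case class DemoJobSubmitted(jobId: Int) extends DemoEvent

  val loop = new MiniEventLoop[DemoEvent]("demo-event-loop") {
    override protected def onReceive(event: DemoEvent): Unit = event match {
      case DemoJobSubmitted(id) => println(s"handling submitted job $id")
    }
  }
  loop.start()
  loop.post(DemoJobSubmitted(1))
  Thread.sleep(100) // give the consumer thread a moment to drain the queue
  loop.stop()
}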
2. SparkListeners and the ListenerBus
Every monitoring metric on the Spark UI flows through this path: the ListenerBus, acting as the producer, pushes events into a message buffer (default capacity 10,000); the events are then dispatched to each registered Listener for processing, and the Spark web UI pages read their data from those Listeners for display.
A) Producer
LiveListenerBus/StreamingListenerBus call the post method of their parent class AsynchronousListenerBus, which enqueues the event into new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY), with a capacity of 10,000:
val eventAdded = eventQueue.offer(event)
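Note that offer is non-blocking: when the queue is full it returns false immediately instead of waiting, which is exactly how the bus detects that an event must be dropped (see the onDropEvent hook further down). A tiny stand-alone demonstration of that semantics:

import java.util.concurrent.LinkedBlockingQueue

object OfferSemanticsDemo extends App {
  // A deliberately tiny bounded queue (the real bus uses capacity 10000).
  val eventQueue = new LinkedBlockingQueue[String](2)

  println(eventQueue.offer("event-1")) // true: enqueued
  println(eventQueue.offer("event-2")) // true: queue is now full
  println(eventQueue.offer("event-3")) // false: rejected immediately, no blocking

  // AsynchronousListenerBus checks this boolean; a false return means the
  // event is dropped and the onDropEvent hook is invoked.
}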
B) Consumer
AsynchronousListenerBus maintains an internal consumer thread that processes messages in a while(true) loop:
val event = eventQueue.poll
postToAll(event)
C) How messages are handled
The postToAll method of ListenerBus iterates over all registered Listeners:
final def postToAll(event: E): Unit = {
  val iter = listeners.iterator
  while (iter.hasNext) {
    val listener = iter.next()
    try {
      onPostEvent(listener, event)
    } catch {
      case NonFatal(e) =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    }
  }
}
The event is ultimately handled in the onPostEvent method. The source contains two important implementations of onPostEvent: the ones inside SparkListenerBus and StreamingListenerBus.
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
  override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case logStart: SparkListenerLogStart => // ignore event log metadata
    }
  }
}
private[spark] class StreamingListenerBus
  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
  with Logging {

  private val logDroppedEvent = new AtomicBoolean(false)

  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    event match {
      case receiverStarted: StreamingListenerReceiverStarted =>
        listener.onReceiverStarted(receiverStarted)
      case receiverError: StreamingListenerReceiverError =>
        listener.onReceiverError(receiverError)
      case receiverStopped: StreamingListenerReceiverStopped =>
        listener.onReceiverStopped(receiverStopped)
      case batchSubmitted: StreamingListenerBatchSubmitted =>
        listener.onBatchSubmitted(batchSubmitted)
      case batchStarted: StreamingListenerBatchStarted =>
        listener.onBatchStarted(batchStarted)
      case batchCompleted: StreamingListenerBatchCompleted =>
        listener.onBatchCompleted(batchCompleted)
      case outputOperationStarted: StreamingListenerOutputOperationStarted =>
        listener.onOutputOperationStarted(outputOperationStarted)
      case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
        listener.onOutputOperationCompleted(outputOperationCompleted)
      case _ =>
    }
  }

  override def onDropEvent(event: StreamingListenerEvent): Unit = {
    if (logDroppedEvent.compareAndSet(false, true)) {
      // Only log the following message once to avoid duplicated annoying logs.
      logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
        "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
        "rate at which events are being started by the scheduler.")
    }
  }
}
D) Message buffering
Messages are buffered inside AsynchronousListenerBus:
val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
where EVENT_QUEUE_CAPACITY = 10000.
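This bus is also the extension point for user code: you can register your own SparkListener and receive the same events the UI consumes. A minimal sketch against the Spark 1.6 API follows; the listener class and its log messages are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// A made-up listener that just prints job boundaries.
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
}

object ListenerDemo extends App {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[2]").setAppName("listener-demo"))
  // Registers with LiveListenerBus; events then flow
  // post -> eventQueue -> postToAll -> onPostEvent -> our callbacks.
  sc.addSparkListener(new JobLoggingListener)
  sc.parallelize(1 to 100).count() // triggers one job
  sc.stop()
}

Listeners can also be registered without code changes via the spark.extraListeners configuration property.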
3. RPC between Spark processes
Older versions of Spark implemented internal RPC with Akka. As of Spark 1.6, Akka is still kept around, but the default implementation is netty (selected by the spark.rpc configuration). This article only covers the flow; a detailed walkthrough will follow later.
In fact, before RPC adopted netty, RPC went through Akka while file transfer already went through netty; now effectively everything runs on netty.
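Even though the details are deferred to a later post, the shape of the netty RPC API is worth a glance. Note that RpcEnv, RpcEndpoint and RpcCallContext are all private[spark] in 1.6, so the sketch below only compiles inside Spark's own source tree; it illustrates the flow rather than something user code can call directly:

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

case class Echo(msg: String)

// An endpoint registers with an RpcEnv and handles messages much like the
// event loops above: one-way messages land in receive, ask-style messages
// land in receiveAndReply.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case Echo(msg) => println(s"got $msg (no reply expected)")
  }
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(msg) => context.reply(s"echo: $msg")
  }
}

// Registration and a blocking ask look roughly like:
//   val endpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
//   val answer = endpointRef.askWithRetry[String](Echo("hello"))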
4. Summary
This article has walked through Spark's internal event-notification mechanisms; hopefully it gives you a clear picture of how events flow inside Spark.
These three models are among the most common patterns in day-to-day programming, and they may offer some useful ideas for structuring your own code.