首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >spark源码系列之内部通讯的三种机制

spark源码系列之内部通讯的三种机制

作者头像
Spark学习技巧
发布于 2018-01-30 10:28:22
发布于 2018-01-30 10:28:22
1.2K0
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

本文是以spark1.6.0的源码为例讲解。

Spark为协调各个组件完成任务及内部任务处理采用了多种方式进行了各个组件之间的通讯。总共三个部分牵涉的功能是:

1,DAG相关的DAGSchedulerEventProcessLoop。

2,sparkUI相关的SparkListener

3,RPC相关netty RPC流程。本文只讲流程,后面会详细介绍。

一,单个部件自己消息处理方式

DAGSchedulerEventProcessLoop该类继承自EventLoop。是一个典型的生产消费模型。

A),生产者

通过调用

DAGSchedulerEventProcessLoop.post(event: E)

来将消息进行发布。

B),消费者

Eventloop内部维护了一个线程,循环的消费消息eventQueue.take(),调用onReceive(event)进行处理。DAGSchedulerEventProcessLoop内部实现了doOnReceive,对事件进行模式匹配然后交给具体的消息处理函数。

代码语言:js
AI代码解释
复制
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
 case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

 case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

 case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

 case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

 case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

 case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

 case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

 case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)

 case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

 case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

 case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)

 case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

 case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}

C),消息缓存

消息最终是存储于EventLoop的new LinkedBlockingDeque[E]()里。

二,SparkListeners和ListenerBus

SparkUI的各个监控指标都是,由ListenerBus最为生产者将消息,推送到消息缓存出默认支持1万,然后推送给各个Listener进行处理,然后我们的Spark的webUIPage去获取各个Listener的数据,进行展示。

A),生产者

LiveListenerBus/StreamingListenerBus调用其父类AsynchronousListenerBus的post方法将消息加入 new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY),容量1万。

val eventAdded = eventQueue.offer(event)

B),消费者

AsynchronousListenerBus内部维护了一个消费者线程,线程内部有while(true)进行消息处理。

val event = eventQueue.poll

postToAll(event)

C),消息的具体处理

ListenerBus的postToAll方法,会遍历所有注册了的Listener。

代码语言:js
AI代码解释
复制
final def postToAll(event: E): Unit = {

 val iter = listeners.iterator
 while (iter.hasNext) {
 val listener = iter.next()
 try {
 onPostEvent(listener, event)
 } catch {
 case NonFatal(e) =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    }
  }
}

最终在onPostEvent方法中将消息进行了处理。onPostEvent在源码中的两个重要现:

SparkListenerBus和StreamingListenerBus内部的onPostEvent。

代码语言:js
AI代码解释
复制
private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {

 override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
    event match {
 case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
 case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
 case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
 case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
 case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
 case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
 case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
 case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
 case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
 case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
 case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
 case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
 case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
 case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
 case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
 case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
 case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
 case logStart: SparkListenerLogStart => // ignore event log metadata
 }
  }

}
代码语言:js
AI代码解释
复制
private[spark] class StreamingListenerBus
 extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
 with Logging {

 private val logDroppedEvent = new AtomicBoolean(false)

 override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    event match {
 case receiverStarted: StreamingListenerReceiverStarted =>
        listener.onReceiverStarted(receiverStarted)
 case receiverError: StreamingListenerReceiverError =>
        listener.onReceiverError(receiverError)
 case receiverStopped: StreamingListenerReceiverStopped =>
        listener.onReceiverStopped(receiverStopped)
 case batchSubmitted: StreamingListenerBatchSubmitted =>
        listener.onBatchSubmitted(batchSubmitted)
 case batchStarted: StreamingListenerBatchStarted =>
        listener.onBatchStarted(batchStarted)
 case batchCompleted: StreamingListenerBatchCompleted =>
        listener.onBatchCompleted(batchCompleted)
 case outputOperationStarted: StreamingListenerOutputOperationStarted =>
        listener.onOutputOperationStarted(outputOperationStarted)
 case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
        listener.onOutputOperationCompleted(outputOperationCompleted)
 case _ =>
    }
  }

 override def onDropEvent(event: StreamingListenerEvent): Unit = {
 if (logDroppedEvent.compareAndSet(false, true)) {
 // Only log the following message once to avoid duplicated annoying logs.
 logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
 "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
 "rate at which events are being started by the scheduler.")
    }
  }
}

D),消息的缓存

消息是缓存在AsynchronousListenerBus

val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

EVENT_QUEUE_CAPACITY=10000

三,Spark多进程之间的通讯RPC

Spark的内部rpc老版本是用akka实现的,spark1.6以后虽然保留akka,但是默认实现已经是netty。

其实,rpc采用netty之前,rpc是通过akka,而文件传输是通过netty。现在相当于全部采用了netty的实现的。

四,总结

本篇文章主要是将内部spark的内部事件通知的机制。希望通过这篇文章,大家对spark内部事件通知流程有所了解。

这三种模型是我们现在编程最常见的三种模型,希望能对大家编写自己的代码提供一些有益的思路。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2017-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark2.4.0源码分析之WorldCount 事件循环处理器(三)
Spark2.4.0源码分析之WorldCount 事件循环处理器(三) 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 https://github.com/opensourceteams/spark-scala-maven-2.4.0/blob/master/md/image/example/spark-sql-dataset/worldCount/DAGSchedulerEventProcessLo
thinktothings
2019/01/17
4710
Spark2.4.0源码分析之WorldCount 事件循环处理器(三)
Spark Core源码精读计划5 | 事件总线及ListenerBus
在讲解SparkContext组件初始化时,第一个初始化的内部组件就是LiveListenerBus,后面的组件很多都会依赖它,这从侧面说明事件总线是非常重要的支撑组件。在对SparkContext有了大致的了解之后,我们选择事件总线作为探索Spark底层的起点。
大数据真好玩
2019/08/08
1K0
Spark2.4.0源码分析之WorldCount Stage划分(DAGScheduler)(四)
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
thinktothings
2019/01/17
7160
Spark2.4.0源码分析之WorldCount Stage划分(DAGScheduler)(四)
深入理解Spark 2.1 Core (四):运算结果处理和容错的原理与源码分析
在上一篇博文《深入理解Spark 2.1 Core (三):任务调度器的实现与源码分析 》TaskScheduler在发送任务给executor前的工作就全部完成了。这篇博文,我们来看看当executor计算完任务后,Spark是如何处理获取的计算结果与容错的。
小爷毛毛_卓寿杰
2019/02/13
1K0
spark源码分析————DAGScheduler实现
DAGScheduler创建、Job提交、Stage划分、任务生成
俺也想起舞
2019/07/24
5660
《深入理解Spark-核心思想与源码分析》读书笔记(1)
本文介绍了Spark编程模型的基本概念、原理和主要组件,包括SparkContext、SparkSession、SparkDAGScheduler、SparkTaskScheduler、SparkShuffleManager、SparkMemoryManager、SparkEnvironment、SparkIdeaPlugin、SparkScheduler、SparkApplication、SparkContext等。同时,本文还介绍了Spark在大数据处理、实时数据处理、机器学习、图计算等方面的应用。此外,本文还介绍了Spark的生态系统,包括SparkR、Spark Streaming、Spark Structured Streaming、Spark GraphX、Spark MLlib、Spark UI等。
用户1148523
2018/01/09
1.1K0
《深入理解Spark-核心思想与源码分析》读书笔记(1)
Spark Stage切分 源码剖析——DAGScheduler
Spark中的任务管理是很重要的内容,可以说想要理解Spark的计算流程,就必须对它的任务的切分有一定的了解。不然你就看不懂Spark UI,看不懂Spark UI就无法去做优化...因此本篇就从源码的角度说说其中的一部分,Stage的切分——DAG图的创建 先说说概念 在Spark中有几个维度的概念: 应用Application,你的代码就是一个应用 Job,Job是以action为边界的。 Stage,是按照宽窄依赖来界定的 Task,最终落实到各个工作节点上的任务,是真正意义上的任务 光说上面的
用户1154259
2018/01/17
1.4K0
Spark Stage切分 源码剖析——DAGScheduler
[spark] DAGScheduler划分stage源码解析
Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG,并根据ShuffleDependency来进行stage的划分,stage包含多个tasks,个数由该stage的finalRDD决定,stage里面的task完全相同,DAGScheduler 完成stage的划分后基于每个Stage生成TaskSet,并提交给TaskScheduler,TaskScheduler负责具体的task的调度,在Worker节点上启动task。
UFO
2018/09/04
8980
[spark] DAGScheduler划分stage源码解析
【源码解读】| LiveListenerBus源码解读
异步事件列队主要由LinkedBlockingQueue[SparkListenerEvent] 构建,默认大小为10000
857技术社区
2022/07/07
1.6K0
【源码解读】| LiveListenerBus源码解读
[Spark源码剖析] JobWaiter
来创建容纳job结果的数据,数组的每个元素对应与之下标相等的partition的计算结果;并将结果处理函数(index, res) => results(index) = res作为参数传入runJob,以使在runJob内部的创建的JobWaiter对象能够在得知taskSucceeded之后,将该task的结果填充到results中
codingforfun
2018/08/24
4240
Spark Core源码精读计划6 | AsyncEventQueue与LiveListenerBus
在上一篇文章中,我们了解了Spark事件总线机制的概况,以及ListenerBus、SparkListenerBus的细节。
大数据真好玩
2019/08/08
1.2K0
深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
上一篇《深入理解Spark 2.0 (一):RDD实现及源码分析 》的5.2 Spark任务调度器我们省略过去了,这篇我们就来讲讲Spark的调度器。
小爷毛毛_卓寿杰
2019/02/13
1K0
深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
Spark DAG调度
SparkContext在初始化时,创建了DAG调度与Task调度来负责RDD Action操作的调度执行。 DAGScheduler DAGScheduler负责Spark的最高级别的任务调度,调度的粒度是Stage,它为每个Job的所有Stage计算一个有向无环图,控制它们的并发,并找到一个最佳路径来执行它们。具体的执行过程是将Stage下的Task集提交给TaskScheduler对象,由它来提交到集群上去申请资源并最终完成执行。 DAGScheduler的定义位于scheduler/DAGSched
天策
2018/06/22
8800
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五)
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五) 更多资源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 时序图 https://github.com/opensourceteams/spark-scala-maven-2.4.0/blob/master/md/image/example/spark-sql-dataset/worldCount/worldCount
thinktothings
2019/01/17
1.1K0
Spark2.4.0源码分析之WorldCount Stage提交顺序(DAGScheduler)(五)
Spark源码分析 之 Driver和Excutor是怎么跑起来的?(2.2.0版本)
今天抽空回顾了一下Spark相关的源码,本来想要了解一下Block的管理机制,但是看着看着就回到了SparkContext的创建与使用。正好之前没有正式的整理过这部分的内容,这次就顺带着回顾一下。 更多内容参考:我的大数据之路 Spark作为目前最流行的大数据计算框架,已经发展了几个年头了。版本也从我刚接触的1.6升级到了2.2.1。由于目前工作使用的是2.2.0,所以这次的分析也就从2.2.0版本入手了。 涉及的内容主要有: Standalone模式中的Master与Worker client、dr
用户1154259
2018/01/17
7260
Spark源码分析 之 Driver和Excutor是怎么跑起来的?(2.2.0版本)
Spark中的Scheduler「建议收藏」
scheduler分成两个类型。一个是TaskScheduler与事实上现,一个是DAGScheduler。
全栈程序员站长
2022/07/07
7450
Spark源码系列(三)作业运行过程
作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法。 def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*)
岑玉海
2018/02/28
1.2K0
Spark源码系列(三)作业运行过程
Spark ListenerBus 和 MetricsSystem 体系分析
监控是一个大系统完成后最重要的一部分。Spark整个系统运行情况是由ListenerBus以及MetricsSystem 来完成的。这篇文章重点分析他们之间的工作机制以及如何通过这两个系统完成更多的指标收集。
用户2936994
2018/08/27
7510
Spark Core源码精读计划15 | 心跳接收器HeartbeatReceiver
按照SparkContext初始化的顺序,下一个应该是心跳接收器HeartbeatReceiver。由于笔者感染乙流仍然没有痊愈,状态不好,文中若有疏漏,请批评指正。
大数据真好玩
2019/08/19
1.2K0
Spark Core源码精读计划3 | SparkContext辅助属性及后初始化
在文章#2中,我们了解了SparkContext的主体部分,即组件初始化。除了它之外,SparkContext中还有一些与其内部机制紧密相关的属性,下文为了简单,就将它们称为“辅助属性”。另外,在组件初始化完成后,还有一些善后工作,即后初始化(Post-init)。本文就来研究这两块内容。
王知无-import_bigdata
2019/08/02
8130
Spark Core源码精读计划3 | SparkContext辅助属性及后初始化
相关推荐
Spark2.4.0源码分析之WorldCount 事件循环处理器(三)
更多 >
交个朋友
加入HAI高性能应用服务器交流群
探索HAI应用新境界 共享实践心得
加入腾讯云技术交流站
洞悉AI新动向 Get大咖技术交流群
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档