前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >SparkSQL并行执行多个Job的探索

SparkSQL并行执行多个Job的探索

作者头像
Spark学习技巧
发布于 2022-03-14 12:18:09
发布于 2022-03-14 12:18:09
1.9K00
代码可运行
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧
运行总次数:0
代码可运行

现象

先来看个现象,下图中一个sql任务居然有多个job并行跑,为什么呢?

不错看到这里是不是有很多疑问,下面我就带着这些疑问,从以下几方面一一解答。

  1. 看看Spark的调度框架是否支持并行提交多个job(引用了些其他博主的内容)
  2. 讲解SparkSQLThriftServer入口,为后面SQL并行提交Job做铺垫
  3. 讲解在非自适应与自适应情况下SQL的并行提交Job的机制

1 并行提交多个job

1.1 是否支持并行提交多个任务

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
df.write.partitionBy("type", "interval").mode("append").parquet("s3://data")
  • 通过partitionBy功能让Spark自动做将数据写入不同的分区路径。
  • 对于一个Spark Job,我们总是期望能充分利用所有的cpu-vcore来并行执行,因此通常会将数据repartitioncpu-vcore的个数,即每个cpu-vcore上跑一个Task。而对于写文件的Job,每个Task会写入到自己的一个文件中,最终生成的文件数是由Task个数决定。在下图中,假设集群总共有12个cpu-vcore分配给Executor使用,那么就会有12个Task并行执行写入,最终生成12个文件。
  • 从充分利用资源的角度来看,这样的设计无疑是最佳的。但是,对于一些实时流处理任务或者周期性的离线任务而言,这样做会产生大量的小文件,会给后续的文件加载和快速查询带来困难。因此,从尽可能产生少量文件的角度出发,需要采用下图所示的写入方式,即在写入前,将数据分配到少量的Partition中,用少量的Task来执行。但是,这样做就会导致有部分cpu-vcore在写入过程中处于闲置状态,造成了资源浪费。
  • 显然,在这件事情上,“充分利用资源”和“产生少量文件”两个方向发生了冲突。那么,有没有一个两全之策呢?即既保证产生少量文件,又能把原本闲置的资源利用起来。如下图所示,假设我们能同时跑多个写入文件的Job,每个Job利用一部分cpu-vcore来执行,似乎就可以达到这个目的了。带着这样的思路,做一番调研与实践。
  • 上述思路可以总结为:通过一个SparkContex并行提交多个Job,由Spark自己来调度资源,实现并行执行。针对这个思路,首先要搞清楚Spark是否支持这么玩,如果支持的话又是怎么支持的。
  • 简单梳理下Spark的任务调度机制:
  1. SparkContextDAGScheduler提交一个Job后,会创建一个JobWaiter对象,用于阻塞当前线程,等待Job的执行结果。因此,在一个线程中,Job是顺序执行的。
  2. DAGScheduler会根据RDD的依赖关系将一个Job划分为若干个Stage(以Shuffle为界)。因为前后Stage存在数据上的依赖,所以只有父Stage执行完毕才能提交当前Stage。
  3. DAGScheduler在提交Stage时,会根据Partition信息生成相应的Task,打包成TaskSet,提交给TaskScheduler。而TaskScheduler收到后,会将TaskSet封装成TaskSetManager,丢到任务队列中等待执行。
  4. SchedulerBackend负责Executor状态与资源的管理,当发现有空闲资源时,就会通过TaskScheduler从任务队列中取出相应的TaskSetManager去调度执行。
  5. TaskSetManager中的Task最终会分发到Executor中的线程里去执行。

Spark是以TaskSetManager为单元来调度任务的。通常情况下,任务队列中只会有一个TaskSetManager,而通过多线程提交多个Job时,则会有多个TaskSetManager被丢到任务队列中。在有空闲资源的情况下,谁会从队列里被取出来执行就取决于相应的调度策略了。目前,Spark支持FIFO和FAIR两种调度策略。

基本可以明确以下两点:

  • Spark支持通过多线程在一个SparkContext上提交多个Job,每个线程里面的Job是顺序执行的,但是不同线程的Job是可以并行执行的,取决当时Executor中是否有充足的cpu-vcore。
  • 任务队列中的TaskSetManager是有序执行,还是轮询执行(可分配权重)取决于采用哪种调度策略。

可以用多线程方式并行提交Job,示例如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
var df = spark.read.json("person.json").repartition(55)
// df.cache()
// val c = df.count()
// println(s"${c}")

val jobExecutor = Executors.newFixedThreadPool(5)
for( _ <- Range(0, 5) ) {
  jobExecutor.execute(new Runnable {
    override def run(): Unit = {
      val id = UUID.randomUUID().toString()
      df.coalesce(11).write.json(s"hdfs://ns1/user/root/data/test/${id}")
    }
  })
}

1.2 Spark Thrift Server简单讲解

  • Thrift 是一种接口描述语言和二进制通信协议,由 Facebook 开发并贡献到 Apache 开源社区,用来定义和创建跨语言的服务 。Thrift包含的代码生成引擎可以应用于多种语言中,包括C ++、 JavaPython 等 。其数据传输采用二进制格式,相对常用的 XMLJSON 格式体积更小,在多语言、高并发和大数据场景下更具优势 。
  • Thrift 框架支持使用IDL (Interface Definition Language)定义服务接口,然后利用提供的编译器将服务接口编译成不同语言的实现代码,从而实现服务端和客户端跨语言的支持。SparkThriftServer 中定义的 Thrift的协议在 if 目录下的TCLIService. thrift文件中 。客户端与服务端工作的原理如下图所示,协议层( Protocol)、传输层(Transport)乃至底层 IO传输的具体实现都不需要用户关心 。

Spark 中启动ThriftServer 的主要流程 :

整个服务的生命周期从执行。sbin 文件夹下的start-thriftserver.sh脚本开始直到执行stop-thriftserver脚本结束。最终调用sparkSubmit接口提交org.apache.spark.sql.hive.thriftserver.HiveThriftServer2应用。

  • ThriftCLIService作为服务端负责维护与客户端的连接并将客户端的请求转发至 SparkSQLCLIService, SparkSQLCLIService通过调用后端Hive或Spark系统完成运算并把执行结果返回给ThriftCLIService, 最终ThriftCLIService把结果返回给客户端 。
  • ThriftCLIServiceThriftHttpCLIServiceThriftBinaryCLIService两种形式,分别对应 Http输模式和 Binary 传输模式,通过配置参数( hive.server2. transport.mode)进行判断,默认为 Binary模式。
  • SparkSQLSessionManager 对象,用于 Session 的管理 。而 SparkSessionManagerSessionManager 的子类,构造参数也比SessionManager多了一个 SQLContext,其内部包含一个SparkSQLOperationManager对象,用于 Operation 的管理。
  • SparkSQLCLIServiceSparkSQLSessionManagerSparkSQOperationManager三者 之间的关系类似基本的CLIServiceSessionManagerOperationManager 之间的关系 。
  • SparkOperationManager创建的是SparkExecuteStatementOperation , 查询发送给 SparkSQL 完成 。
  • SparkExecuteStatementOperation 是 Spark SQL 执行 SQL 语句的最终实现,其内部声明了4 个比较重要 的对象 :执行 SQL 语句 生成的 result (DataFrame 类型)、结果集的迭代器 iter(Iterator [SparkRow] 类型)、结果集头部迭代器iterHeader (Iterator[SparkRow]类型)和数据类型dataTypes ( Array[DataType] 类型 ) 。作为 Operation 的子类,外部调用的接口是 runInternal 方法,但其核心逻辑在execute方法中实现 。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def execute() : Unit = {
****************************************
result = sqlContext.sql (statement) // 构造逻辑计划阶段和物理计划阶段, 最终得到 的是 DataFrame 数据类型
****************************************
iter = {
val useincrementalCollect = sqlContext.getConf(
" spark.sql.thriftServer.incrementalCollect""false").toBoolean
if (useincrementalCollect) {
result.toLocaliterator.asScala
} else {
****************************************
result.collect().iterator       // 启动runJob
****************************************
val (itra, itrb) = iter. duplicate
IterHead = itra
iter = itrb
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
…………………………………………………………………………………………………………………………………………
}

1.3 SparkSQL中如何并行Job

举个例子TPC-DS中标准SQL第一个sql为例子来说明并行Job:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
with customer_total_return as
(select sr_customer_sk as ctr_customer_sk
,sr_store_sk as ctr_store_sk
,sum(SR_FEE) as ctr_total_return
from store_returns
,date_dim
where sr_returned_date_sk = d_date_sk
and d_year =2001
group by sr_customer_sk
,sr_store_sk)
 select  c_customer_id
from customer_total_return ctr1
,store
,customer
where ctr1.ctr_total_return > (select avg(ctr_total_return)*1.2
from customer_total_return ctr2
where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
and s_store_sk = ctr1.ctr_store_sk
and s_state = 'MI'
and ctr1.ctr_customer_sk = c_customer_sk
order by c_customer_id
limit 100;

在开启与关闭自适应情况下来比对对比生成的并行Job数:

上图中看到明显开启spark.sql.adaptor.enabled=true情况下生成的并行Job更多,下面我们分析一下两种情况的执行计划。

关闭自适应情况下执行计划如下,根节点为TakeOrderAndProject,如下图所示(由于DAG图比较庞大,只截取了一部分):

开启自适应情况下,根节点为AdaptiveSparkPlan,他的子节点才为TakeOrderAndProject,如下图所示(DAG部分截图)。

1.3.1 主Job如何生成

有上一章节中已经指定SQL的提交过程,并且SparkExecuteStatementOperation#execute主方法中执行了sqlContext.sql()进行了构造逻辑计划阶段和物理计划阶段, 最终得到 的是 DataFrame 数据类型。调用result.collect()真正启动了一个job。流程如下图所示:

从上图中可以看到主Job是由HiveThriftServer2驱动的DataSet.collect来触发的,上面例子用跟节点为TakeOrderAndProjectExec来走的流程,实际后期调用的还是RDD#takeOrdered来触发的。

1.3.2 子Job如何生成

SparkPlan是一颗庞大的树,上一章节中提到DataSet#collectFormPlan调用到SparkPlan#executeCollect此方法可以是其他类型的跟节点,目前继承的有下图这些,当开启自适应则调用的是AdaptiveSparkPlanExec#executeCollect方法:

其中自适应查询包adaptive的QueryStageExec有两个继承类BroadcastQueryStageExec与ShuffleQueryStageExec。

子Job并行启动的所有流程,如下图所示:

  • 当不开启自适应时,入口是通过TakeOrderAndProject#child#execute来构造任务想BroadcastExchangeExec中线程池提交child#executeCollectIterator任务来触发collect操作从而启动了子Job。
  • 当开启自适应时,入口是AdaptiveSparkPlan#executeCollect,中间也会走不开启自适应的路启动一批广播的子Job,在调用AdaptiveSparkPlan#getFinalPhysicalPlan时,会调用子类doMaterialize方法在子类中会启动BroadcastStageTimeout线程,重要的是submetMapStage线程来向DAGScheduler提交MapStageSubmitted任务来触发另一批子Job启动。

以上就是对SparkSQL并行执行多个Job的所有探索,与一个Job转成DAG从而划分层多个Stage不是同层次的原理,希望能帮助到大家!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-03-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【Spark】Spark之what
Spark:通用大数据快速处理引擎。可以基于Hadoop上存储的大数据(HDFS、Hive、HBase等任何实现了Hadoop接口的存储系统)进行计算。
章鱼carl
2022/03/31
9050
【Spark】Spark之what
Spark Scheduler 内部原理剖析
涂小刚
2017/04/26
3.9K2
Spark Scheduler 内部原理剖析
SparkSQL基本使用
往Hadoop集群上上传测试数据,hdfs dfs -cat /person/employee.txt employee.txt 1,zhangxx,20,manager 2,wangxin,25,employee 3,wangergou,78,xixi 4,wawo,35,gogo 5,liwei,28,programmer 6,hanmeimei,29,UI 1.读取数据,将每一行的数据使用列分隔符分割 val lineRDD = sc.textFile("hdfs://hdp-sk-01:900
sparkle123
2018/04/26
1.1K0
Spark系列 - (4) Spark任务调度
Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作。Driver在Spark作业时主要负责:
码老思
2023/10/19
7111
Spark系列 - (4) Spark任务调度
SparkSql中多个Stage的并发执行
写一篇水水的技术文,总结一下sparksql中不同stage的并行执行相关,也是来自于一位群友的提问:
数据仓库践行者
2022/11/25
1.6K0
SparkSql中多个Stage的并发执行
加米谷学院:Spark核心技术原理透视一(Spark运行原理)
在大数据领域,只有深挖数据科学领域,走在学术前沿,才能在底层算法和模型方面走在前面,从而占据领先地位。
加米谷大数据
2018/03/20
2K4
加米谷学院:Spark核心技术原理透视一(Spark运行原理)
Spark底层原理详细解析(深度好文,建议收藏)
Apache Spark是用于大规模数据处理的统一分析引擎,基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量硬件之上,形成集群。
五分钟学大数据
2021/01/29
9880
Spark底层原理详细解析(深度好文,建议收藏)
Spark内核详解 (5) | Spark的任务调度机制
在上一篇博文中我们讲解了 Spark YARN-Cluster 模式下的任务提交流程,但是我们并没有具体说明 Driver 的工作流程, Driver 线程主要是初始化 SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。
不温卜火
2020/10/28
3.6K0
Spark内核详解 (5) | Spark的任务调度机制
Spark源码系列(四)图解作业生命周期
这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! 我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是Spar
岑玉海
2018/02/28
8950
Spark源码系列(四)图解作业生命周期
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。
Maynor
2022/02/17
8580
Spark Day06:Spark Core之Spark 内核调度和SparkSQL快速入门
Spark on Yarn年度知识整理
Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce的算子map 函数和reduce函数及计算模型,还提供更为丰富的算子,如filter、join、groupByKey等。是一个用来实现快速而同用的集群计算的平台。
用户3003813
2018/09/06
1.3K0
Spark on Yarn年度知识整理
Spark内部原理之运行原理
在大数据领域,只有深挖数据科学领域,走在学术前沿,才能在底层算法和模型方面走在前面,从而占据领先地位。
smartsi
2019/08/08
1.1K0
Spark任务调度 | Spark,从入门到精通
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)
美图数据技术团队
2018/09/18
1.4K0
Spark任务调度 | Spark,从入门到精通
spark应用程序的运行架构
(1)job:包含多个task组成的并行计算,往往由action催生。 (2)stage:job的调度单位。 (3)task:被送到某个executor上的工作单元。 (4)taskSet:一组关联的,相互之间没有shuffle依赖关系的任务组成的任务集。
用户1148526
2019/05/25
9680
3.2 Spark调度机制
3.2 Spark调度机制 Spark调度机制是保证Spark应用高效执行的关键。本节从Application、job、stage和task的维度,从上层到底层来一步一步揭示Spark的调度策略。 3.2.1 Application的调度 Spark中,每个Application对应一个SparkContext。SparkContext之间的调度关系取决于Spark的运行模式。对Standalone模式而言,Spark Master节点先计算集群内的计算资源能否满足等待队列中的应用对内存和CPU资源的需求,
Albert陈凯
2018/04/04
1.1K0
Spark知识体系完整解读
Spark简介 Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce的算子map 函数和reduce函数及计算模型,还提供更为丰富的算子,如filter、join、groupByKey等。是一个用来实现快速而同用的集群计算的平台。 Spark将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。其底层采用Scala这种函数式语言书写而成,并且所提供的API深度借鉴Scala函数式的编程思想,提供与Scala类似的编程接口 Sparkon Yarn
IT阅读排行榜
2018/08/14
1K0
Spark知识体系完整解读
3.1 Spark应用执行机制分析
3.1 Spark应用执行机制分析 下面对Spark Application的基本概念和执行机制进行深入介绍。 3.1.1 Spark应用的基本概念 Spark应用(Application)是用户提交的应用程序。Spark运行模式分为:Local、Standalone、YARN、Mesos等。根据Spark Application的Driver Program是否在集群中运行,Spark应用的运行方式又可以分为Cluster模式和Client模式。 下面介绍Spark应用涉及的一些基本概念: 1)Spark
Albert陈凯
2018/04/04
6790
[spark] Task成功执行的结果处理
在文章Task执行流程 中介绍了task是怎么被分配到executor上执行的,本文讲解task成功执行时将结果返回给driver的处理流程。
UFO
2018/08/29
1.5K0
TaskScheduler_taskset -p
DAGScheduler面向我们整个Job划分出了Stage,划分了Stage是从后往前划分的,执行的时候是从前往后,每个Stage内部有一系列任务,Stage里面的任务是并行计算的,这些并行计算的任务的逻辑是完全相同的,只不过是处理的数据不同而已。DAGScheduler会以TaskSet的方式以一个DAG构造的Stage中所有的任务提交给底层调度器TaskScheduler,TaskScheduler是一个接口(做接口的好处就是跟具体的任务调度解耦合,这样Spark就可以运行在不同的资源调度模式上Standalone,yarn,mesos等)这符合面向对象中依赖抽象而不依赖具体的原则,带来了底层资源调度器的可插拔性,导致Spark可以运行在众多的资源调度器模式上。
全栈程序员站长
2022/11/10
3620
TaskScheduler_taskset -p
Spark Core 整体介绍
–num-executors: 执行器个数,执行器数可以为节点个数,也可以为总核数(单节点核数*节点数),也可以是介于俩者之间(用于调优) –executor-cores: 执行器核数, 核数可以1,也可以为单节点的内核书,也可以是介于俩者之间(用于调优) –executor-memory: 执行器内存, 可以为最小内存数(单节点内存总数/单节点核数),也可以为最大内存数(单节点内存总数),也可以是介于俩者之间(用于调优)
Freedom123
2024/03/29
5420
Spark Core 整体介绍
相关推荐
【Spark】Spark之what
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档