前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >尝尝鲜|Spark 3.1自适应执行计划

尝尝鲜|Spark 3.1自适应执行计划

作者头像
Spark学习技巧
发布于 2021-03-05 07:00:52
发布于 2021-03-05 07:00:52
90100
代码可运行
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧
运行总次数:0
代码可运行

每个框架产生都是为了解决一类问题,每个模块的优化也是为了解决一定的场景下的性能瓶颈。浪尖今天分享的关于Spark 3.1之后的自适应执行计划,主要针对以下几个场景,并且有百度率先研发的,不过社区之前一直没有采纳,spark 3.0的预发布版本参数也是不全,到了Spark 3.1的beta版已经可用,浪尖已经完成了测试。

1.解决场景

Spark Adative好几年前就开始提了,现在网络上流行的spark自适应执行计划也坑了不少人,因为spark官方并没有将自适应执行计划合并到主分支。目前来看,自适应正式引入应该是在spark 3.0的预发布版本,但是这个spark 3.0的两个预发布版本浪尖亲测自适应执行计划问题比较多,而且参数不全。可用的版本是2020.5月的master分支,编译之后的3.1.0-SNAPSHOT版本。

Spark 原有的DAG策略是静态生成的,一旦代码编译好,DAG策略就不会变了。Spark Adative自适应查询计划是动态的根据exchange划分查询stage,并且根据前面stage物化的统计数据优化后续查询stage的执行策略,进而提示性能。

算是增加了代价计算和查询stage根据计算的cost动态生成的策略。

应用场景,可以从百度提交的spark 自适应执行引擎,给出的测试案例总结以下三点:

1. sortMergeJoin转化为BroadcastHashJoin

该策略在BI场景下比较实用,因为一条查询中出现的join比较多,而且往往会有各种子查询和filter操作。将SortMergeJoin转化为BroadcastHashJoin,据测试说可以提升50%-200%的性能。

下图就是一个将SortMergeJoin转化为BroadCastJoin优化作用场景,经过一轮sortMergejoin之后,再进行join时,一侧的数据只有46.9KB,所以这种场景下使用自适应查询计划比较划算,将小表转化为broadcast,然后在executor进行本地的hashjoin<Spark join种类(>3种)及join选择依据>。

2. Spark长任务或者将Spark以服务的方式运行

长任务定义是任务执行超过一个小时。以spark作为服务,企业中的应用场景也比较多,比如使用spark-shell或者spark-sql客户端,使用thriftserver,或者使用zeppelin,livy或者百度的BigSql服务。

这些场景下,所有的spark job都共享相同的分区数,但是task有大有小,会使得小任务产生很多小文件,假设开启了spark 自适应执行计划及配置一些task信息,比如数据大小,行数,最大或者最小分区数等,可以带来50%-100%的性能提升。

3. GraphFrame任务

最后一种情况是应用程序使用GraphFrame,在这种情况下,假设用户拥有一个二维图,具有10亿条边,在GraphFrame中使用求连通性的算法。启用spark 自适应执行计划后,应用程序的持续时间从58分钟减少到32分钟,将性能提高了近100%。

2.Spark 3.1 版本实现逻辑

自适应查询计划在spark 源码中的类是AdaptiveSparkPlanExec。

AdaptiveSparkPlanExec是自适应执行查询计划的根节点。AdaptiveSparkPlanExec会按照Exchange节点将查询计划分成不同的stages,然后按照依赖的顺序执行这些stages。查询stage会在其完成的时候物化。当一个stage结束,根据其物化输出的统计信息,优化查询语法树的剩余部分。

Stage动态划分的逻辑也很简单。为了划分stage,spark会从下至上遍历查询语法树。当遇到exchange节点 同时如果该exchange节点的所有子查询stage都已经物化,spark会为该exchange节点创建一个新的query stage。一旦创建了新的查询stage,提交执行,异步物化。

当一个查询stage完成物化输出,剩余的query查询就会重新优化和根据最新的所有已经完成的stage的统计信息重新设计执行计划。也即是会重新遍历query语法树,在可能的情况下创建新的stage。

3.Spark 3.1 自适应引擎的配置

Spark 目前的master分支关于自适应执行计划的参数是最全的,3.0.0的预发布版本都不全。下面是浪尖整理的关于Spark 3.1版本的关于自适应执行计划的参数及其解释。

1.开启自适应查询引擎

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.enabled

默认值是false。设置为true的话,就是开启了Spark SQL自适应查询引擎。所谓自适应查询引擎,实际上就是在运行时,通过一些统计指标来动态优化Spark sql的执行计划。

2.强制开启自适应查询引擎

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.forceApply

默认值是false。当query查询中没有子查询和Exchange的时候,不会使用自适应执行计划的。所以,为了使用自适应执行计划,除了开启enabled配置外,还要配置该参数为true。

3.查询引擎开启时日志等级

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.logLevel

默认是debug。可选"TRACE", "DEBUG", "INFO", "WARN", "ERROR"。含义是使用自适应查询计划,计划变化之后自适应引擎打的日志等级。

4.分区大小控制

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.advisoryPartitionSizeInBytes

默认值64MB。这个参数控制着开启自适应执行优化之后每个分区的大小。在两种情况下,有用:

  1. 合并小分区。
  2. 分割倾斜的分区。

5.开启合并shuffle分区

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.coalescePartitions.enabled

默认值是true。根据spark.sql.adaptive.advisoryPartitionSizeInBytes参数设置的分区大小,合并连续的shuffle分区,避免产生过多小task。

6.分区合并后的最小值

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.coalescePartitions.minPartitionNum

默认值是大家熟悉的spark的默认defaultParallelism。不保证合并后的shuffle分区数一定比这个参数设置的值小,是一个推荐值。

7.分区合并的初始值

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.coalescePartitions.initialPartitionNum

默认值是spark.sql.shuffle.partitions 。合并之前shuffle分区数的初始值。

8.是否以批量形式拉取block数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.fetchShuffleBlocksInBatch

默认值是true。当获取连续的shuffle分区的时候,对于同一个map的shuffle block可以批量获取,而不是一个接一个的获取,来提升io提升性能。但是要明白,一个map shuffle输出一般对于一个reduce只有个block块,那么这种情况要发生只会发生在自适应执行计划开启,且开启了自适应合并shuffle分区功能。

9.开启本地shufflereader

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.localShuffleReader.enabled

默认值是true。开启自适应执行计划后,该值设为true,spark会使用本地的shuffle reader去读取shuffle数据,这种情况只会发生在没有shuffle重分区的情况。比如,sort-merge join转化为了broadcast-hash join。

10.数据倾斜自动处理

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.skewJoin.enabled

默认值是true。在自适应执行计划开启后,该值为true,spark会动态的处理 sort-merge join的数据倾斜,处理的方式是分区分割,或者分区复制。

11.分区倾斜比例因子

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.skewJoin.skewedPartitionFactor

默认值是10.假如一个分区数据条数大于了所有分区数据的条数中位数乘以该因子,同时该分区以bytes为单位的大小也大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,则视为分区数据倾斜了。

12.分区倾斜bytes阈值

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

默认值是256MB,该分区以bytes为单位的值大于该值,同时分区数据条数大于了所有分区数据的条数中位数乘以spark.sql.adaptive.skewJoin.skewedPartitionFactor因子,则视为分区数据倾斜了。

13.非空分区因子

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin

默认值是0.2 。假如参与join的非空分区占比小于该比例,那么该表不会被作为broadcast表去使用,不将表的数据量作为比较依据。但是join的另一张表依然可以用来作为广播表。

从配置中可以看出,自适应执行计划针对以下几个场景:

  1. SortMergeJoin转化为BroadcastHashJoin。
  2. 分区合并。适合shuffle之后小分区特多场景
  3. 小分区数据倾斜的解决。

4. Spark 3.1案例

首先将Spark 源码clone下来,master分支版本就是3.1.0-SNAPSHOT,然后将Spark 源码install到你本地maven的.m2目录下,就可以直接在你工程里引入3.1.0-SNAPSHOT版本的Spark 依赖了。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
mvn clean install -DskipTests

整个过程看你的网速和电脑性能了,几十分钟到若干小时吧。

主要引入的依赖有以下几个:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-unsafe_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-tags_2.12</artifactId>
    <version>${spark.version}</version>
        </dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-catalyst_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>

假如你maven install成功之后版本就是3.1.0-SNAPSHOT。也可以直接关注公众号,输入 3.1 获得浪尖编译的jar,直接可以加到测试的工程里。

环境要求比较简单 :java版本要求 1.8,scala版本 2.12.10.

首先准备数据集:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]")
//    sparkConf.set("spark.default.parallelism","4")
sparkConf.set("spark.sql.shuffle.partitions","4")

sparkConf.setAppName(this.getClass.getCanonicalName)
val spark = SparkSession
  .builder()
  .appName(this.getClass.getCanonicalName)
  .config(sparkConf)

  .getOrCreate()

def createTable(spark:SparkSession): Unit ={

  import spark.implicits._
  val testData = spark.sparkContext.parallelize(
    (1 to 100).map(i => TestData(i, i.toString))).toDF()

  testData.createOrReplaceTempView("testData")
  
  val testData2 = spark.sparkContext.parallelize(
    TestData2(1, 1) ::
      TestData2(1, 2) ::
      TestData2(2, 1) ::
      TestData2(2, 2) ::
      TestData2(3, 1) ::
      TestData2(3, 2) :: Nil, 2).toDF()
  testData2.createOrReplaceTempView("testData2")
}

下面就是将SortMergeJoin转化为BroadcastHashJoin的案例。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def changeMerge2Broadcast(spark:SparkSession): Unit ={
  val query = "SELECT * FROM testData join testData2 ON testData.key = testData2.a  where value = '1'"

  spark.sql(query).explain()
  val conf = SQLConf.get
  conf.setConfString(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key,"true")
  conf.setConfString(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key,"true")
  conf.setConfString(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key , "80")
  conf.setConfString(SQLConf.ADAPTIVE_EXECUTION_LOG_LEVEL.key,"ERROR")
  val df2 = spark.sql(query)

  df2.collect()
}

首先是配置没有变更之前的执行计划输出为:

为了查看Spark 将执行计划由SortMergeJoin转化为BroadCastHashJoin的过程,可以将SparkConf配置中的日志等级设置为ERROR,默认debug。然后就可以直接通过log查看转化过程:

5. 总结

其实,别看代码少,本地做这个案例需要的知识点还比较多的。

乍一看,肯定看不出来什么门道的,案例代码少只需要几个配置,但是这属于细思极恐的案例。

问题一:

浪尖在创建SparkSession的时候特意指定了Spark SQL的shuffle分区为4。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
sparkConf.set("spark.sql.shuffle.partitions","4")

请问这是为什么?

问题二:

采用下面的df创建表,跑一下案例,看看会有什么效果。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
importspark.implicits._
(1 to 100).map(i => TestData(i, i.toString)).toDF()
Seq(TestData2(1, 1) ,
  TestData2(1, 2),
  TestData2(2, 1) ,
  TestData2(2, 2) ,
  TestData2(3, 1),
  TestData2(3, 2)).toDF()

答案自己测测,或者多读读配置就好了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
本文介绍了如何在Windows系统下配置Hadoop环境,并实现通过Java操作HBase进行数据表创建、删除、更新和查询。主要包括了配置环境变量、配置Hadoop、创建HBase表、使用HBase Java API进行操作等步骤。在开发过程中需要注意一些细节,例如环境变量配置、HBase和Hadoop的版本适配问题等。通过本文的介绍,可以帮助开发人员快速搭建起一套可运行的Hadoop和HBase集成环境,为进一步的Hadoop和HBase应用开发提供支持。
别先生
2018/01/02
1.4K0
mybatis框架org.apache.ibatis.exceptions.PersistenceException:
在学习mybatis的第一个练习,求大佬帮助解决 org.apache.ibatis.exceptions.PersistenceException: ### Error building SqlSession. ### The error may exist in SQL Mapper Configuration ### Cause: org.apache.ibatis.builder.BuilderException: Error parsing SQL Mapper Configuration. C
沁溪源
2020/09/03
1.4K0
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /
别先生
2018/01/02
1.5K0
connection closed gracefully_kalilinux中文乱码
最终通过修改服务器上文件的权限得以顺利执行,用此方法存文件需要文件对这个用户开放执行(x)的权限我直接设置的(rwx)
全栈程序员站长
2022/09/27
4220
java.net.UnknownHostException: master
别先生
2018/01/02
1.8K0
freemarker错误九
不能将日期类型直接转换为字符串。由于不知道是日期类型中的哪一种在使用。是date、time还是datetime。而且不知道转换成字符串日期究竟显示成什么格式,是yyyy-MM-dd。还是HH:mm:ss,还是yyyy-MM-dd HH:mm:ss
全栈程序员站长
2022/07/05
3120
org.springframework.orm.hibernate3.HibernateQueryException: unexpected token: where near line 1, col
org.springframework.orm.hibernate3.HibernateQueryException: unexpected token: where near line 1, column 72
qubianzhong
2018/09/19
1.1K0
Java中List迭代过程中删除、新增元素的处理
java.util.ConcurrentModificationException
用户7886150
2020/12/13
1.2K0
freemarker自己定义标签报错(三)
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/118342.html原文链接:https://javaforall.cn
全栈程序员站长
2022/07/13
4420
Hibernate之一级缓存和二级缓存
本文介绍了如何使用Hibernate进行二级缓存,包括一级缓存和二级缓存,以及缓存的配置、使用策略和缓存策略等。
别先生
2018/01/02
9640
Caused by: java.lang.ClassNotFoundException: org.apache.catalina.LifecycleException
笔者在项目中采用dubbo作为分布式服务框架,在eclipse时直接启动部署dubbo服务的war包程序运行正常,但是执行junit单元测试时却出现如下错误提示:
johnhuster的分享
2022/03/28
6910
Spring之注解实现aop(面向切面编程)
1:Aop(aspect object programming)面向切面编程,名词解释:     1.1:功能:让关注点代码与业务逻辑代码分离     1.2:关注点         重复代码就叫做关
别先生
2018/01/02
1K0
Spring之注解实现aop(面向切面编程)
Spring 容器 17 个常用注解总结
传统的Spring做法是使用.xml文件来对bean进行注入或者是配置aop、事物,这么做有两个缺点:
江南一点雨
2019/08/13
7560
大型项目架构搭建_高内聚、低耦合(1)
软件开发中的项目的架构的终极目标就六个字: 高内聚、低耦合; 架构搭建好了,后期可维护性才能提高,在这基础上,如果还想提高项目的高效性,就只能从算法着手 如果项目中算法都已经做到极致了,还想提高项目的高效性,怎么办?(算法并不一定能处理高并发的问题) 就要用到中间件的技术了 下面我就从这三方面来搭建一个项目 假设我们有个这么需求 我公式和YC公式合作,开发一个项目,这个项目功能是用来测量东西的一个项目,我们公式在存储数据方面做得好,YC公司所以要和我们合作,要我们做一个对数据进行保存的容器,他们公司直
汤高
2018/01/11
1.2K0
MyBatis-06 MyBatis XML方式之多个接口参数用法
当参数是一个基本类型的时候,它在XML文件中对应的SQL语句只会使用一个参数,比如delete方法
小小工匠
2021/08/17
5280
java.lang.IllegalArgumentException: node to traverse cannot be null!
java.lang.IllegalArgumentException: node to traverse cannot be null!
时间静止不是简史
2020/07/27
8890
java.lang.IllegalArgumentException: node to traverse cannot be null!
Exception in thread "main" java.lang.NullPointerException
1、在window操作系统上,使用eclipse开发工具从hdfs分布式文件系统上下载文件报空指针异常解决方法: log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.
别先生
2018/04/17
1.5K0
Exception in thread "main" java.lang.NullPointerException
java.lang.NoSuchMethodError: org.apache.log4j.Logger.isTraceEnabled()Z
原因是我的 log4j 是最低版本的 1.2.4,解决办法就是升级到 1.2.12+ 的版本。
wsuo
2020/07/31
2.8K0
java.lang.NoSuchMethodError: org.apache.log4j.Logger.isTraceEnabled()Z
java9系列(五)Stack-Walking API
java9新增这个类的目的是提供一个标准API用于访问当前线程栈,之前只有Throwable::getStackTrace、Thread::getStackTrace以及SecurityManager::getClassContext提供了方法可以获取线程栈。
code4it
2018/09/17
7510
SpringBoot源码解析之main方法推断
在《SpringBoot源码解析之应用类型识别》一文中我们已经了解了SpringBoot是如何推断出应用类型的。在推断出应用类型之后,SpringBoot又进行了main方法的推断。
程序新视界
2019/05/29
1.5K0
推荐阅读
相关推荐
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档