Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >工作经验分享:Spark调优【优化后性能提升1200%】

工作经验分享:Spark调优【优化后性能提升1200%】

作者头像
用户1410343
发布于 2021-03-03 06:57:13
发布于 2021-03-03 06:57:13
2K00
代码可运行
举报
文章被收录于专栏:about云about云
运行总次数:0
代码可运行

问题导读 1.本文遇到了什么问题? 2.遇到问题后,做了哪些分析? 3.本文解决倾斜使用哪些方法? 4.本次数据倾斜那种方法更有效? 5.解决性能优化问题的原理是什么? 优化后效果 1.业务处理中存在复杂的多表关联和计算逻辑(原始数据达百亿数量级) 2.优化后,spark计算性能提升了约12倍(6h-->30min) 3.最终,业务的性能瓶颈存在于ES写入(计算结果,ES索引document数约为21亿 pri.store.size约 300gb)

1. 背景 业务数据不断增大, Spark运行时间越来越长, 从最初的半小时到6个多小时 某日Spark程序运行6.5个小时后, 报“Too large frame...”的异常 org.apache.spark.shuffle.FetchFailedException: Too large frame: 2624680416 2. 原因分析 2.1. 抛出异常的原因 Spark uses custom frame decoder

(TransportFrameDecoder) which does not support frames larger than 2G.

This lead to fails when shuffling using large partitions.

根本原因: 源数据的某一列(或某几列)分布不均匀,当某个shuffle操作是根据此列数据进行shuffle时,就会造成整个数据集发生倾斜,即某些partition包含了大量数据,超出了2G的限制。

异常,就是发生在业务数据处理的最后一步left join操作 2.2. 粗暴的临时解决方法 增大partition数, 让partition中的数据量<2g 由于是left join触发了shuffle操作, 而spark默认join时的分区数为200(即spark.sql.shuffle.partitions=200), 所以增大这个分区数, 即调整该参数为800, 即spark.sql.shuffle.partitions=800 2.3. 解决效果 Spark不再报错,而且“艰难”的跑完了, 跑了近6个小时! 通过Spark UI页面的监控发现, 由于数据倾斜导致, 整个Spark任务的运行时间是被少数的几个Task“拖累的”

3. 思考优化 3.1. 确认数据倾斜 方法一: 通过sample算子对DataSet/DataFrame/RDD进行采样, 找出top n的key值及数量 方法二: 源数据/中间数据落到存储中(如HIVE), 直接查询观察 3.2. 可选方法 1.HIVE ETL 数据预处理 把数据倾斜提前到 HIVE ETL中, 避免Spark发生数据倾斜 这个其实很有用 2.过滤无效的数据 (where / filter) NULL值数据 “脏数据”(非法数据) 业务无关的数据 3.分析join操作, 左右表的特征, 判断是否可以进行小表广播 broadcast (1)这样可避免shuffle操作,特别是当大表特别大 (2)默认情况下, join时候, 如果表的数据量低于spark.sql.autoBroadcastJoinThreshold参数值时(默认值为10 MB), spark会自动进行broadcast, 但也可以通过强制手动指定广播 visitor_df.join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer") 业务数据量是100MB (3)Driver上有一个campaign_df全量的副本, 每个Executor上也会有一个campaign_df的副本 (4)JOIN操作, Spark默认都会进行 merge_sort (也需要避免倾斜) 4.数据打散, 扩容join 分散倾斜的数据, 给key加上随机数前缀 A.join(B)

1.提高shuffle操作并行度 spark.sql.shuffle.partitions 2.多阶段 aggregate操作: 先局部聚合, 再全局聚合 给key打随机值, 如打上1-10, 先分别针对10个组做聚合 最后再统一聚合 join操作: 切成多个部分, 分开join, 最后union 判断出,造成数据倾斜的一些key值 (可通过观察或者sample取样) 如主号 单独拎出来上述key值的记录做join, 剩余记录再做join 独立做优化, 如broadcast 结果数据union即可 3.3. 实际采用的方法 HIVE 预处理 过滤无效的数据 broadcast 打散 --> 随机数 shuffle 并行度

Example:

代码语言:javascript
代码运行次数:0
运行
复制
  1. ......
  2. visitor_leads_fans_df.repartition($"random_index")
  3. .join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer")
  4. .drop("random_bucket", "random_index")
  5. ......
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-02-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 About云 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【大数据】Spark优化经验&案例--数据倾斜
TOC 0. 十秒看完 1.业务处理中存在复杂的多表关联和计算逻辑(原始数据达百亿数量级) 2.优化后,spark计算性能提升了约12倍(6h-->30min) 3.最终,业务的性能瓶颈存在于ES写入(计算结果,ES索引document数约为21亿 pri.store.size约 300gb) [优化完整过程] 1. 背景 业务数据不断增大, Spark运行时间越来越长, 从最初的半小时到6个多小时 某日Spark程序运行6.5个小时后, 报“Too large frame...”的异常 org.apach
onephone
2020/04/27
3.2K0
【大数据】Spark优化经验&案例--数据倾斜
自己工作中超全spark性能优化总结
Spark是大数据分析的利器,在工作中用到spark的地方也比较多,这篇总结是希望能将自己使用spark的一些调优经验分享出来。
guichen1013
2020/11/25
2K0
自己工作中超全spark性能优化总结
Spark性能优化指南——高级篇
原文:https://tech.meituan.com/spark-tuning-pro.html
solve
2019/10/30
8710
Spark性能优化指南——高级篇
Spark性能优化总结
Spark的瓶颈一般来自于集群(standalone, yarn, mesos, k8s)的资源紧张,CPU,网络带宽,内存。通过都会将数据序列化,降低其内存memory和网络带宽shuffle的消耗。
王知无-import_bigdata
2020/04/02
1.5K0
三万字长文 | Spark性能优化实战手册
在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。
王知无-import_bigdata
2020/02/10
1.2K0
「Spark从精通到重新入门(一)」Spark 中不可不知的动态优化
Apache Spark 自 2010 年面世,到现在已经发展为大数据批计算的首选引擎。而在 2020 年 6 月份发布的Spark 3.0 版本也是 Spark 有史以来最大的 Release,其中将近一半的 issue 都属于 SparkSQL。这也迎合我们现在的主要场景(90% 是 SQL),同时也是优化痛点和主要功能点。我们 Erda 的 FDP 平台(Fast Data Platform)也从 Spark 2.4 升级到 Spark 3.0 并做了一系列的相关优化,本文将主要结合 Spark 3.0 版本进行探讨研究。
开源小E
2021/11/19
1K0
「Spark从精通到重新入门(一)」Spark 中不可不知的动态优化
Spark性能调优04-数据倾斜调优
数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。
CoderJed
2018/09/13
1.5K0
Spark性能调优04-数据倾斜调优
【技术博客】Spark性能优化指南——高级篇
前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题。 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。 数据倾斜发生时的现象 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1
美团技术团队
2018/03/12
2.1K0
【技术博客】Spark性能优化指南——高级篇
Spark性能调优-Shuffle调优及故障排除篇(万字好文)
在划分stage时,最后一个stage称为FinalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。
五分钟学大数据
2021/04/02
3.4K0
Spark性能调优方法
主要原因是SparkSQL是一种声明式编程风格,背后的计算引擎会自动做大量的性能优化工作。
lyhue1991
2021/01/26
4K0
一文教你快速解决Spark数据倾斜!
Spark 中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。
不温卜火
2020/10/28
6670
一文教你快速解决Spark数据倾斜!
Spark性能调优指北:性能优化和故障处理
Spark 官方推荐,Task 数量应该设置为 Spark 作业总 CPU core 数量的 2~3 倍。
大数据技术架构
2021/08/25
1.1K0
Spark性能调优指北:性能优化和故障处理
Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势
原创文章,转载请务必将下面这段话置于文章开头处。 本文转发自技术世界,原文链接 http://www.jasongj.com/spark/skew/ 摘要 本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义Partitioner,使用Map侧Join代替Reduce侧Join,给倾斜Key加上随机前缀等。 为何要处理数据倾斜(Data Skew) 什么是数据倾斜 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据
Jason Guo
2018/06/11
2.3K0
Spark 数据倾斜及其解决方案
本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。
2020labs小助手
2019/12/30
1.1K0
Spark之数据倾斜调优
有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。
王知无-import_bigdata
2019/06/03
6260
尝尝鲜|Spark 3.1自适应执行计划
每个框架产生都是为了解决一类问题,每个模块的优化也是为了解决一定的场景下的性能瓶颈。浪尖今天分享的关于Spark 3.1之后的自适应执行计划,主要针对以下几个场景,并且有百度率先研发的,不过社区之前一直没有采纳,spark 3.0的预发布版本参数也是不全,到了Spark 3.1的beta版已经可用,浪尖已经完成了测试。
Spark学习技巧
2021/03/05
9200
尝尝鲜|Spark 3.1自适应执行计划
【Spark篇】---Spark解决数据倾斜问题
数据倾斜问题是大数据中的头号问题,所以解决数据清洗尤为重要,本文只针对几个常见的应用场景做些分析 。
LhWorld哥陪你聊算法
2018/09/13
9370
【Spark篇】---Spark解决数据倾斜问题
Spark性能优化之道——解决Spark数据倾斜的N种姿势
Spark3.0已经发布半年之久,这次大版本的升级主要是集中在性能优化和文档丰富上,其中46%的优化都集中在Spark SQL上,SQL优化里最引人注意的非Adaptive Query Execution莫属了。
TASKCTL 任务调度平台
2021/01/22
2.5K0
Spark性能优化之道——解决Spark数据倾斜的N种姿势
大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
  每一台 host 上面可以并行 N 个 worker,每一个 worker 下面可以并行 M 个 executor,task 们会被分配到 executor 上面去执行。stage 指的是一组并行运行的 task,stage 内部是不能出现 shuffle 的,因为 shuffle 就像篱笆一样阻止了并行 task 的运行,遇到 shuffle 就意味着到了 stage 的边界。   CPU 的 core 数量,每个 executor 可以占用一个或多个 core,可以通过观察 CPU 的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个 executor 占用了多个 core,但是总的 CPU 使用率却不高(因为一个 executor 并不总能充分利用多核的能力),这个时候可以考虑让一个 executor 占用更少的 core,同时 worker 下面增加更多的 executor,或者一台 host 上面增加更多的 worker 来增加并行执行的 executor 的数量,从而增加 CPU 利用率。但是增加 executor 的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的 executor,每个 executor 的内存就越小,以致出现过多的数据 spill over 甚至 out of memory 的情况。   partition 和 parallelism,partition 指的就是数据分片的数量,每一次 task 只能处理一个 partition 的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多 executor 的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行 action 类型操作的时候(比如各种 reduce 操作),partition 的数量会选择 parent RDD 中最大的那一个。而 parallelism 则指的是在 RDD 进行 reduce 类操作的时候,默认返回数据的 paritition 数量(而在进行 map 类操作的时候,partition 数量通常取自 parent RDD 中较大的一个,而且也不会涉及 shuffle,因此这个 parallelism 的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过 spark.default.parallelism 可以设置默认的分片数量,而很多 RDD 的操作都可以指定一个 partition 参数来显式控制具体的分片数量。   看这样几个例子:   (1)实践中跑的 Spark job,有的特别慢,查看 CPU 利用率很低,可以尝试减少每个 executor 占用 CPU core 的数量,增加并行的 executor 数量,同时配合增加分片,整体上增加了 CPU 的利用率,加快数据处理速度。   (2)发现某 job 很容易发生内存溢出,我们就增大分片数量,从而减少了每片数据的规模,同时还减少并行的 executor 数量,这样相同的内存资源分配给数量更少的 executor,相当于增加了每个 task 的内存分配,这样运行速度可能慢了些,但是总比 OOM 强。   (3)数据量特别少,有大量的小文件生成,就减少文件分片,没必要创建那么多 task,这种情况,如果只是最原始的 input 比较小,一般都能被注意到;但是,如果是在运算过程中,比如应用某个 reduceBy 或者某个 filter 以后,数据大量减少,这种低效情况就很少被留意到。   最后再补充一点,随着参数和配置的变化,性能的瓶颈是变化的,在分析问题的时候不要忘记。例如在每台机器上部署的 executor 数量增加的时候,性能一开始是增加的,同时也观察到 CPU 的平均使用率在增加;但是随着单台机器上的 executor 越来越多,性能下降了,因为随着 executor 的数量增加,被分配到每个 executor 的内存数量减小,在内存里直接操作的越来越少,spill over 到磁盘上的数据越来越多,自然性能就变差了。   下面给这样一个直观的例子,当前总的 cpu 利用率并不高:
黑泽君
2019/05/14
3K0
大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例
3万字长文,PySpark入门级学习教程,框架思维
关于PySpark,我们知道它是Python调用Spark的接口,我们可以通过调用Python API的方式来编写Spark程序,它支持了大多数的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我们了解Python的基本语法,那么在Python里调用Spark的力量就显得十分easy了。下面我将会从相对宏观的层面介绍一下PySpark,让我们对于这个神器有一个框架性的认识,知道它能干什么,知道去哪里寻找问题解答,争取看完这篇文章可以让我们更加丝滑地入门PySpark。话不多说,马上开始!
Sam Gor
2021/08/13
10.5K0
推荐阅读
相关推荐
【大数据】Spark优化经验&案例--数据倾斜
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验