TOC
1.业务处理中存在复杂的多表关联和计算逻辑(原始数据达
百亿
数量级) 2.优化后,spark计算性能提升了约12倍(6h-->30min) 3.最终,业务的性能瓶颈存在于ES写入(计算结果,ES索引document数约为21亿
pri.store.size约300gb
)
org.apache.spark.shuffle.FetchFailedException: Too large frame: 2624680416
not support frames larger than 2G
. This lead to fails when shuffling using large partitions
. 链接shuffle
操作是根据此列数据进行shuffle时,就会造成整个数据集发生倾斜,即某些partition包含了大量数据,超出了2G的限制。left join
操作left join
触发了shuffle操作, 而spark默认join时的分区数为200(即spark.sql.shuffle.partitions=200
), 所以增大这个分区数, 即调整该参数为800, 即spark.sql.shuffle.partitions=800
“艰难”的跑完了
, 跑了近6个小时
!sample
算子对DataSet/DataFrame/RDD进行采样, 找出top n的key值及数量这个其实很有用
where
/ filter
)小表广播
broadcastspark.sql.autoBroadcastJoinThreshold
参数值时(默认值为10 MB), spark会自动进行broadcast, 但也可以通过强制手动指定广播visitor_df.join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer")
campaign_df
全量的副本, 每个Executor上也会有一个campaign_df
的副本也需要避免倾斜
)A.join(B)
spark.sql.shuffle.partitions
先局部聚合, 再全局聚合
切成多个部分, 分开join, 最后union
Example:
......
visitor_leads_fans_df.repartition($"random_index")
.join(broadcast(campaign_df), Seq("random_bucket", "uuid", "time_range"), "left_outer")
.drop("random_bucket", "random_index")
......
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。