我们现在的数据动不动就上百亿,字段动不动就是巨大的json 串,到处是疑难杂症,所以,每天就是拼命的研究这些原理,寻找优化的方法。
其实这篇是源自于我之前的一个优化案例:
优化的效果很明显,但手段很简单,难点在于对窗口函数内存使用的理解。
这篇就从内存处理的角度说一说窗口函数为啥会更容易出现性能问题。
如果觉得这篇很难懂的话,很早之前总结过窗口函数相关的一些知识点,这些知识点现在还是适用的,阔以先看看:
窗口函数比普通的聚合函数运行成本更高,为啥?
spark中窗口函数的处理逻辑的入口在WindowExec类中,在这个类里,我们能看到,ExternalAppendOnlyUnsafeRowArray是窗口函数用来存放每个窗口中数据的缓存结构:
有两个参数:
1、spark.sql.windowExec.buffer.in.memory.threshold
控制ExternalAppendOnlyUnsafeRowArray中放多少数据,默认4096,当超过这个行数,就会转为UnsafeExternalSorter。如果该参数设置太大,会占用太多内存
val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
.internal()
.doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
.version("2.2.1")
.intConf
.createWithDefault(4096)
2、spark.sql.windowExec.buffer.spill.threshold
当ExternalAppendOnlyUnsafeRowArray转为UnsafeExternalSorter之后,UnsafeExternalSorter中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘,默认Integer.MAX_VALUE。如果该值设置太低,数据会频繁溢出并导致磁盘写入过多,从而导致性能下降。
val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.spill.threshold")
.internal()
.doc("Threshold for number of rows to be spilled by window operator")
.version("2.2.0")
.intConf
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
private[spark] val SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD =
ConfigBuilder("spark.shuffle.spill.numElementsForceSpillThreshold")
.internal()
.doc("The maximum number of elements in memory before forcing the shuffle sorter to spill. " +
"By default it's Integer.MAX_VALUE, which means we never force the sorter to spill, " +
"until we reach some limitations, like the max page size limitation for the pointer " +
"array in the sorter.")
.version("1.6.0")
.intConf
.createWithDefault(Integer.MAX_VALUE)
PS : 实际上,Integer.MAX_VALUE的值是21474836473,这个值已经很大了,正常情况下窗口内的数据条数是很难超过200亿+的,如果有这么多条,妥妥的数据倾斜
当ExternalAppendOnlyUnsafeRowArray转为UnsafeExternalSorter之后,这里还有一点,决定UnsafeExternalSorter要不要溢出数据到磁盘,除了spark.sql.windowExec.buffer.spill.threshold 这个条件之外,还有另一个条件, 那就是能否为UnsafeInMemorySorter分配足够的空间。
UnsafeExternalSorter 利用 UnsafeInMemorySorter 来实现在内存里排序,类似一个排序指针数组,ExternalAppendOnlyUnsafeRowArray 默认给这个数组1M的空间,如果用完了,就需要扩容,如果没有空间给扩容,就需要溢写。
具体判断是否需要溢写的代码如下:
所以,看吧,讲来讲去还是内存的事~
如果内存不够用,就会频繁溢写,频繁溢写的结果就是IO太多,影响效率,再严重一些,可能会OOM(因为Spark 是通过随机采样获取已经使用的内存情况,有可能因为数据量大且采样不准确而不能及时 Spill导致OOM)
所以,解决这个问题的办法是什么?
最简单的方法是,提高Executor内存,增加partition的数量,让每个partition处理更少的数据。
可是,我们知道Executor内存和partition的数量也不能无限制增加,内存加太多,会使整个任务的内存使用率很低,因为一个sql里其他的逻辑的处理可能用不了这么多内存,而partition数量增太多也会带来其他的性能问题。
所以,还有一种方法,是从sql写法上来优化,包含有窗口函数的那段sql里,不要加太多和窗口函数不相关的列,尤其是大字段,很占内存,这些列可以单独拿出来,等窗口函数计算完,再关联一次,伪代码如下:
SELECT xx,
a,
b,
c,
d,
....,
row_number() OVER(
PARTITION BY
xxid,
xxid,
xxid
ORDER BY
xx ASC
) AS rn
FROM tablex
-----------------------------------------------------------
select
window_info.*,
other_info.*
from
(SELECT
id,
row_number() OVER(
PARTITION BY
xxid,
xxid,
xxid
ORDER BY
xx ASC
) AS rn
FROM tablex) window_info
left join
(
select
id,
....
from tablex
) other_info on ..
需要注意的地方是:拆逻辑时,要确保关联键的唯一性,最好group by 一下key,或者用其他方法保证一下。
——核心思想还是分而治之!!!
从12月18号,sparksql源码讲完最后一次第二期的课程后,一切好像都停滞下来了
生活停滞、技术提升停滞 、跳舞停滞...
前些日子,曾经觉得聚沙成塔这样的事太慢,可是现在,我已经能享受慢下来的过程了,慢是为了以后能够带着更多的能量跑起来
最近这些天安静地享受着这种停滞
享受着自己现在能去自洽自己的一切状态的能力
感恩着还在等待与陪伴着我的你们
新的一年我会重新好好出发的,已经鼓起信心写下很多新的希冀
年后,sparksql源码三期继续,小皮鞭又要举起来了
精读源码,是一种有效的培养专长的方式~~
如果你想培养自己的优势
通过优势来提高自己在职场的影响力
但不知道如何开始
或者对自己没有信心
欢迎加入我创办的硬核源码学习社群(收费)
精读内容:SparkSql源码成神之路