This post walks through the classes Spark SQL uses to implement window functions.
select
a,
row_number() over (partition by a order by b desc) as rn
from testdata2
The statement above consists of two main parts, which its unresolved logical plan makes visible:
--------unresolved logical plan--------
'Project ['a, 'row_number() windowspecdefinition('a, 'b DESC NULLS LAST, unspecifiedframe$()) AS rn#10]
+- 'UnresolvedRelation [testdata2], [], false
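For reference, all the plans in this post can be reproduced with Dataset.explain(true), which prints the parsed (i.e. unresolved) logical plan together with the analyzed, optimized, and physical plans. A minimal sketch, assuming an active SparkSession named spark and a registered testdata2 view:

// Assumption: `spark` is a live SparkSession and testdata2 is registered as a view.
spark.sql(
  """select
    |  a,
    |  row_number() over (partition by a order by b desc) as rn
    |from testdata2""".stripMargin
).explain(true) // extended = true: prints parsed, analyzed, optimized and physical plans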
From this execution plan we can see the structure of the classes Spark SQL uses to describe window functions.
The class structure of window functions
WindowExpression: marks the expression as a window expression. It extends BinaryLike, i.e. it is a binary tree node whose two children are the window function and the window specification.
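Abridged from the Spark 3.x Catalyst source (windowExpressions.scala); several overrides are elided:

case class WindowExpression(
    windowFunction: Expression,
    windowSpec: WindowSpecDefinition)
  extends Expression with Unevaluable with BinaryLike[Expression] {

  // The two children of the binary node: the function and the window spec.
  override def left: Expression = windowFunction
  override def right: Expression = windowSpec

  // The expression's type and nullability come from the wrapped function.
  override def dataType: DataType = windowFunction.dataType
  override def nullable: Boolean = windowFunction.nullable
}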
The window-function child is the function to be evaluated over the window, e.g. count, sum, avg, first_value.
WindowFunctionType: describes whether the window function is a SQL window function or a Python user-defined window function. Its two values:
SQL
Python
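Abridged from the same source file; the body of the classifier is elided:

sealed trait WindowFunctionType

object WindowFunctionType {
  case object SQL extends WindowFunctionType
  case object Python extends WindowFunctionType

  // Walks the expression tree and returns Python for Python/Pandas UDF
  // window functions, SQL otherwise (body elided).
  def functionType(windowExpression: NamedExpression): WindowFunctionType = ???
}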
WindowSpec: the interface trait for a window function definition (specified in an OVER clause or a WINDOW clause). The two syntaxes below are equivalent:
select a,
row_number() over wd as rn,
sum(1) over wd as num
from testdata2
window wd as (partition by a order by b desc)
vs
select a,
row_number() over (partition by a order by b desc) as rn,
sum(1) over (partition by a order by b desc) as num
from testdata2
These two differ only in syntax; in efficiency they are exactly the same, as comparing the two physical plans below confirms (they are identical).
--------WINDOW clause version--------
== Physical Plan ==
Project [a#3, rn#10, num#11L]
+- Window [row_number() windowspecdefinition(a#3, b#4 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#10, sum(1) windowspecdefinition(a#3, b#4 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS num#11L], [a#3], [b#4 DESC NULLS LAST]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
+- Scan[obj#2]
--------inline OVER version--------
== Physical Plan ==
Project [a#3, rn#10, num#11L]
+- Window [row_number() windowspecdefinition(a#3, b#4 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#10, sum(1) windowspecdefinition(a#3, b#4 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS num#11L], [a#3], [b#4 DESC NULLS LAST]
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
+- Scan[obj#2]
WindowSpecDefinition is the content inside the OVER clause, and it in turn has three parts. The first two partition the data and then sort within each partition; only once the rows are sorted can we reliably locate which rows before or after the current one should participate in the computation. The third part, the frame, determines exactly which of those rows are used.
case class WindowSpecDefinition(
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    frameSpecification: WindowFrame)
  extends Expression with WindowSpec with Unevaluable
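For the running example, over (partition by a order by b desc) maps onto these three fields as partitionSpec = ['a], orderSpec = ['b DESC NULLS LAST], and frameSpecification = unspecifiedframe$(), which is exactly the windowspecdefinition('a, 'b DESC NULLS LAST, unspecifiedframe$()) node in the unresolved plan at the top of this post.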
UnspecifiedFrame: no frame was specified. When the unresolved logical plan is generated, this class acts as a placeholder marking that a window frame belongs here; it is the unspecifiedframe$() seen in the unresolved plan above.
SpecifiedWindowFrame: an explicitly specified frame. It too extends BinaryLike; its two children are the lower and upper bound expressions.
case class SpecifiedWindowFrame(
    frameType: FrameType,
    lower: Expression,
    upper: Expression)
  extends WindowFrame with BinaryLike[Expression]
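A minimal sketch of how the two frames seen in the physical plans above can be constructed (class names are from the Catalyst source; the analyzer rule that substitutes them for UnspecifiedFrame is elided):

import org.apache.spark.sql.catalyst.expressions._

// With an ORDER BY but no explicit frame, the analyzer defaults to
// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -- the frame the
// sum(1) column gets in the plans above:
val defaultFrame = SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)

// row_number() requires a ROW-based frame instead, which is why its column
// shows specifiedwindowframe(RowFrame, ...) in the same plans:
val rowNumberFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)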
FrameType: the trait used to represent the type of a window frame; its two implementations, RowFrame and RangeFrame, both appear in the physical plans above.
SpecialFrameBoundary: the trait used to represent the special boundaries used in a window frame, rendered as unboundedpreceding$() and currentrow$() in the plans above.
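Abridged from the same source file (Spark 3.x); abstract members and method bodies elided:

sealed trait FrameType
case object RowFrame extends FrameType   // bounds are counted in physical rows
case object RangeFrame extends FrameType // bounds are value offsets on the ORDER BY key

sealed abstract class SpecialFrameBoundary extends Expression with Unevaluable
case object UnboundedPreceding extends SpecialFrameBoundary // unboundedpreceding$() in plans
case object CurrentRow extends SpecialFrameBoundary         // currentrow$() in plans
case object UnboundedFollowing extends SpecialFrameBoundary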