00:00
后呢,我们再来介绍一个更加复杂一点,比较特殊的一个udf,就是所谓的表聚合函数,大家从字面上理解的话,这个就相当于是不是把那个表函数和聚合函数合在一起了呀,有点像是这种感觉对吧?那它的输出是个什么东西呢?哦,大家注意就是首先它的输入还是按照我们之前的那个定义,是不是相当于应该有一个分组,然后呃,后边是不是相当于把这个分组的数据要做一个聚合呀,跟之前的聚合函数差不多的,区别就在于之前的聚合函数是。只输出一个结果,哎,大家看到了对吧,那只输出一个结果,然后我们直接当成一个字段直接追,追加在后边就完事了,而现在的这个表聚合函数呢。它是对,它可以输出多个结果,所以大家想,既然你最后输出的又是一个多个结果,可以是一个多行多列的一个结果,那这是不是就相当于输出了一张表啊,所以我们说它是一个表聚合函数啊,就是有点类似于之前表函数输出的是一个多行对吧?啊,那另外呢,它又是做了一个聚合之后的结果,所以那大家会想到这样一个东西,它到底使用的过程当中有什么实际的应用场景呢?
01:18
呃,有一些例子里边有一些需求,确实是就用之前我们讲的那三类udf是实现不了的,那比如说像这里大家看到我可以取一个top n,大家看我现在要做一个TOP2的选取,这个怎么去做呢?按之前我们的那个聚合的话,其实也能够做到,大家想我可以怎么做。按照我们那个就是直接一个聚合函数去做的话,我可以怎么做啊,是不是可以直接把当前的。啊,就是我我当前的这个,呃,最最大的这个呃价格啊,Price和第二高的这个价格都保存成状态,然后最后我是不是变成两个字段都都追加到后边啊,对吧,那就相当于我最后输出的是两个字段一个元组嘛,那就相当于还是输出一个结果嘛,但是大家会想到我们最后如果要输出这个topn的时候,其实不希望在一条数据里边把它全部输出的,我们往往是还需要给他做一个排序,就是列出来第一第二第三,对吧,挨个把它做一个排序,所以在这种场景下,最终我们是不是做一个多条输出会更合理啊。
02:33
那这种多条输出怎么做呢?非常简单,就是用一个table的方式,大家看,就是我接下来呢,要实现一个自定义的表句和函数,就是继承这个tableable a的function这个抽象类来做一个实现,那么它的底层跟前边我们讲到的聚合函数非常的类似,它也有一个accumulator作为中间状态,同样也有一个create accumulator是不是初始状态先创建啊,创建起来之后大家看是不是每来一条数据就要,诶在这里边就要做一个聚合对吧,做一个这个结果啊,然后这里边大家看每一次调用的是不是还是叫做一个accumulate的一个方法呀,啊就是这个其实过程都是完全一样的啊,然后这里边的区别在于我当前第一个数据来了之后,六来了之后,那接下来是不是里边我保存的状态,应该大家想是不是肯定有两个啊,一个最高温度值,一个第二高温度值,那所以接下来六来了之后,我是不是只有最高温度值六啊,然后接下来三来了之后,是不是变成六。
03:36
干啊,两个两个那个状态了,对吧,这这两个都要保存在里边,然后这个五来了之后呢,我更新的是。大家注意这更新的是第二个第二高的那个温度,那个不是温度啊,是当前的价格对吧,更新的是当前第二高的价格那个状态,所以是变成了六五,然后如果要是八来了呢。八来了,比这两个价格是不是都高,所以它是不是相当于直接八变成了当前最高的价格,然后把六还要挪到第二高的价格位置上啊,啊,所以大家看到这里边就是我们要处理的逻辑,可能就相对来讲会比较复杂啊,当然也有同学说,那你这是TOP2嘛,如果要是TOP5TOP10呢?哎,那可能就干脆你就不要一个一个判断了,我是不是直接把它放在一个例子里边,直接去做一个排序就好了呀。
04:29
啊,当然这个过程就还是很麻烦对吧?啊,所以这就是关于我们做这个表聚合函数的一个基本的用法啊,然后它跟前面聚合函数区别就在于最后我要输出数据的时候,诶大家看接下来是不是输出的时候就不再是直接那个get value了,而是emit value emit value大家知道什么意思,是不是接下来我还是就是要发出数据啊,不是你直接获取到一个结果啊,这就是我最终的结果了,而是我是不是可以发出多个结果啊,哎,这就又是我们之前类似于flat map的那种输出方式,对吧?啊,来一个我们collect输出一次啊,啊,这就是这个表聚合函数的一个特点啊,那后面我们会发现它必须要实现的这个当前的这个方法就是这三个,而且呢,这里边跟前面聚合函数一样,Accumulateate这个方法也必须你自己去书写,而且它是就是写死的这个过程对吧,必须你定义。
05:29
就按照那个定义的标准啊,必须一模一样,那过程还是首先调a create accumulator,创建一个累加器中间聚合状态,然后接下来每来一个数据就调一次accumulate,更新当前的状态,而最后呢,最后输出的时候是调emit valueit value里边可以用那个collector输出多条数据,对吧,返回多个结果,所以最后我们得到的就是一个多条数据的一个输出。所以大家总结一下的话。
06:01
是不是就会得到一个结论,就是这相当于把我们四种情况都涵盖了,自定义的udf啊,Scale function标量函数是一对一,类似于map操作,对吧?那如果要是后面表函数table function的话,那就类似于一对多,是一个呃,类似于flat map的操作,然后后面我们聚合函数的话,Aggregate function,它是相当于多对一,得到的是一个结果对吧?呃,就类似于之前我们那个聚合操作了,而现在的这个表聚合函数的话,是不是相当于多对多呀?类似于这样的一个操作对吧?啊,所以说呃,最终我们调用的时候呢,跟前面a function做聚合类似,大家还记得之前我们那个是怎么做的吗。之前我们那个代码是。这里边是不是直接grew by,然后接下来直接aggregate呀,然后select对吧,这里边传的是那个aggregate方式,那现在怎么做呢。
07:00
现在大家还记得group之后,当时我们看到还可以调什么,是不是可以调一个flat agggate这个方法呀,哎,所以我这里边可以调一个flat aggregate,然后把注册好的那个flat,呃,当前那个flat aggregate function啊啊,就是当当前那个不是flat aggre function是应该叫做table aggregate function对吧,表句和函数然后放在这里,然后接下来select对应的,把对应的值拿出来就完事了。这就是我们使用这个表具和函数做操作的过程啊。
我来说两句