00:00
前面我们给大家讲解了系统内置的函数,那自然我们就想到了,那假如说现在系统内置函数里边,它现有的那些功能也满足不了我的需求呢?那有一些更加复杂的需求,大家会想到没有现成的功能,那是不是就只好自定义了啊,所以为了大家符合大家的这个实际需求啊,就是做这个功能扩展的这种需求,那所以table API和flink CQ也是提供了可扩展的udf接口的啊,这就是接下来我们要给大家说的啊,在在代码里边要去做实实现的一些内容啊,那大家可能知道就是说这个udf,这其实就是我们自己定义函数实现对应的接口,然后里边具体的处理逻辑,那就自己来设计了,对吧?啊,它就可以非常显著的扩展flink CQ里边的查询的表达功能,那么在使用的过程当中怎么用呢?简单来讲就是。哦,大家可能知道,就是即使我是写这个CQ,直接写CQ,那我也不能在CQ里边直接就是自定义一个函数,然后直接写就完了,对吧?所以我一般使用的场景呢,还是在这个Java或者skyla代码里边,先去实现一个接口,或者实现一个抽象类,记成一个抽象类,把对应的需要实现的这个功能封装在一个类里边,然后接下来是不是创建这个类的一个实例,接下来应该在环境里边注册一下,是不是就可以在CQ里边使用了啊,当然大家想到如果是table API的话,我只要创建出实例应该就能用了,因为它是抓va,抓va对象嘛,对吧?呃,直接就可以调用了,那如果说你是这个写CQ的话,那需要在环境里边再注册一下才可以啊,所以这里边大家看到有一个环境里面有一个方法啊,叫做register function。
01:50
这是不是就是要注册一个函数啊,我们当前那个当前那个环境里边不是有那个catalog吗?它里边保存我们的所有当前这个表库对应的一些一些原数据,对吧,一些目录信息吗?那么当前的这个方式信息也会直接写在里边。
02:09
所以接下来我们就分门别类的给大家看一看,用户自定义的函数怎么样,能够实现哪些功能啊,首先第一个是所谓的标量函数,它的这个名称叫做function,大家注意一下,这个不是SC对吧?这个是啊,SC的话意思就是所谓的标量值的意思,那什么叫做标量函数呢?标量函数的含义其实非常简单,就是我们这里边不是来了所有的这个数据吗?我们都是一条一条数据,现在是table API或者CQ的话,其实我们操作的是不是就是表里边的一行数据啊啊一一个肉对吧?那么我现在的作用其实就是我可以把这个标量函数作用在当前的表上,可以针对每一行数据里边的一个或者多个字段,或者甚至是零个字段,对吧,我我就哪怕就是没有输入也是可以的啊,所以就是就是针对零个一个或多个标量值输入进来之后,然后经过转换计算得到一个新的标量值,所以就是说简单来讲就是一条数据输入它里边的可以提取不同的字段输入进来,对吧,然后我最后得到一个固定的值就完事了。
03:21
所以它从我们的数据对应上来讲,是不是应该是一条输入一条输出啊,从函数来讲,就是几个不同的字段,或者是零个一个对吧?呃,不同的字段输入,然后得到一个固定的值的输出,对吧?哎,所以它就是这样的一个定义,最简单的一种定义,那么我们在代码里面实现呢,那就必须要实现flink table function里边有一个扩展的,所谓这个鸡肋啊,就叫做scale function啊,这是一个抽象类,所以大家看我要实现的时候呢,就是自定义一个函一个类,然后去继承这个skill function就可以了。
04:00
这里边有一个特殊的要求,大家看下面并没有override这个关键字,对吧?这里边你随便怎么写都可以,但是呢,底层这个标量函数skill function的行为,它是由一个固定的函数决定的,这个函数叫做E,就是所谓的一个求值函数,它必须公开声明就必须是public的,而且呢,必须叫做evil,必须叫这个类型这个名字。所以这个行为稍微有点诡异是吧,这还是这个table API和CQ底层架构本身,呃,就是设计的还不是特别的完善啊,就是导致这里边我就必须写死,这里边必须有一个求职方法叫做E,那它到底什么时候掉呢?那就是我们这里边不是动态表嘛,对吧,就当前我这个表里边来了一条数据啊,我当前这个有有一个这个数据新进来,就然后你应用这个标量函数的时候,要传什么值,我就把那个对应的字段传进来,然后得到一个结果对吧?计算出一个结果,然后你想添加,比方说表里边添加字段对吧,或者说得到一个什么,呃,新的一张表里边怎么样去安排,都是自己去写就可以了。
05:09
这就是这个标量函数的用法啊,我们在代码里边做一个简单的实现吧。呃,接下来我们还是在当前的这个table API下边,因为这个函数这一部分,可能接下来我们自定义实现的内容比较多,所以直接在table API table API下我再多建一个包叫udf,然后接下来我们是UD udf test1当前是scale function,好,先把当前这个类先创建出来。呃,然后前面的这个处理流程呢,就基本上大同小异了,对吧?啊,先是把这个main方法先定义出来啊,我这里边throws exception。
06:02
然后下边自然还是先要把环境啊,就是包括我们这个表的环境啊都创建出来,然后接下来就是读取,读取数据对吧,然后后边我们再看它的这个转换操作怎么样用自定义的标量函数去做转换,那前面我这个就简单写了啊呃,我们最简单的那种方式。大家觉得是不是还是按照我们之前那个流的方式啊,先定义一个流,然后转换成那个表是最简单的方式啊,我们就还是抄之前那这种方式啊,但是这里边我好像也不需要用这个事件时间语义啊。我们就直接用最初的那个example的那个那个设置的这个流程就可以了,对吧,这里边呃,我把当前的这个环境先读进来。这是流逝的执行环境,后边应该还要有这个表的执行环境,对吧,我们把这个所有的这个环境啊,先放在上边。
07:02
然后有了环境之后,后边就是读取文件啊,然后我们转换成这个po类,对吧,先把这个都做完。前面的过程我们都很熟悉,这个就不详细再讲了啊呃,这个就是做到了这一步,然后接下来我们是不是就可以直接就是定义一个scale function啊,然后接下来定一些操作了对吧?啊,那呃,首先这个我们还是得把那个当当前的这个流先转换成表对吧?将流转换成表,呃,所以这里边我们做一个table env from data stream对吧?把当前的这个data stream直接传进来,然后后面我可以指定对应的字段啊,当前ID time stamp,呃,比方说我给个别名叫做叫做TS对吧,后面temperature给个别名叫做tempmp。
08:03
啊,这是我们当前的这个data table。啊,或者后面我们不是要指定它当前这个表就叫sensor吗?我直接把它叫做sensor table得了,对吧?啊,这都是可以的啊,这是我们当前的一些准备工作,先把它转换成表,然后接下来那是不是就可以定义我们的想要的那个呃,对应的那个skill function标量函数了,对吧?那我们先想一个具体需求吧,这个我就跟文档我们的课件里边举的例子一样了啊,比方说这里边就是一个最简单的。自定义的一个哈希函数对吧?大家想哈希函数的话,是不是应该针对输入的一个值,然后做一个转换计算之后得到一个int类型,或者一个长整型的一个值啊,对吧?呃,这个就是它散列之后的一个哈希值,所以我们就直接实现这样一个方法就可以,所以我们既然要做这个自定义,我先在下边啊,我把这个注释先写出来,当前我们是。
09:05
呃,自定义。自定义标量函数。实现求,比方说我们要求当前三次数据嘛,我们就求这个ID吧,对吧,求ID的哈希值。所以接下来我们就在下边直接做一个实现啊,对吧。实现自定义的标量函数sc function,好,所以接下来我们这个public static class,当前我把这个叫做卡西扣的啊,然后他需要去继承一个scale function来看,就是这个东西,对吧,这是当前table API里边给我们定义的一个udf user DeFine function对吧?哎,那么这个scale function大家看到它也没有任何的泛型。
10:07
也没有任何的类型,那大家会想到,那当前我的这个输入输出类型到底怎么定义呢?哦,大家看是不是在方法里边直接定义出来的呀,大家看这里边哦,这里边本身它都没有,没有对应的这个必须要实现的方法,对吧?大家看这里边就是都是一些get result type啊,就是要获取我当前的这个类型,所以说这里边我们不写也可以,它这本身是有实现的嘛,对吧,有方法体的啊,啊,那大家看这个本身这个udf里边啊,这里边有什么方法呢?这就类似于一些生命周期了,对不对,Open close了,所以这里边好像我们也没有专门能够实现的一些东西,所以它是在底层啊,本身这个table API框架里边给我们写死了,就必须在里边实现这样的一个方法叫做就是必须是public类型的,然后比方说我们要哈希实是一个int返回,对吧,然后给一个E。
11:03
就是它的返回类型,你可以随便定,它的参数类型可以随便定,但是它必须是public,而且必须叫evil对吧?必须叫这个这个名字啊,大家知道evil其实是那个evaluate对吧?计算的那个前几个字母缩写啊,啊,大家知道这个含义就可以了,所以看起来有点奇怪,有点诡异啊,但是没办法啊,我们就这么用它就完事了。那么这里边既然是要计算ID的哈希值,是不是应该把string一个string要传进来啊,对吧?所以这里边我们把string啊,ID,那或者我要是算哈西扣的话,我也未必知道它就是ID是吧,我直接传一个string进来是不是就可以了啊,这没有任何问题,然后里边算一个哈希啊,这个大家可能会想,这玩意儿我怎么去算呢?啊,这这玩意儿对最简单的方式,其实我直接偷个懒啊,大家知道在这个抓va本身里边,是不是对于这个string就有一个求哈code的一个方法呀,对吧,我偷个懒啊,直接做一个哈code直接。
12:04
返回这个就完事了啊,但是大家会想到你要直接这么做的话,那一点意义都没有啊,所以我这里面可以给一个。大家可能也听说过,我可以给一个类似于随随机数种子一样的东西,对吧?那我随机产生一个这样的一个值,或者说我从外部直接传入这样的一个值,比方说我管它叫做一个一个扩展因子吧,对吧?我我把它叫做一个factor啊,再看我定义一个private private这样一个属性啊,一个int类型的属性,比方说我直接可以定死一个初始值,比方说我给一个13,对吧?啊,然后接下来那大家想,我是不是在这个构造构造器里边,构造方法里边可以把它传进来对吧?啊,我从外边也可以再更新这个值,那么这样的话,我调用的时候是不是就相当于可以指定一个因子,然后不同的因子是不是比方说我乘以当前的这个factor。
13:02
那就可以导致我得到的返回的那个哈希code就大不一样啊,啊这样的话就可以实现了一个扩展啊功能扩展,所以这个其实还是非常简单的,那大家看一下上面我这个怎么去调呢?在这儿其实已经已经定义这玩意儿了,对吧,那接下来我们想用它的时候怎么办呢。啊,那这个是分成两类嘛,我们首先是不是应该有table API的用法,Table API的用法的话,那大家想是不是我首先应该得有一个当前这个类的实例啊,对吧,这是一个一个哈希code,这是一个类嘛,我要用的显然是它的一个实例啊,所以我直接定义一个哈code啊,比方说这个名字我还是叫做哈code啊,呃,直接new一个哈code。把这个实例创建出来,它里面是不是得传参啊,啊,这里边我随便给一个啊,比方说给一个23。没毛病对吧,大家知道一般求哈希啊什么的时候,我们给的这个数啊,就不管是随机呃,随机数种子还是说这里边我们这个乘的这个因子啊,一般给的都是一个质数或者叫素数对吧?啊,就让他这个尽量不要出现哈希碰撞嘛啊所以当然这里边其实不存在这个问题啊,因为我们直接调到底层这个哈哈Q的方法了啊,那这里边我随便给一个23,然后接下来我在用的时候大家要注意啊,必须在环境里边先对它做一个注册。
14:32
这边给大家看一下,就是需要需要在环境中。注册udf,因为系统内置的函数的话,大家知道就相当于在环境里边默认已经有了,对吧,直接调的话,他知道那个代码在哪,那现在的话你自己定义出来,你直接要调这个哈西的话,是不是必须得告诉flink的这个当前这个table API环境对吧?Table环境,呃,我得知道这个函数到底是什么操作,到底怎么样应用在表上这个才行,所以要有一个注册的过程。
15:10
哎,那接下来我接下来这个注册大家看到就是非常简单,Table en是不是有一个方法叫register function啊对吧,直接把它做一个注册,这里边需要给两个参数,一个是当前注册进来的函数名称,呃,那比方说我这个我就叫哈希code对吧,然后后边。比方说我这跟外面那个叫的一样吧,也是有一个大写对吧,给一个这个哈sh code后边是不是就得把对应的这个scale function的实例得传进来啊,啊,对吧,你得把这个当前的对象得传进来嘛,要不然我怎么知道拿什么东西去注册呢?啊,因为我已经继承了这个skill function,所以我就可以直接在这儿注册了,对不对,它本身是个udf嘛,大家还记得它的那个类型吧,对吧,它继承是user DeFine的方式啊,所以接下来就直接注册,那接下来就可以用了,我们可可以得到一个result table对吧。
16:03
Table,一个result table,接下来我们就是基于sensor table可以去直接做一个select,我就直接提取吧,大家想我现在要ID,然后比方说我要当前的TS时间戳,另外我再来一个是不是直接调用的话,我可以直接这样哈希code,然后ID是不是就可以了。对吧,大家看这个是不是就跟我们那个CQ里边的写法是一样的呀,对吧?当前的自定义的函数值啊,直接把它这个包装起来,直接这么用就完事了,好,这就是当前的这个table API的写法,那我们再来看一下这个CQ的写法啊,CQ写法更简单对吧?啊,那那这个当然CQ写法的话,我现在还那个表,我是从流里边转换过来的,是不是环境里面还没那张表啊,大家一定要注意还得注册那张表,对吧?Table env,然后点,呃,其实大家前面看到了就是register这个function的时候,下面其实有一个叫做register table的。
17:07
其实有一个这个对吧,那你想如果我要用这个的话,大家想是不是我也是传一个,呃,当前这个表的名称,然后把具体的那个table传进来,我就相当于注册好了呀。这个是差不多的啊,但是大家看他现在建议大家这被弃用了,对吧,建议大家用什么对,用create temper review对吧,因为你这里边注册表的时候,它是不是并没有对应的那个实体的那个表的关系啊,对吧,它建议是我们那个table的概念,是有实体对应关系的那个叫叫叫table,那那我们这里边临时创建的这个就是完全虚拟的这个表的话,我们是叫view对吧?啊,所以它是统一用这个的啊,大家如果觉得这个re table更更直观更好理解的话,你用它也可以,那这里边我们就create tempor review给一个名称,当前我就叫sensor,后边传进来的当前的表是sensor table,那接下来就直接写CQ了,呃,那最后还是得到一个result cql table,直接用table en调CQ query方法里边是不是直接写啊,诶那家想前面我这个怎怎么写这个C非常简单对吧。
18:19
Select ID以及TS还有哈希code ID,然后from sensor是不是就完事了?所以大家看关键就是在于你把它定义出来之后要注册对吧,注册完了之后就当成我们普通的这个SQL函数直接用就完了。然后table API里面这这里大家也注意一下,就是它的用法不能直接后面点,因为大家想点的话,它是个方法调用吧,你这个注册这只是注册在环境里边并没有,呃,就是我当前这个Java那个它它的那个类里边并没有对应的一个方法对吧?诶所以这里边还是用这个CQ里边的写法,这样这样去调啊,把它直接传进去就完事了。
19:04
好,那接下来我们看一下这个打印输出的结果,呃,得到这个table env调一个,现在大家想是不是直接就是pan STEM啊,没做任何聚合操作啊,也没有做任何的更改,所以直接to pan stream result table,然后呃,我这里边给一个肉点class对吧,把这个肉引入进来。啊,这里我们用的就是Flink.types后边做一个打印print,当前是result。啊,同样table en toend stream里边我们把这个table传进来,roll.class后边print,当前这个CQ这就完了,对吧?啊,最后不要忘记还有一个env execute把这个执行起来,这就是完整的一个自定义这个方法的一个流程啊好,那接下来我们运行一下,大家看看这个效果怎么样。
20:08
这个过程其实就是大家要熟悉一下这个流程啊,跟我们之前那个table API里里边大家看,呃,就是跟跟我们之前讲到的这个流处理API啊,Streamam API里边自定义一个函数类,大家看那个过程是不是差不多啊,只不过之前我们就是每一步那个API的操作啊,点map后边是不是直接你有一个map方式直接传这个自定义的就可以了,现在的话是我们要拗出它的实例来之后还还需要怎么样,是不是还要注册,然后再再使用啊,对吧,就有这样一个过程啊。好,大家看一下,诶,大家看现在我得到这个啊,341,哎,这个因为为什么会得到一个负的呢?哎,对,太大了对吧?呃,超出我们那个这个整数整形的那个上限了啊,所以它溢出之后就变成负数了,对吧?啊,就是成我们成了23嘛,呃,然后你看里边有一些啊,好像都是负的是吧,所有的这个值它是不一样的啊,大家看这个三色一和三色十本来很像,后面就多了一个零而已,但是我们求取出来的哈西扣的这个偏差是很大的是吧?啊这个大家如果不想得到负的话,你就乘的这个数小一点啊,稍微做一个转换就可以了。
21:18
这就是关于这个标量函数的应用。
我来说两句