00:00
前面我们说到了弗link给我们提供的所有的process function,其实使用都非常类似,那用的最常见的就是KY之后的这个process function啊,所以我们要实现的就是一个K的process function,前面我们也看到了K的process function,它是不是泛型是KIO啊,源码里面大家也看到了KIO,对吧?然后里边必须要实现的方法呢,Process element,这个跟一般的process方式是一样的,有一个输入数据来一个调用一次,然后呢,呃,有一个ctx上下文,这个上下文里边可以做很多事情,另外还有一个alt,这是用来做输出的,对吧?啊然后另外呢,大家看它还有一个很特殊的一个方法,下面这个是它的这个,呃,内部类对吧?呃,上下文啊,Contacts内部类,然后另外还有一个很特殊的方法叫做on timer on timer大家知道是什么意思吗?Timer本身就是一个定时器,对吧?啊就是一个定时器,那on timer呢?On timer其实就是大家知道在计算机领域里面啊,我们定义这个方法名称的时候,On什么什么,这其实是表示一个事件触发的操作,对不对?
01:12
啊,但家如果要是之前做过这个,比方说呃,前端编程啊,写过前端页面的话,大家可能知道在这个一个呃,111个当前这个JS里边啊,你要去定义,比方说我这里边有一个链接,或者有一个这个按钮,一个button,它里边可能都有一个方法叫on click,大家知道on click什么意思吗?对,就是我点一次的时候,我要触发什么操作对不对?所以on click这个里边实验的方法是不是都是要判断我这里捕捉我的一个事件,就是我点击这个操作是一个事件啊,然后触发这个事件的时候,接下来我要做的事情就定义在里面啊,所以这里面的on timer其实也类似,它捕捉的是一个事件,一个什么事件呢?它是一个定时器事件,定时事件对不对?所以它的前提就是我在前面要注册一个定时器,我先设一个闹钟,那大家想我设置闹钟的话,这并不是目的,我是不是要得定义出来闹钟响的时候到底要干什么事啊?所以这里的on timer其实就是告诉我们。
02:15
定时器触发闹钟响的时候,你到底想要执行什么操作啊,所以这是on timer的这个含义啊。这是一个比较特殊的用法,只有在process方式里边才有这样一个方法,对吧?啊,这里面大家可以看一下这个k process function啊,它是extend,它继承在哪里呢?它本身是一个抽象类,对吧,它继承了。它继承了abstract reach function抽象的这个rich function,这个大家知道rich function,它就实现了这个rich function接口,对吧?哎,那那所以当前我的这个process function是不是也是一个。也是一个函数啊。也是一个瑞士方式对不对?哎,那言下之意,在这个process function里边,除了这个process element,除了on timer啊,另外我们说这个上下文可以获取时间,对吧?呃,获取这个定时事件啊,注册定时事件啊,还可以做这个测输出流,做各种各样复杂的操作,除了这些之外还可以干什么?
03:20
是不是前面负函数能干的事儿,现在都能干啊?当前他是不是如果要是有这个,呃,生命周期open close是不是也可以做啊,如果要是想去get,运行上下文,去注册状态,使用这个当前的kit是不是也可以做啊,之前所有介绍的东西他都能做,所以我们说它是底层API,啥都能干对吧?呃,所以在项目实际过程当中,有时候就是拿这个东西做一个,这就相当于是我们兜底的这个方法了啊,大招对吧?呃,上来之后只要是逻辑清晰,理论上我用这种process function的方法都能够把它实现出来,所以接下来我们就在代码里边给大家做一个简单的测试,大家看看这些东西到底怎么去用。
04:07
我们还是新建一个包啊,去放当前的这个测试。Com点艾特硅谷点API test,这个还是属于API对吧,只不过这是底层API啊,Process function。好,那首先第一个这个我就给大家测一下这个key的process方式。Test one,呃,这个。Test one对吧,这个process test one。先创建出来,呃,然后前面的这个流程就大同小异了啊,还是main方法里边,我们throw一个exception里边是不是先要创建这个执行环境啊,对吧,Get in execution environment啊,然后我这里叫做env,不影响结果正确性,呃,啊,这个stay back我就不设了啊,不影响结果正确性,我设一个那个并行度是不是就完事了呀?呃,当前我要测这个,呃,这个K的process function,那跟这个时间语义我暂时也,呃呃不管那个那个时间语义对吧,我直接就用这个处理时间就完事了,然后接下来是那个读取数据,读取数据的话,我们还是直接把那边的数据直接拿过来就完事了。
05:25
好,这里socket文本流读进来,转换成sensor reading的po类型。啊,最后当然是有这个execute啊,把这个写出来,然后接下来我们这里边可能需要做一个做这个测试,测试k process function的话,那是不是先要做分组啊,这个大家知道对吧,Kid process function。哦,那么我们要先分组,然后自定义处理,所以这里边我们基于前面的data stream先做一个KBY,呃,我当前是sens reading嘛,那还是基于这个ID做一个KBY吧,然后下边我可以直接定义一个直接调process,那这是不是就是要调用这个底层的process function API啊,只要调process就可以,里边我去你有一个自定义的,比方说我叫my process,那这就是一个这样的一个使用过程,对吧?啊那那当然了,最后我可以直接直接把它打印输出,这个我就不写,就是不写这个,当前这是一个result stream了,对吧?啊,这个大家直接打印就可以啊,接下来我们就写一下实现自定义的处理函数。
06:53
Public static class我定义的叫my process,那么当前我们要,诶不是employment,大家注意对,因为当前的这个process function其实都是一个抽象类,对吧?啊,那K的process function是这样,那一般化的这个process function是不是也是这样啊,对吧,它也是继承自obstract rich function也是一个rich function啊,那所以这里边我们要做的是extend一个key的。
07:22
Process function里边的类型是KIO,那KIO大家还记得怎么写吗?这里多了一个多了一个类型叫KK的话是不是得基于之前KBY之后的那个k stream的K啊,那当前这个按照字段去做这个键的指令的话,我们得到的那个K是不是应该是个元组类型啊,这个还记得吧?Temple,然后输入输出,输入的话sensor reading对吧?呃,输出的话,这个看大家想输出什么,比方说我就输出一个这个in inte吧,对吧?随便找一个这个整形数据输出就完事了,在这一写上面是不是整个就不报错了啊,类型是没有问题的啊。然后接下来我们看到里边必须要实现的一个方法,就叫做process element对吧?这个方法必须要实现。呃,这里边其实就是来一个数据sensor reading的一个value,然后就可以,诶,就是用这个al.collect去做这个对应的输出,对吧,非常的简单啊,比方说我这里面直接输出al.collect。
08:22
直接拿当前那个I get ID对吧,用ID的长长度lengths直接输出是不是就完事了,这个很简单啊,呃,基本的一个一个写法啊,那这里边要给大家重点说的就是当前的这个contacts有什么用呢?这里边这个contacts它跟别人,别人不是多出来这这个东西吗?那这个东西到底有啥用呢?啊,那首先给大家看一下就是。Ctx点大家看它具体能调的这个操作,首先有这么几个,一个就是可以可以获取当前的时间戳,对吧?啊,就当前不是数据来了吗?啊,他直接就能把这个时间戳获取到啊,但是有同学说这个也没啥意思,对吧,你这里边这个我从这个数据再重新提取一次,不也一样吗?啊,但是看起来好像是这个意义好像不是那么的大啊,那另外还有就是get current key,它是不是可以直接获取到当前K是什么呀?这就是我们说的上下文,对吧?你要想知道当前到底是哪个三四,我是直接可以得到的啊啊然后另外还可以大家看这个是output,这个output是啥呢?我们前面这里边不是已经有了一个out.connect这是输出吗?那这里边ctx output,诶,大家看到里边传的参数是不是有那个output tag呀,Output tag大家知道是不是只在定义测输出流的时候要去指定一个标签。
09:53
二后面我们去拣选啊,哎,所以这里边既然有output t,那是不是就代表这里可以做一个侧输出流的输出啊,啊,所以这里边这个操作,这就呃是我们讲这个分流啊,就可以在这里去定义了,当然这个奥T的定义稍微麻烦一点啊,这个我就先注掉,先不写了,然后另外还有一个操作,这个更加重要,那就是最后这个timer service对吧,定时服务,我们看一下这个玩意儿里边又有什么东西。
10:25
里边其实就是六个方法,大家看啊,就主要就是这六个方法,别的就是那个底层的那些对吧,重写的方法了啊,那这里边大家看一下这六个方法到底是哪些东西呢?啊,就是这里边timer service,大家看到返回的是一个就叫做timer service的这样的一个类的对象,对吧?那我们接下来就看这个timer service里边的这六个方法分别是什么?首先我们看到的是大家看最下面的这两个是不是可以获取当前的processing time以及当前的current water mark,大家知道current water mark是不是就相当于是当前的事件时间啊,所以这其实就是看不同的时间语义嘛,你是事件处理时间的话,我可以获取到当前的处理时间,如果是事件时间的话,可以获取到当前的watermark,这不就是可以获取到当前时间进展到什么程度吗?呃,先能拿到当前的时间,然后呃,然后当前数据的那个时间戳,是不是我可以从这。
11:25
这个提取出来啊,对吧,这个都是没没问题的啊,然后另外大家看到还可以诶,大家看是不是可以register啊,可以注册register的话,又有两种,Register processing time timer,另外还有register,对,是不是可以注册这个even time timer啊。啊,就是可以注册处理时间的定时器和事件时间的定时器,这里边都要传一个,传一个长整型的值,那这是什么呢?这就是那个定时期要触发的时间点的那个时间戳,对吧?长整形就是一个时间戳嘛,所以你这里面可以直接给一个,比方说我直接给一个1万,这都可以,但是大家知道一般我不会这么去设,这么设的话意思是啥呀。
12:14
这就是在处理时间到达1万毫秒的时候。然后执行这样一个定时操作,那大家知道这个1万毫秒,这相当于是什么时间。是当前时间再过1万毫秒吗?不是的啊,这是绝对的那个时间戳,对不对,这是从1970年1月1号伦敦的那个标准UGC时间开始算起过1万毫秒,对吧?诶,那可能就是1970年的某个时间点,对不对,哎,对吧?所以这个一般不会直接去给啊,那大家想一般这里面给的是什么呢?哎,我我这里面可以注册,按照什么来注册,我是不是可以直接按照当前value的这个time stamp,我拿到当前的这个时间戳,基于它再去延迟个几秒啊,啊对吧,我可以做这样的一个操作呢,比方说加时,这是不是当前的那个时间秒加了十秒,注意这里边给的是不是应该是一个毫秒数,所以后边是不是我可以再去给一个。
13:14
对,是不是再乘以1000就完事了对吧?啊,当然就是说这里边我们是从这个value里边去提取了,你直接用当前的这个time STEM是不是也可以啊,这个本来就是一个提取出来的好秒述对吧?啊,这个就稍微的简单一点啊呃,一般这两个操作都差不多,所以说这个我们就不详细去去给大家做这个处理了啊啊那另外大家会想到这个闹钟有开就有关有设定,是不是就应该有取消啊哎,所以大家看最后的那个方法,另外还有两个是不是叫delete呀啊delete的话同样也是里边要传一个对应的这个时间戳,那大家看是不是前面你设置了什么时间点的那个时间,呃,定时器现在如果要是想把它取消,这个闹钟不要响的话,那那是不是就是直接把那个时间出再传进来就把它删掉了,所以大家会发现我这里边去判断这个定时器的时候,我在这里边是不是可以注册多个定时器啊,那怎么区分它呢。
14:15
是不是就是按照这个定时器的时间戳来区分啊,啊,所以就是你设闹钟嘛,你可以同时设好几个,那区分就是到底几点闹对吧?哎,这就是这个过程。这就是关于我们这个timer service的这样的一些用法啊。啊,那当然这个还没完,就是这里边我们只是说了怎么样去注册一个闹钟,然后怎么样取消一个闹钟,那关键其实是在于。其实在于闹钟响对吧,那闹钟响的时候我们干什么事,在哪里去定义呢?哎,那是在另外一个方法里面,大家看。这个方法是叫做on timer,对吧?这个on time可以去重写。这里边我可以获取到on里边的参数啊,可以获取到当前的时间戳,那这个时间戳是啥呀。
15:03
大家想一下,这个时间戳是啥?这是不是就是当前的触发的这个时间戳啊啊对吧,所以我这里边可以直接打印一下啊,当前的这个时间戳啊,比方说我加上一个一个,呃,这个对应的这个描述啊,定时器触发啊,大家就可以看到当前这个时间戳是什么样子了啊啊然后这里边这个如果说你你还想看到一个结果的话,那我这里边你不要注册这个1万毫秒对吧,比方说我这里边加一个什么呢?比方说我就直接加加1000,基于什么加1000呢?我是不是可以基于当前的时间去加1000啊对吧,我直接用当前的这个current processing time,然后再去加1000,这个是不是就相当于是就完全基于当前时间,过一秒钟之后去触发这个定时器啊,对吧?啊,那当然这里边你如果要是没有1万毫秒的这个定时器的话,你删就没什么意义,对吧?啊这个大家知道啊,呃,然后接下来呃,这里边定时器触发的时候,你还可以做一些其他操作啊。
16:07
啊,就是想想做什么测试都是可以做到的,这其实就是这样的一个处理的流程,这就是关于我们这个kid process function里边可以做到的一些事情啊,那有时候大家可能会想到,那假如说啊,就这里边这个我delete要删除这个定时器的时候,我这里边可能不是这个直接写死的这个时间点,对吧?那有可能就是当时我基于当时的那个时间,然后比方说加了一个这个1000啊,或者说我这里边加了一个十十秒,对吧?诶那大家想我接下来要删除的时候,我该怎么删呢?那有同学可能说,那你就照着这个value.get time sample,然后加时再去再去删就完了嘛,大家想这两个调用的时候,应该是同一个数据的那个时间戳加时吗?这不是一个时间戳了,对吧?你想我当时注册这个定时器的时候,有可能比方说199来了一条数据,我注册了十秒之后的一个定时器,那等到我想删的时候,是不是有可能是等到209的时候来了之后,我想209当然这已经是十秒之后了,对吧?那有可能我是205来的时候,我就想删了,那是不是这个时候当前数据的那个时间戳已经不是当时我们注册的那个时间戳了呀,那大家想一下这个我我该怎么样去删呢?
17:23
这好像没法删了是吧?哎,对,所以大家想我是不是可以把它保存起来啊,然后我们当前如果要保存的话,这里面就涉及到一个问题,就是我当前的这个定时器啊。它是针对我当前K有效呢,还是说对于我当前的这一个当前这个类的那个实例,所有当前分区里边数据来了之后都可以访问呢?针对K有效对吧,因为我是K的process function嘛,当前上下文里面都有key对不对,所以当前的这个,呃,当当前我的这个定时器也只针对当前的sensor ID啊,对应的这个K是生效的,那我要保存是不是也要保存一个只对当前K有效的一个数据啊。
18:10
哎,所以是不是要保存一个kid state啊,那大家想我在这个kid process function里边可以去定义这个state吗?可以啊,对,大家想它本身是不是就是rich function啊,那rich function里面当然可以,是不是也有,也有这个运行时上下文open生命周期对吧?也可以获取那个get run Type Contact,然后是不是可以get state去注册这个注册状态啊,对吧,你有一个value state script,当然了,这里我可以把它指定成一个。在外面我是先把它声明出来啊,Value state,那大家想这个我要保存这个定时器时间戳的话,是不是就是一个长整型啊,对吧,我直接把它叫做呃ts timer对吧,创建出来下边这里边直接可以做一个赋值ts timer。
19:04
啊,比方说我这个叫做state吧,加一个state名称对吧,然后这里边给一个这个state,获取这个状态句柄的时候,直接就是这样去给它,就应该是一个长整型对吧,里边我这个直接叫TSTEM。后边就是长整形的。Class。大家看是不是就是就是直接这么做就可以了,然后我可以就是在之前那个数据来的时候注册了之后,是不是我就可以把它当前的这个时间戳做一个保存啊。这里边我是不是直接update,然后用当前的这个时间戳做一个保存对吧。然后当然这里边如果我要想去做这个清空的话,那是不是直接就做TSR点。Value直接把这个拿出来去做一个清空啊,啊,对吧,当然就是这样的一个操作了,是吧?啊啊,那所以呃,这这里边我们想测一下这个处理时间啊,所以这里边不要直接这么去清空,大家知道这个意思就可以了啊啊那所以在这个process function里面大家都看到了,首先它有这个reach function式复函数的所有的功能,我们可以定义状态,定义状态对吧,获取到运行上下文啊,可以去有这个生命周期方法啊,可以做这个open close的一些操作啊,那后边我们这个close的时候是不是还可以做那个清理工作啊,对吧,直接把这个ts the做一个这个clear对吧,这都可以去做,那另外呢,它还有一些rich function根本做不到的事情。
20:38
那就是这里边有上下文,我可以在这儿直接拿到当前的时间,戳当前的key,可以注册用这个timer service啊,用这个定时器服务去获取当前的处理时间,Auto mark注册定时服务对吧?On timer去定义一些触发,然后这个on timer里面大家看这个触发的时候还有一个上下文对吧?Timer里面也还有这个上下文,那这个上下文大家看它是不是还可以get当前的当前的current key对吧?还可以把当前的这个key获取到,那另外还可以是不是output,是不是还可以做特殊入流啊,大家看这个是不是还是测殊物流。
21:20
哎,那另外如果是主流呢,主流怎么输出out time里边能不能输出到主流呢?可以啊,Out嘛,对吧,out.collect这是不是跟上面那个process element一样啊啊所以这个是基本上功能也类似啊,那当然它就没有那个时间服务了,只是这里边还有一个time domama对吧,就是时间域啊,那这个域里边大家看能做的这个事情相对来讲就就会稍微的少一点,对吧?啊就是这里边你获取到的这个这个他time domain里边,它其实主要就是这这是一个这个枚举类型对吧,主要就是你看它当前到底是基于这个事件时间还是基于这个处理时间,因为大家知道你既然已经是那个闹钟已经已经响了嘛,已经触发操作了,你这里是不是就不要再调那个定时服务,再去注册闹钟了呀,对吧,这就不需要了啊,所以你只是能判断当前到底是事件时间还是处理时间,那至于当前的时间到底是什么,那是不是这儿时间说就告诉我们了啊,所以这个其实都有相关的性。
22:21
器啊呃,所以接下来我们可以运行这个代码给大家稍微的测试一下,呃,这里边我们把这个删除的定时器的这个助教了,对吧,那现在我们应该是不是要注册两,呃,我我把这个事件时间呢也注掉啊,因为当前我的是当前时间语义应该是那个处理时间对吧?啊,所以我就直接注册这样一个定时器就可以了,然后运行一下。当前我们还是那个流逝的数据输入对吧,所以接下来还是在NC里边把它做一个测试。好,这边代码已经启动起来。啊,我先直接我就来一个212对吧,大家看我这里边直接来一个212的这个数据,然后大家看,诶,这里边是不是直接就有一个定时器触发呀,然后这里边给定的这一个当当前的这一个时间戳,是不是就应该是我们注册好的一秒之后触发的那个时间啊,然后这里边输出的这个八,这又是什么呢?这是我们统计,大家还记得我们当时输出的那个结果吧,这时候我们在process element里边第一句out.collect输出它当前的长度啊,因为我们现在341嘛,大家看是不是长度是八对吧?那这里面的这个输出它是因为流里边是不是我们最后打印了。
23:39
啊,所以这里面有一个八的这个结果啊,而后边的这个定时器触发它是不是在。在这里边指定的呀。这个s outt里边对吧,我们定义当前的这个定时器什么时候出发好,那如果刚才大家看的还是不是很明显的话,我现在再再给大家输入一条。就随便给一个啊30对吧。
24:01
三四十大家看它是九,现在是不是也是马上就定时器出发呀,因为这个就是一秒,我们当时设置的那个是一秒钟对吧。还还记得我们那个时间是多少吗?一秒钟对吧,所以这个是不是很快就出发了,那比方说这里面我设置的这个时间稍微长一点啊,我重新运行一下,比方说给一个5000,那大家知道这这就是五五秒嘛,等五秒钟之后再出发,好,那所以这里我来一个啊十。哦,这个稍微有点慢,因为这个大家看还没有数据输出对吧,等五秒大家看现在没有对吧。接下来我们往后等,诶,大家看是不是五秒钟之后来了一个定时机触发呀,啊,所以这就是这样的一个过程啊,我们可以注册这样的一个定时事件,等待它发生。这就是process function process function的基本用法。
我来说两句