00:00
我们上节课给大家讲到这个water mark的时候,我们曾经讲了a signer,对吧?呃,在这里我们要传一个time step a ser这个东西来去实现提取时间戳和抽取water mark的这样的一个生成water的这样的一个机制,对吧?这一部分内容当时我们有一部分没讲,就是不是有两种那个类型吗?一个周期性的生成,呃,Watermark,一个是相当于是那种可以打断式的对吧?断点式的生成那个water mark,那这里边文档里面有两个例子啊,我在这里直接给大家来看一看吧。呃,我们这个就不不带着大家详细去写了啊,大家看一眼知道它是什么意思就可以了,比方说这里边大家看第一个例子,这是一个aign aign with periodical waters周期性生成。所以大家看下边它主要去。这个需要的这个东西是什么呢?需要去实现的方法就是有一个是get current water mark对吧,还有一个是extract time stamp,这个extract time STEM大家是不是觉得很很熟悉啊,这个是不是在之前我们讲那个乱序数据的时候,那个一长串那个那个方法里边,对吧?它是不是就要实现一个这个方法啊,所以它其实就是抽取时间戳的方法,对吧。
01:28
大家想是不是这样,然后同样上边从这个名字上来看,Get current water mark,它最后会返回一个water,是不是这就是一个生成water mark的方法啊,哎,所以大家看这个所谓的周期性生成water mark,它是什么呢?其实就是。默认不是200毫秒吗?200毫秒就去调用一次这个方法。大家看啊,其实就是这些东西,然后大家看这里边的这个实线是什么呢?这里边的实线是一开始定义了一个棒的,就是整体的延迟对吧,延迟是一分钟60乘一千六千六六万对吧,一分钟啊,然后大家看这个还有一个什么呢?是当前最大的时间戳,大家想这个最当前最大的时间戳是用来干什么的。
02:22
大家会想到我是不是应该保证water mark要只涨不跌啊,所以我是不是每次都要比对当前已经存在的最大的那个时间戳,然后在这个基础上去延迟一段时间,是不是就实现了我们那个延迟发车的automark机制啊啊,所以大家看这个实现就很简单啊,就是为了保证只涨不跌,所以我们要一直对这个数据里边所有数据取最大的那个时间戳,那么大家看生成时间戳的这个抽取时间戳的方法,是不是还是直接从这个数据里边把那个time STEM拿出来就完事了呀,这个直接拿就完事了,然后呢,他还多加了一部。
03:07
是不是就是要比较一下当前的时间戳和之前最大的那个时间戳比一下,对不对,总是更新成最大的时间戳,把它保存下来,然后这个最大时间戳用什么呢?诶大家看生成water rock的时候怎么生成呢?就用当前的最大时间戳减去一个延时。啊,所以大家看就是说如果我当前的water mark已经进行到,比方说时间已经到了十点钟了,现在最大的时间,最大的那个时间戳是不是就是十点来的数据里面最大时间戳是十点,对吧?那现在water是什么呢?那是不是就是十点再减去一分钟。水位是不是涨到了09:59啊,所以就是09:59分之前,之前的数据都到了,哎,这一班车该发了,所以大家想到是这样的一个状态啊好,这是周期性生成,另外还有一个这个乱距,呃,就是可以不是周期性的,对吧,间隔性的,间断性的去生成这个,呃,这样的一个代码,大家看一下这个a sign with puctuated water marks,这里边同样还有一个实现。
04:21
这个实现的代码其实也很简单,大家看这里边,呃,这是上面这个类的定义对吧,然后上来是不是还有一个延迟啊,还定义了一个棒的一个延迟,然后下面大家看他是干了一个什么事情呢。他直接,呃,首先还有一个提取时间戳,这个它就没保存什么最大时间戳了,对吧,直接把数据里边时间戳提取出来完事,然后另外它就不是那个直接get water mark了,它是什么呢?它叫check and get。所以在这个里边,大家看它可以干什么呢?对,可以去根据对,停下来去检查一下,根据当前的这个数据的一些特性,检查一下是否符合某些条件,我们可以让它符合某种条件的时候,再让他去生成一个water mark。
05:13
啊,所以大家就会想到它的这个触发就不是周期性的,一会儿就系统自己就生成一个,它是什么呢?这是不是应该是数据触发的呀,来一个数据之后,我们判断一下是不是符合这个标准,然后就生成一个watermark对不对,大家看,比方说这里边假如这个数据是ID是341的话,是一号传感器的话,大家看是不是就拗了一个watermark啊,而且这里面watermark还有一个延迟固定的对吧,就用当前的这个数据的,呃,这个这个time step,然后减去。那个延迟定义好的延迟,这就是我们的逻辑对吧,如果来的是别的别的数据的话,别的传感器数据的话,直接就不生成water rock,这个water总跟在341的数据后面,所以大家看就就是这样的一个,其实实现起来也很简单啊啊在实际应用场景里边,那就根据自己的需求来了,好呃,当然了,大家看到这个,接下来就是去看这个滚动窗口,滑动窗口的一些实实现对吧?好,那我们这个就是直接在代码里边来。
06:23
重新建一个这个。Object啊,直接放在这个API test下边,我们这个叫window test吧,都是跟window相关对吧?Water最后我们也是要做window操作嘛,看看它的影响,好,这叫window test,呃,然后这里边我们常规的东西该有还是有对吧?呃,这里边我们那个STEM大家还记得吗?可能好久没写了,是不是stream execution environment,然后get对吧?啊对这个常规的这个做法啊,然后接下来那个我们可以全局的把它并行都设成一,不影响正确性对吧?哎,这个是没问题的,然后接下来大家还记得我们那个S是怎么读取来着,呃,比方说这里边我们先从文件读取吧,这个最简单对吧,我直接把这个先copy过来啊。
07:25
这个我就叫啊,就就叫吧。然后接下来我们定义一个data对吧,Data stream,那是要对stream是不是先做一些预处理操作啊,因为那个读进来是那个呃,字符串对吧,都是string类型,所以我们还得把它包一下这个,这个我就不详细写了啊这个。在在哪里,我们有过那个来着,应该随便找一个都有吧,我们的那个数据源哦,这个对吧,我们直接把这个map操作直接copy下来。
08:06
然后包装成一个sensor reading对吧?啊,这部分大家都知道就可以,我我就直接把这个跳过了。Map,好,呃,现在我们就得到了一个sensor reading里边的数据类型是sensor reading的一个data stream,对吧?好,那接下来大家做什么操作?大家想一下。接下来做什么操作呢?啊,当然我这里可以定义一个啊,比方说就是我现在比方说我是想要做什么事情呢?我想要呃,就是呃,让每个传感器的那个温度,比方说啊,呃,我要设一个滑窗对不对?呃,比方说设一个先设一个滚动窗口也可以啊,我先设一个滚动窗口,然后呢,滚动窗口聚合操作,聚合什么吧,我们简单一点,比方说输出15秒之内的最小的那个温度值吧,这个是不是比较简单的一个操作啊啊,我们就以这个作为一个例子啊,比方说m temp,最小的温度per window。
09:12
呃,这个stream对吧。那就在data stream的基础上,大家会想到我们,呃,这里先做一个操作啊,我可以既然只是提温度,那我先把所有的这个data是不是转成就是一个二元组,只要ID和温度就够了,时间戳都不用啊,我现在好,然后r.ID呃,这个data.id。Data点呃,Temperature对吧,我先把这个先搞定,然后接下来。接下来是不是K,如果要开窗,我们之前说对吧,你要不就得WINDOW2了,大家还记得我们讲的window API吗?呃,所有的这个window操作都是建立在一个k stream基础上的,对吧?啊,要不然的话就用WINDOW2操作KBYKBY哪个呢?
10:09
当然是ID对吧,ID这里边你可以直接零也可以是呃,也可以是什么呢?对下划线,因为上面是圆阻了,不能用字段了,所以说下划线这个一对吧,然后接下来是不是可以开窗口了,大家看可以可以window了,对吧?我们这里边比方说先开一个时间窗口吧,开一个时间窗口比较简单,是不是直接可以time window啊time window,然后这里边我们给一个15秒time。呃,这个得得引入啊,15秒。那15秒太长了是吗?这里大家注意一下,我们要引入什么呢?Flink streaming API window这个time.time对吧?啊,这window window相关的这个东西,Seconds,那那我十秒吧,大家觉得15秒太长了。
11:01
好,然后接下来。接下来我是不是可以做操作了,对吧,这里边是开。时间窗口,然后接下来是不是就可以做聚合操作了啊,这里边啊,我不要直接那个ma,呃,就是max命了,我我用一个reduce吧,给大家讲的那个。就是增量聚合函数对吧,增量增量聚合的方法好,那么reduce的时候,这是不是就相当于我的DATA1DATA2,大家还记得reduce怎么写对吧?两个数据来了之后,前一个就是我们之前聚合的结果对不对?后一个就是对新来的那个最新的一条数据对吧?好,那所以我们要输出的是一个什么什么状态呢?对,其实就是要比方说我们最后要的也是一个ID和一个最小的那个,呃温度对不对,那I应该是什么?ID是DATA11对吧?啊,其实DATA1跟DATA2KY之后ID肯定一样对吧?啊,这个不用不用考虑啊,然后后面那个呢,最小的那个温度呢,那是不是就是两个取最小的呀,它的二啊,大家可以直接给一个M操作对吧?然后贝塔二点下划线二,这样是不是就取了一个最小值啊对吧,用reduce做增量聚合。
12:40
好,呃,所以大家看到这其实就是我们很简单的这样的一个操作,对吧?呃,在这个过程当中,这个如果做完之后,大家应该想到reduce完了之后得到是是个什么东西,就是我们这里面的这个什么mean time per window stream,这得到的是什么东西啊。
13:03
统计15秒内的最小温度,好啊,当然这里我们是十秒了是吧,好。大家想这个reduce完了之后得到是什么?看一眼啊,是不是又变成了一个data stream呀,对吧,又回来了对吧?哎,这这个大家一定要就是注意一下,它这里边就绕来绕去,这个这个东西啊,这两步是我们所说的window操作的那个算算子对不对?这两步前面是一个a signer,那个window sign,下面是一个我们的那个window function对不对?前面定义数据去哪个窗口,下面定义窗口关闭的时候做什么操作对吧?做什么计算,这两个是在一起的,他俩做完之后得到的就是data stream,那大家可以看到一开始是data stream map,之后还是data stream,对吧,K之后得到的是一个k stream,对,然后time window之后得到的是一个。
14:08
一个window的stream对吧,这个中间是有这样一步,然后接下来在reduce之后又回到了一个data stream,这就是跟大家说这个数据结构的这个转换啊,呃,当然了,这个time.time window,这是我们这个比较简单的这种实现方式,这个点进去的话,大家其实可以看到啊。大家可以看到上面其实有一个有一个说有有一个这明确的说法的,对吧,这是一个shortcut。Shortcut就是简写缩写对吧?For for什么呢?点window,哎,所以大家看它底层是不是还是点window啊,对吧,那大家看点window里边是不是就得传一个所谓的那个啊,哎,我们的那个sign啊,Window sign对不对?那大家看这个sign是怎么定义的,它这里边是一个tumbling even time window,或者是tumbling processing time window,对吧?然后后面点of size,这个看起来就比较复杂对不对。
15:10
我们现在这个操作是不是就特别简单啊,但是大家要知道,其实底层还是点window啊,大家不要认为点time window就是一个,就是一个特殊的一个一个算子了,或者是一个运算符了,还是window啊,好,呃,大家看一下知道就可以了,然后接下来呢,既然是data stream了,我们是不是可以把它打印输出了呀,对吧?所以接下来我们就命print。我们把它打印输出,比方说这个叫mean time,对吧?呃,当然为了方便大家看的就是更加的全面的话,我们可以把这个data stream也打印输出,对不对,对吧,这个是input data好。啊,当然最后大家不要忘记还有对执行execute就是window test,好呃,那这个我们就可以直接运行一下了,对不对,好,我们看看这个效果怎么样啊。
16:14
大家看到这个已经运行完了,得到结果是什么呢?全是data stream那个input data打印出来了,对吧,说明所有的数据都读了一遍。但是怎么我们这个窗口聚合没聚合出来呢。大家想想这是为什么呢?时间,大家看一下我们开的窗是十秒钟对不对?大家觉得刚才运行运行了十秒钟吗?这里大家要注意,我们这里边开的这个窗口时间是什么时间,我们不是有不同的时间语义吗?这是什么时间,什么都没设对吧?大家还还还还能想起来吗?我们讲了时间语翼,这里面什么都没设,跟之前一样的这种处理方式对吧?什么都没设的状态下是什么时间语义processing time对吧?哎,那pro如果是processing time的话,那大家想一下,我们当前设的就是你要按照处理时间的十秒钟去统计一个窗口对吧?关闭一个窗口,我们整个这个运行有十秒钟吧。
17:34
这个几乎就是这他应该当时这个数据就是一瞬间就全跑完了,对不对,对吧,前面那个启动或许都都没有经历太多的这个,呃,太多的复杂的操作,对吧,那直接来了之后直接就过去了,那是不是相当于这个窗口里边就什么数据都没,就是窗口还没关闭,没到它那个时间呢,我们就已经跑完了对吧。哎,所以你如果要是这样这样的一个状态的话,那肯定就会有问题了,哎,所以这里边有同学其实前面提到了啊,就是你这里边是不是应该换一种方式呢,确实。
18:10
我们应该那个把这个stream还是老老实实的,我们不要卡夫卡拉吧,对吧。呃,我这里边就直接写死了啊local host。啊,我直接7777起一个socket,用流式的这种处理方式,然后大家会想到它是不是就不会直接跑完啊,那不会运行完的话,那他就可以等十秒钟了,对不对,那这个过程可能我们还是可以接受的,好吧,那我们就用这种方式来给大家试验一下吧,好,那这里我就只能是先去起一个NC对吧。好,我把这个放在这边,然后接下来我们把这个代码跑起来。
19:02
啊,这边已经跑起来了,那接下来我们的任务其实就是这个一条数据一条数据输输了对吧?呃,这个我为了方便我这个copy,我直接把它放分屏显示吧,好输一条数据,大家看这里边是不是有输出啊,诶大家看这里边直接就输出了一条命对不对。呃,那这里是不是只有三四十一,只有他一个啊,哎,所以说这里边大家看就是是什么,他就直接输出了对吧?好,然后继续看吧,三四十六。来input一个346对不对啊,大家看他这没输出东西啊,因为大家知道那个窗口是不是要要等十秒钟啊,对吧?哎,大家会看到,过了一会儿之后,我们是不是看到了三四十六的这个输出,对吧?哎,果然是按照我们这个等十秒钟,尽管我们没有详细的数到底是几秒钟啊,呃,大家看这里边是不是347的那个数据也出来了,是另外一个十秒钟对不对。
20:05
而且大家会发现它它这个输出有一个什么特点啊,是不是一定得是严格意义上隔十秒之后输出啊,因为我们定义的是什么窗口滚动,对我们设置的是滚动窗口,而且有同学可能说,诶你这会儿怎么就就没输出呢。对吧,我们等了好久了,这会儿怎么就不输出了呢。对,这这是因为我们现在是不是这个窗口里面就没有数据啊,哎,这里大家注意啊,没有数据的呃,这个窗口呢,它就不会去触发它的操作,既然没数据嘛啊,那你操作是不是也白操作,所以这个就是我们继续给这个341,我们快速的十秒之内给两条,看他这个大家大家预计这个应该是什么样子,下面输出应该是应该是要直接给31对不对,诶上面还有一条35。
21:05
那大家可能会想到这是是不是他刚好等到我们时间窗口那个十秒的那个截止那个点啊,因为大家滚动窗口是不是每次就卡的是十秒那个点,可能刚好我们输完,输完这个的时候,他卡到那个点了,所以就输出了一条对吧,所以大家会发现他是这个过程当中其实是会。它其实是会就是聚合这个结果的话,假如说卡在了某个时间窗口的那个位置的话,马上就会输出对不对啊,因为这里边我们这个就是它是processing time处理时间,所以这个大家有时候就不太清楚到底是什么时候输出的啊,这个就搞得不太清楚对吧?所以为了我们测试再再看的更加明确一点,对大家会想到我是不是可以把这个不要用处理时间了,我我们可以用什么对我们去设置type。
我来说两句