00:00
接下来呢,我们要给大家再梳理一下流处理和关系代数的区别,其实主要是要给大家引入在流处理的过程当中去调用table API和CQ,它里边需要注意的一些特点啊,引入一些特殊的概念,首先我们来做一个对比,用一张表来做一个对比啊,大家看这里边列了两列,就是一个是关系代数或者是CQ,大家知道什么叫关系代数呢?其实就是说的我们之前像这个MYSQL里边,我们创建的这个表就都叫做关系型表,对吧?而就其实我们主要就是设置的这个实体关系的这样一个关对应的呃,一个一个数据的组组合形式嘛,所以CQ里边我们其实就是利用关系代数去在一张表,二维的这个表结构里边去做了一个数据的查询和提取,那与之对应的呢,就是理由处理,就是来一个处理一个,其实大家一看这样的一个对比我们就知道了。
01:00
传统意义上的CQ或者说表的处理,其实是一个什么处理啊?其实这是经典的批处理对不对?就是数据都已经在这张表里边了,然后我们执行一条CQ,是不是可以针对这个有有限的数据得到一个最后的结果啊,这是一个典型的典型的批处理嘛啊,所以接下来我们对比的时候大家会发现啊,就是CQ这一部分明显是跟P处理天生结合更加紧密,所以之前我们在学这个Spark的时候,大家会觉得have对吧,用这个用这个CQ是特别舒服的一件事情啊,但为什么现在大家会觉得在这个流处理里边感觉有很多奇奇怪怪特别诡异的地方呢?主要就是因为还是底层批处理跟流处理的区别。所以大家来对比一下啊,就是关系代数CQ,平常我们的CQ里边处理的数据对象是什么呢。其实我们表里面的数据是有界的,对吧?诶,数据都已经到那儿了,然后我们执行一条CQ,然后跑出来,所以它是字段元组的有界集合,就是一行数据有很多个字段嘛,我们认为它是一个丝段元组,这个没问题,对吧?他就把它当成一个肉啊,一行当成一个元组,它是有界,而流处理,处理的是一个无界的序列,对吧?啊,这是最基本的区别,然后是query,我们做查询的时候,你要访问数据的话,怎么访问呢?
02:24
那CQ我们写CQ去查询的时候,是不是相当于数据都已经在这儿了,我们直接可以访问到完整的数据,输入访问一遍,就直接可以把它最后的结果要生成啊,这就又涉及到这个查询的终止条件,是不是只要把所有的数据都遍历一遍,最后得到结果,生成固定大小的一个结果集,是不是就结束了呀,就退出了,对吧?哎,这就是这个一般我们传统做CQ大家的这个整个操作的流程,那现在流处理的时候。你假如想做查询的话,可以直接访问到完整的数据吗?访问不到对吧?所以这里边我们其实是必须要持续的等待流失输入,就像我们一开始说的那样,流逝的处理程序是Env.execu的,执行起来之后等待事件触发,来一个数据处理一次,所以这里边如果我们流处理里边要做这样的一个用SQL的查询的话,那大家想是不是也是我必须。
03:22
不停的等新的数据进来啊。诶,那这里面就有一个问题,我这里边永远永远都有新的数据进来,那是不是永远都不会终止啊。所以在流处理里边,如果做CQL查询,其实是一个永远不会停止的查询,那我们这里边做的操作其实是每一个数据来了之后,是不是都要基于当前的所有数据要做一次查询得到一个结果啊,所以大家看到其实它是根据新来的数据要持续不断的更新这个查询的结果集,对不对?所以我们得到的那张表是不停的在变的啊,所以大家看到在当前我们做流处理的时候,跟之前做离线处理啊,大家比较习惯的这种表处理其实是有所不同的,这里边就有一个核心的概念叫做动态表。
04:14
Dynamic table啊,所以之前我们讲的那个,大家可以理解成是静态表的,表里面的数据我们在至少我们在做查询的这一瞬间,那表里面的数据是肯定是静止不动的,对吧?诶都是现成的,都已经到齐了,我想选取多少数据,选取多少数据,而对于当前flink里边我们做流数流数据处理呢,那就必须把这个表看成是一个动态的,随时都在变化的这样的一个状态,所以这其实是flink对于这个table API和CQ支持的一个核心的概念,大家理解了这个概念,就知道到底它底层是怎么样用table啊,用CQ的这种方式来去做流处理了。啊,那这里面它主要特点就是随时间变化,那它动态表我们本来输入的数据在不停的变,那么经过查询之后得到结果也在不停的改,对吧?所以这里边我们对于动态表的查询就不是一次查完就搞定了,而是要不停的查,对吧,随着输入数据的变化,呃,一个一个来我们这里边要持续查询,所以这里面就得到一个新的概念。
05:23
就是针对动态表的查询,就会产生一个所谓的continuous query持续查询,持续查询是永远不会终止的,那大家想这样一个持续查询得到结果是一个静态的表吗?那当然是不是也是一个动态表啊?所以大家就会发现了,本来我们这个数据流逝的数据嘛,源源不断的来,那输入的这张表是不是就可以认为是一个动态表啊?不停的有数据进来,对吧,然后他经过这个table API的查询,或者说写CQ做查询之后,这是一个持续查询,那是不是他得到的结果也是一个动态表,那么结果得到的这个结果动态表是不是它就可以反映我们每一次更新,就可以反映当前新输入一条数据带来的影响啊,哎,所以他们就是这样的一个一一对应的关系,这就是把这个表的查询跟流失处理结合在一起了。
06:20
那接下来我们再看一下这张图,给大家说一下流式表查询的一个处理流程啊啊,那怎么样做这样的一个流式表查询的,简单来讲就是这么三步,就首先把我们本来是流嘛,流是不是先转换成一个输入的,类似于这样的一个动态表啊啊这里面有一张表之后接下来我写一个CQ,那就是定义了一个动态表到动态表的转换。这个转换,这个查询其实是一个持续查询的过程,对吧,就每来一个数据,大家想我这个动态表是不是要要变化一下,然后接下来这个持续查询是不是要执行一次,每一次都执行,然后得到是不是就是这个又是一个动态表,这个动态表结果动态表就要更新一次,所以这就是这样的一个查询过程。
07:08
然后最后这个动态表结果的更新,是不是还要再把它转换回成流啊,这才是我们最后的这个流失处理的过程,因为我们最后要的还是来一个就有一个输出结果吧,还得是一个流失输出啊。这就是完整的流程,有点像之前我们第一个这个事例里边,我们先读成一个流,然后转换成表,然后做那个操作,对吧?但这个说的并不是我们从这个代码里边直接把它读成流,然后做的转换,这说的是即使是你一上来之后把它读成了表。那大家想它底层是不是也要有这样的一个转换过程啊,底层本身来的还是流对吧,那还是底层的流要有这个转换成动态表的这个过程,好,那接下来我们就看一下一个具体的例子吧,当前这个具体的例子大家看一下,这是数据啊。
08:00
这是数据,这个大家看一下数据有三个字段逗号分割的啊,前面Mary Bob这个看起来像一个用户名对吧,然后后边呢啊,12:12点零五,这明显是一个时间嘛啊类,所以这明显这是一个日志数据了啊,类似于我们的一个买点日志,后面呢还有一个。呃,还有一个就像一个URL一样的东西,对吧?所以大家想这应该是一个什么数据啊,是不是用户的一个访问数据啊,对吧?对于网站的一个访问数据,或者说你可以认为收集日志收集的就是用户的点击行为或者说访问行为,对吧?他点一次鼠标是不是就会访问一个链接啊,诶所以这个其实就是最常见的一种用户行为数据的收集啊,那大家看一下,现在我们来了这样的一个流失数据,假如说我们基于他,呃把这个就是当前啊,是卡夫卡进来的对吧,我们把这个这个卡卡进来的数据。注册成了一张表,那大家想在flink内部这张表应该长什么样子呢?
09:04
那大家看是不是就是这样这样的一张表啊,我把这三个字段包装成一个自己想要的一个po类啊,可以啊,或者说我这里边不用不用包装,因为我直接连接卡普卡,是不是可以直接指定sche码呀,对吧?我的sche码指定出来三个字段,User当前的用户名C,呃,C time,这就相当于点击时间对吧,Click time,另外还有一个URL点击的那个链接,然后大家看,那就是每一每一条数据来了之后,是不是这张表里面就加一行啊,所以大家会想到第一步流转换成动态表的过程,这是一个什么样的表?哎,如果从这个表的操作上来讲的话,这是不是就是我们之前说的只有追加插入信息的那种表啊,所谓的那个pad only的这种这种行为对吧?哎,所以大家看到这就是来一个数据后面追加,来一个数据后面追加对吧,即使是你来了相同的Mary,这是不是也应该叠加在后边啊,这是我们的原始数据嘛,我并不知道你现在要干什么呀,哎,这就是我们当前生成的这个动态表。
10:08
然后接下来我们可以写一个Q,大家看一下这个CQ是怎么写的,Select user,然后count URL,对吧?As c,这要统计什么呀?From clicks,这是把这张表直接叫成CS了啊,注册成我们在这个,呃,比方说卡夫卡读进来之后,在环境里边注册叫clicks,然后接下来group by user对吧,那这其实就是。对,按照用户分组统计他的点击次数,这是不是就类似于就是对于这个用户行为要做一个统计啊,这个用户他到底他要点多少次对吧?诶当当前对于我们这个网站到底感兴趣程度有多少,所以他是要统计这个com的数量。那来想一下,基于之前我们的得到的这一张初始的动态表,如果要是应用这样一条CQ的话,这是相当于一个持续查询,对吧?哎,那接下来得到的表结果表应该是什么样子呢?
11:09
那也是一个动态表,持续变化的一张动态表,那我们一条数据一条数据来分析,首先一开始来了一个marry,然后一个这个home啊,前面那个还有一个那个时间戳啊,时间那个我们就直接大家看就直接把它去掉了,对吧?因为select,我这里面是不是要的只有这两个字段啊,对吧,只要这两个字段啊,那么我这里边有了一条数据来了之后,是不是首先在结果表里边user和CT会有一条数据插入MARY1MARY是不是有一次点击啊。然后接下来继续来了一个Bob的点击,这样一个访问URL的数据,那么接下来是不是MARY1BOB1追加了一条数据对吧,接下来又点了一次,这怎么办呢?这是不是表里边应该是Mary更新成二啊,这是一个更新操作,所以大家看。
12:04
其实大家知道为什么有更新操作啊,对,这里面是不是有聚合啊,大家想到如果是简单转换map filter的话,那当然不会有这个更新操作,但如果说出现了聚合的话,那当然这个count值随着变化,我们这里面count值肯定要改变嘛。所以你看到这张表里边就会出现了增加和修改的操作,后面如果再来一个例子啊,那么在后面再追加例子一,这就是我们得到的结果的这个动态表。最后一步得到的结果动态表是不是还要再重新转换回成流啊,那大家就想到了,那当前我得到这这个表,这怎么能直接就转化成硫呢?哎,其实大家就想到了,那它要转换成硫,是不是相当于就是要把我当前这张表的那个变化的那个信息都要提取出来,写到流里边去啊,大家想想是不是就是这样,所以说这里边的变化信息,那你说表的变化,我们说无非就是这个增删改嘛,对吧,增删改查查询不涉及到表本身的变化,所以说就无非就是增删改嘛,所以接下来我们就想到了,对于一张表做了增删改之后,它这个发出去的。
13:22
变转变成流里边信息的这种方式不同,那是不是就代表着得到不同的这个最后结果的这个流就不一样啊,啊,所以大家会看到这是不是就是我们之前所说的更新模式的问题啊,其实就是在最后一步,我们要把这个动态表转换成一张一个一个流的时候,考察它到底要把这个表的变化。编码翻译成什么样的流数据对吧?诶,所以其实就是这样一个信息的编码过程,那所以我们就知道了,这里边是不是相当于就有三种不同的更新模式,也就有三种不同的转换成流的方式啊啊所以大家看到最简单的一种流,那当然就是颈椎加流啊里这样的一个流,这样的一个流的话,那其实是不是就相当于我只允许发送插入的信息来修改我的这张表啊。
14:16
对吧?这张表里边就只有插入的操作,那么这种情况下我就可以把它转换成一个颈椎加流,那对于外部系统而言,它的意义就在于我不需要有任何的更改操作,对吧?我只要能写入,能够往后面追,我就能把这个这个流做一个处理,所以大家看转换成最后一步,转换成流它的目的是什么?就是要连接外部系统写入对吧?所以你看那个外部系统就看你支持哪种模式嘛,哪种更支持哪种更新模式,就能够接收哪种流,对吧?啊,那这个颈椎加流当然是它只能是往后追加,那如果出现更新操作,Update操作搞不定了,搞不定怎么办呢?哎,撤回流对吧?撤回流里边它包含两类信息,那就是如果来了一个只是来了一个这个insert啊,一个插入操作的话,那我是不是就是一个添加呀,如果只来了一个删除操作,我就是一个撤回retra,那如果要是update更改。
15:16
对,先撤回之前的那一条,然后再呃,就是插入新的这一条,对吧,两条信息实现我们这样一个功能,那最后还有就是UPS了,UPS是不是就是不管是插入还是更新都是一个UPS消息,那如果删除的话,Delete对吧?啊,所以这个整体来讲就是还是非常的呃,容易想到的啊,那大家想一下,当前我们的这张表的话,要转换成流,它应该是个什么流。哦,大家都想到了,具体到代码里边的话,它转换成流的这种发送消息的这种模式,是不是要跟就是外部系统支持的那种模式相关啊,对吧?你外部系统它定义成我当前是什么样的模式,这个时候它就选取对应的模式,但是有些场景它会有限制,比方说我们当前做了聚合了,有更新操作,那你外部系统如果来一个,它只能接收颈椎加流啊判断的话,这可以吗?
16:16
当然就不行了,对吧,所以之前我们测试就报错了嘛,你要往文件里面写,它只接收判里,那那就不行了,那如果是往卡夫卡写也是判里,对吧,那也是不行的,所以说我们当前的这张表可以把它转换成什么样的流呢?是不是retra流和upset流都可以啊,这两个都可以,我们看一下这个效果是什么样的,好,那大家看,如果转换成一个retract流的话,这里边其实得到的结果就是。每一个数据Mary大家看来了之后持续查询对吧?生成的那张表不是应该是MARY1吗?所以它发出去的那个消息转换成流之后,是不是就是这里边的这个加一个MARY1加就表示insert对吧?就是插入的消息就是加MARY1,然后Bob数据来了之后是不是又是追加呀?哎,那就加报不一,然后后边又来一个marry的时候,这怎么办呢?
17:09
Retra模式,对,大家看这条数据引发的操作是有两条流失输出是减掉MARY1,再加上MARY2,是不是撤回之前的,再追加更新之后的呀,好,然后后面的话来了例子的话,那么就是例子再追加对吧?加例子一,如果再来一个Bob,那又是减BOB1加BOB2,又是两条数据对吧?所以它的这种方式,它的好处在于相当于比较简单对吧,我就是通过发送两条消息,你外部系统只要你能读得懂我这个消息说的是啥,那你就可以知道啊,我要去更改什么东西了,那那它的缺点就在于,首先外部系统也得能够读出这个数据来,能做更改才行,对吧。你比方说你写入到文件里边,他能我之前文件已经写了好几行了,然后你来一个带减号的这个信息,我就能找到之前的那那一行,然后给他做更改吗。
18:07
一般的文件系统好像没这种写入的时候没有这种操作是吧?啊,所以这个就它就不支持嘛啊,所以他对于外部系统还是有一些要求的啊,另外还有一个很很重要的特点,就是它是不是这个编码上有点冗余啊,来一条数据是不是有可能会对发两条数据才能表示一个更新操作,所以这个就相对来讲比较麻烦嘛,那当然更好的方式就是upset了。下面这就是一个upset流,大家看转换的就是大家看两种方式,星号表示upset by k必须得指定K对不对啊,然后这个减号表示delete,大家看这里边所有的每一个数据来了之后,是不是都是一个星号,都是一个afternoon的这样的一个信息啊,对吧?你是可以marry一开始来了之后,这是追加对吧?Bob来了是追加,后面这个呢,是一个更新嘛,MARY2我只要以当前的username作为keep就完了吗?诶那大家想这个对于外部系统来讲,它是不是就必须得有这个要求啊,外部系统是不是必须得知道哦,你当前的K是这样,我保存的时候,我可以根据这个K找到当时的那个数据。
19:14
单想ES是不是可以做到,呃,Red mysq是不是都能做到啊,诶,所以像这样的一些数据库,它就可以支持这样的UPS模式啊,这就是动态表,呃和这个转换成流啊,就通过流转换成动态表,然后再由动态表转换成流,持续查询整个的一个处理流程。
我来说两句