00:00
那前面我们已经知道怎么样连接到外部的文件系统去创建一张表,那相当于就是在整个流处理的三部曲里边,最先一开始的这个S是不是就搞定了啊,大家看到这个不光是我们搞定了S,其实还搞定了后边的一个基本转换对不对,相当于我们前面的那个map成po类型,是不是也也包含在这里边了啊啊,接下来就不用做别的操作了啊,就基于这个table API做就完了。啊,那接下来第二步当然就是可以调用table API或者写CQ去做查询了啊,所以接下来我们给大家看一看这个表的查询,首先是table API table API的话,我们说它是集成在这个语言里面的,呃,调用的这个方法啊,API方法啊,那所以呢,它就要基于一个代表表本身的一个类,对吧?啊,这个类当然在底层源码里边,Table本身是一个interface,是一个接口,那当然我们实现这样一个接口,就得到对应一个类,对吧?我们直接调用的过程当中,其实就是直接。
01:05
生成了它的一个对象啊,就直接就有这样一个对象的生成。所以。最终我们在调用table API的过程当中,都是基于一个table的对象,然后呢,返回一个就做一些转换操作,最后再返回一个新的table对象,对吧?啊啊,所以这里边大家就会看到这个关系型的转换操作啊,可以有这个多个操作,用这个点点点的链式调用的方式啊,一步一步跟在后边直接把它定义出来。接下来我们在代码里面做一个简单的实现。那前面这一部分我们就可以先把这个注掉了啊。接下来做,呃,刚才这个是第二步,接下来我们就可以做第三步了。做一个查询转换。呃,我们现在首先3.1,先做一个这个先做一个简单的查询转换啊,呃,我们首先是看那个table API。
02:13
简单转换,呃,所谓的简单转换其实就是基于这个表只做一些提取过滤,对吧,来做一些这个简单的分析判断啊,做一些统计的操作,所以这就跟我们前面讲的那个过程是一样的,我现在相当于不是已经有了这个table了吗?对吧,当前已经把这个table创建出来了啊,那接下来是不是直接基于它input table。然后直接可以做一个做一个,比方说select对吧,Select当前的字段前面我们也已经做做过了啊ID啊,Temperature提取后边是不是可以直接做一个where或者做一个filter啊,啊这个过程都是一样的啊,比方说我当前要这个,当前这个ID等于这个,前面不是等于341吗?啊,我现在如果要等于346也是一样的,对吧?哎,这这是完全没有问题的啊,后面给一个这个,给一个这个,呃。
03:13
就是引号要把这个完整的引起来,因为filter里边要传的这里边其实是一个就是当前字符串类型的一个predict,就是我们的一个一个一个语句,一个表达式,对吧,这样的一个,呃,当当当前我想要表达的这样的一个含义啊,一个语语语句符合CQ语法的一个语句,呃,那么这里边呢,有几种不同的写法,就是关于这里边这个等等于的这个判断,你像这个直接写一个等号的话,这就像我们CQ里面的那个写法一样,对吧。那另外呢,我还可以直接写成三个等号,这三个等号的写法其实就相当于是什么,大家看这里,我除了写这个,呃,对应的这个。一个string之外,我这还可以直接写expression对吧?Expression的话,当然这里边的版本,它这个推荐的这个写法是直接可以用这个scla里边的这个就是一个小标啊啊,这相当于是skyla里边的那个就是标标记的那种那种写法啊呃,我们这里边是Java版本,那当然就没有这种写法了啊,那那我们也可以直接就是把当前的这个东西直接提取出来就完事啊,你看这里面可以直接写一个这个expression。
04:27
这里面是不是直接给一个三等号啊,三等号就是在底层把它解析成一个符合table API的一个表达式,然后里边底层它判断三等号是不是就相当于是我们的那个等于的那个就是值等于的那个判断方式啊啊,所以这其实就是不同的写法啊,这个大家大概知道就行,呃,本身也并不是特别的重要。这就是关于这个简单转换,那我们可能会想到,呃,这个前面我们都已经做过了啊,就把这个可以在挖出来这个得到一个result string,呃,Table,那我们肯定想到的是我我除了这个简单转换之外,可能还想要做一些更加复杂的操作啊,你像之前我们是不是至少可以对它做一个聚合呀,比方说我统计一下当前这个,比方说当前这个ID有多少个,对吧。
05:22
或者我统计一下当前这个温度平均值是多少,能不能做呢?那这当然是可以做的,所以我接下来可以做一个聚合统计。啊,那比方说现在我就基于之前的这个input table啊,还是现在要做聚合统计该怎么办?那家想聚合是不是必须首先应该得指定一个对当前group by的那个字段,那个K啊,就像之前我们在那个data stream里面去做KY一样,现在是不是也必须要做一个?Group by啊,对吧?所以这里边诶大家看这个这里边没有KBY对吧?我们这里边就直接就是group by,因为是CQ操作嘛,它还是跟那个CQ的用法是一样的啊,这里边我们就把想要分组的字段直接定义出来ID,然后接下来是不是就可以直接做聚合了,但是这里边如果做聚合的话,大家想这个aggregate的话,这是不是相当于我得自定义那个聚合操作啊,或者flat aggre对吧,直接做自义操作,那我们这里面既然是用tablef link CL,我当然不想那么麻烦呀,我们自然是想到你能不能直接给我sum对吧,或者count直接做出来就完事了嘛,所以这里边我可以直接用现成的CQ里边提供好的方法,比如说我直接select,这里边select什么呢?
06:41
哎,直接比方说ID,当前的这个ID提出来对吧,然后我要当前的这个count数量,直接就id.count就是它的一个方法对吧,我就直接这么写,这就相当于把ID的这个值全部count出来了啊,那我可以给他一个别名,比方说这个叫做as count对吧?啊,那另外我还可以给一个temperature。
07:05
点A,大家知道这就是平均值对吧?然后as average temp,这都可以直接做一个提取啊,那当然这个我可以定义叫做一个a j j table对吧?这是我们聚合起来的一个结果,这就是一个查询转换的过程啊啊呃,那除了这个table API的这个用法之外,大家自然想到了对应的肯定有CQ的写法,对不对?那大家看一下刚才我们这两个简单转换和聚合转换,怎么样用这个T,呃,CQ直接实现呢?啊,这里面我就不写那个具体CQ的定义了啊,我直接就直接table env,大家知道最后是不是就是CQ query直接执行啊,我直接把那个字符串CQ直接写在这就完事了嘛,前面的那个result table,这个非常简单,是不是直接select select,呃,ID。
08:03
还有temperature from对吧?呃,这个前面我们那个表是叫做。表就叫做input table对吧,这个我们就叫input table啊input table。呃,然后接下来是where。ID等于SS46对吧?哎,这是我们前面的这一个操作,然后另外还有就是table in CQ query,下面这一个怎么做呢?那当然就是select,还是要提取的字段,那是在最后这里边指定的ID,然后是。啊,id.count那在CQ里面大家知道直接是不是就是count ID啊,对吧,直接这么写就完了,As,但是as大家注意,因为count是这里边我的方法嘛,相当于关键字对吧,这个不要不要重啊,我们定义这个字段名称的时候,不要跟这个方法名称重了,那我叫一个CT吧啊然后另外就是是不是直接这个求平均数啊,Avg temperature,然后这里as avg tank,然后from当前的input table,另外还需要怎么样group by ID是不是就是这样啊?
09:25
哎,这不就是我们定义的这个过程吗?好,那当然了,这里边我们可以把这个也定义出来,这个叫CQ no CQ a j table对吧?啊,那上面这个我就不用写了,大家知道这个结果很简单对不对?那接下来我们就做一个打印输出啊,这个打印输出的话,还是像我们之前的那个做法啊,Table inv twoend stream,对吧?把前面的这个result table,我先做一个打印输出,呃,然后里边我要有一个肉点class放在这儿,Print,这个是result,然后同样对应的啊,大家会想到这个result,这个a j table啊,也可以做一个打印输出,点class print,这个叫AJ,最后还有这个CQ,我们也看一下吧。
10:24
这里边呃,CQ a table,肉点class。这个叫CQ agg。好,大家先我们先运行一下啊,先看一下这个效果怎么样。大家会觉得这个代码可能有问题吗?就这里边我们直接把这个得到结果,然后直接做这个聚合输出,大家看报错了对吧。就这里边之前我们明明这个打印输出是可以的,对不对,哎,那我们就把后边啊,我我们比之前的那个简单示例,是不是相当于多了两个聚合操作啊,我先直接打印输出这个之前的这个结果啊,Result,大家看一下这个行不行。
11:14
我提取那个346啊。当前的ID和temperature,就把那个346对应的那个字段提取出来,诶大家看这里边还是报错了,我们看一眼啊,Temperature写错了是吧?哦,大家注意,这里边我定义的是temp对吧?那这里边提取字段的时候本来就错了啊,所以这个当然是我需要去前面的这个都改过来,Temp。或者是改前面对吧,呃,但是我觉得那个好像太长了是吧,我把这个都都改成短的好了啊,这个确实稍微的麻烦了一点,大家看这样就没问题了,对吧?所有的那个用到temperature啊,这这这里还有一个用到temperature的地方,我们全部用这个temp把它实现出来。
12:05
看一下现在这个效果怎么样。哦,大家看诶这个现在我们当前的这个result是不是没有问题啊,直接可以把这个346提取出来输出对吧?好,那我们再把这个打开。然后我们再把这个聚合的a j table和这个CQ a table试一下,看看它能不能正常正常输出。我们看这里边又报错了啊,这是这里又报了一个什么错呢?大家看他说的是table is ANA pen on table。什么叫on the table呢?这是一个只能追加的一张表,对吧,它让use to retract stream,然后去处理这样一个信息啊,所以这里边主要的问题就在于大家想这个为什么他管之前的这个叫stream的,就是因为我们这个数据是不是来一条追加一条,来一条追加一条,这张表的数据是永远不断在后面追加的呀,而后边大家想我当前的这一个聚合结果。
13:10
是不是相当于要有更新操作啊,对吧,Countt值那是不是来了一个是之前的那个count值是基础上加一是要更新的呀,不是直接追加对吧?诶所以这个他他就发现这个有问题了啊,所以我们这里边呢,要把这个toend stream改成totract stream,这样的话就可以得到一个输出结果了。好,我们接下来把这个再运行一下,看看这个效果怎么样啊。哦,这现在我们可以得到这个正确结果,对吧,大家看一下这个result,这是346只有一条啊,我们就看这个AJ和这个CQAJ,当然这两个都是一样的啊,我们就看一个吧,AJ,大家看它输出的结果有点奇怪啊。它是先来一个true或者false,一个布尔类型的值,后边是不是才是我们输出的那个东西啊,再看后边输出的这个是不是我们的ID,然后当前的抗的个数,还有一个平均温度值啊。
14:13
你看这个第一条数据来的时候都是一对不对,然后第二条341的数据来的时候,大家看出现一个什么情况,它来了一个false之前这个,然后后边处这个变成二,然后36.05。大家看这是不是更新了,所以大家注意,这其实相当于是干什么事啊,他来一条更新的时候,它其实是输出了两条信息。这两条信息分别代表。是不是前一条前一条数据的撤回和现在新一条数据的写入啊。所以它其实把这个更新操作变成了一次删除和一次插入操作来做的,对不对,所以它前面是不是就相当于要给一个除false的一个说明,表示你到底是要删除的老数据还是要新插入的新数据,对吧?啊,所以大家看这个它为什么叫to retra stream呢?撤回流对吧,为什么是rere要撤回呢?啊,就是因为我要表示之前的这个更新操作嘛。
15:14
所以大家看后边我不是三四十一又来了三条数据吗?所以来一条这里是不是就是撤回一条,然后这个变成count变成二,算一个平均数对吧?然后后面再来一条,是不是又把二这一条撤回来一个变成三啊,然后接下来再来一条,是不是把三撤回变成四啊,所以这就是这样的一个过程啊。这就是聚合之后我们做这个,呃。最后转换成流输出的时候,那需要把这个更新操作表示成两条数据,一条数据的撤回和一条输出数据的写入,这就是我们对于table API和CQ啊做表的查询转换的代码的实现。
我来说两句