00:00
接下来我们就在代码里边一步一步按照这个顺序给大家一一做一个讲解,做一个实现,首先自然大家想到的就是你在开启这个source transform SK之前,是不是首先得有这个环境啊啊,所以我们还是新建一个代码。做一个测试啊,新建一个类,呃,当前我们这个就叫做table test。呃,这个是table test2啊,我们当前叫common API通用的这个API的一个测试。啊,我们可以把前面这个也加上这个名称啊,做一个。当前我们加上这个是table test1好把它做一个。接下来我们主要写的就是这个copy API这一部分啊,那首先我们第一步操作,我先把这个主方法先写出来,还是throw一个exception。
01:02
然后首先大家想到是不是我应该还是得有一个流失的执行环境啊,这个大环境是没有办法的,对吧,因为你的那个table table env表的执行环境是不是必须基于流的执行环境才能创建出来啊,所以这里边我们这一步是省不了的啊,所以execution environment,然后get execution environment这里边。把它还是先叫做这个env,这里边我们首先要说的啊,第一步是要创建环境,那首先我们这里边是要得到当前的流式的这个执行环境,我还是把全局的这个并行度先设成一。方便我们看测试结果,然后接下来我们已经知道了需要得到一个stream table environment,对吧?我们想得到一个流式的处理的一个表环境,那这个时候其实就是stream table environment,是不是直接做一个create,把这个env直接传进来就完事了呀,对吧?这就是最常见的一种用法啊,啊,这里边得到的就是这样的一个table env,这是之前我们说的啊,那其实我们想到这种调用的方式呢,非常简单粗暴,但是它并没有区分,我们说现在不是有两种不同的planner吗?有老版本的,然后有这个blink版本吗?那现在默认这里边是一点十用的还是老版本,那我想用blink的话怎么办呢?
02:28
啊,或者说01:11之后默认这么调用的就应该是blink版本对吧,那我想用老版本退回到老版本又怎么办呢。所以大家会想到明显这个create的方法,是不是里边还可以传更多的参数啊,诶大家点进来的话,看到源码里边就能发现啊,后边是不是相当于它底层掉的是两个参数的这个create方法呀,哎,这里边首先前面还是这个。当前流失的执行环境后边还要传一个environment settings,对吧,就还要传这样的一个配置项啊。
03:03
所以这里边我们可以给大家完整的再把这个过程写一遍,首先1.1,我们写这个,呃,基于老版本planner的。流处理,所以我们会发现啊,就是之前的这个简写方式是这样,如果完整的写出来的话,他其实应该先要去创建一个environment settings,对吧,要先创建这个,那有同学可能想到,那我就直接去new这个嘛,我们看一下它的这个构造方法。Private对吧?所以这个问题就又来了,如果构造方法是private的话,怎么去创建出来,怎么去用呢?诶,那有同学可能想到,那是不是应该会有对应的一个builder啊,啊对吧,所以这里边自然就看到这样的一个builder,然后这里边我创建这个builder的方法呢,它又不是就是大家最好不要直接去点builder啊,去直接这么用,因为这里边它提供了一个通用的方法叫。
04:08
前面啊,在前面。大家看到叫做new instance对吧,这里边可以去直接new一个当前的builder出来,所以我们一般就是直接在这调它的new instance,直接这么调就可以创建出一个当前的这个一个builder来,然后当前这个builder啊。大家看这个builder里边是不是最终是要调到一个build的方法呀,这个build的方法是不是可以返回这样的一个environment settings啊,所以最终我们其实就是new instance之后,最后再来一个build就完事了,那中间是不是就可以插入各种各样的配置啊,所以这里边我可以。大家看到最典型的这个配置其实就是首先你可以in streaming mode,另外还可以in batch mode,这是不是还可以设置当前的对到底是流处理还是批处理啊,因为我们说其实底层是不是应该是批流统一啊,所以在这里边其实你是可以设置这个流还是批的模式的啊,然后另外还可以就是use blink planner还是use old planner,对吧?当前我们是老版本的流处理,是不是就是use old planner,然后in stream mode啊哎,那接下来我们得到这个其实就是一个,呃,我把这个叫做old stream。
05:31
呃,Table env对吧。这就是这样的一个定义啊,那最后我们,诶这个这个应该是settings啊,写错了啊,这个应该是old stream settings,接下来我们才是要创建这个old stream table对吧?啊,那所以接下来我们其实还是调用stream table environment.create方法,现在就可以传两个参数进来了,把old stream settings。
06:02
直接传进来啊,得到的这个就是一个old stream table environment,其实大家知道这一个当前的这个环境是不是跟上面这个tablena是一样的呀,在一点十的这个背景下,直接这么使用是完全一样的。啊,那刚才我们也看到了,既然是有这个老版本的流处理,那老版本怎么做批处理呢。基于老版本planner的批处理。哦,这里大家就会发现了啊,我们说blink当前的这个最最新的这个blink啊,它跟老版本到底有什么区别呢。诶,当然最大的一个区别就是blink里边肯定是新增了很多之前不支持的功能,对吧,有很多新的函数呀,很多特殊的这些用法呀,都只有在blink新版本里边才去支持,那另外从架构上它最大的一个差别。
07:01
在老版本的基础上是直接发生了一个变化的,大家发现它不兼容嘛,对吧,直接就不兼容了,你如果只是功能的扩展的话,那我直接用老版本,你做一个扩做,做一个这个版本升级不就完了吗?那现在它不是它的底层架构是真正做到了批流统一,也就是说在blink planner里边,它最终是把所有的流处理和批处理程序都是转换成了data stream去做操作的。而大家想一下,在老版本里边,我们底层是不是不光只有data stream呀,是不是还有data set呀?哎,所以大家注意啊,之前老版本里边其实并不是在这个层面P统一的啊,它是底层我们处理的原理就是思路,处理思路是相当于data set,就相当于是一个有界流,对吧?哎,但是呢,我们本身这个API还是两套啊,还是data stream和data set。那所以在老版本里边,如果做批处理的话,我应该怎么去创建呢。
08:00
那大家想到是不是我现在根本都不能用之前这个stream table environment了,这就是都不能用这个stream execution environment了,对吧?都不能用这个env了,因为我现在要用的是那个批处理,大家记得那个批处理环境是什么吗?是不是直接就是execution environment,然后点BA啊,然后这个得到的是一个batch对吧?批处理batch env,我们先得到这个,然后老版本的话,那它要得到这个table table的环境那是叫做那个不是叫做stream table environment吗?那这里它就叫做batch table environment啊,所以这里边我可以直接去调它的create方法里边,自然大家能想到他要传的是不是就必须得是一个就是execution environment啊,就不能传stream了,对吧?啊,所以这里边就是一个execution,哎,No,就是前面我们的那个batch inv对吧。
09:00
好,这就是我们定义的这个,呃,这个O的。Batch table env,诶,这是老版本的这种处理啊,啊,那自然这里边我们就想到了,那如果是新版本呢?对吧,基于blink的流处理,Blink planner的流处理啊,啊,那这个整体流程其实我们想到跟前面这个是不是差不多啊,对吧?跟这个老版本这个是差不多的,因为都是流处理嘛,而且这里边我们既然是有这个setting,那是不是就相当于把这个setting换成一个blink的setting就可以了,那现在是不是就不是use old planner,而是use blink planner,是不是这样就完事了,然后接下来把这个改成blink stream table。这里边传的就是blink stream settings啊,所以这个就特别简单啊。就是当前如果大家要想使用这个blink的环境的话,诶,那当前我就是直接写一个配置,这里边是new instance,然后use blink planner in streaming mode,然后build传进来就完事了,那同样后边还有就是基于blink的批处理啊,那所以前面我们说这个基于blink的,呃,这种流处理和批处理它底层都是一样,哎,所以这里边呢,我也可以直接用之前的这种setting的方式。
10:34
那只不过现在我把它改成blink batch settings,下面现在就是应该use,对blink planner,然后in in Bach mode对吧,大家看到这个Bach mode其实只在blink这种场景下才能用到啊,这里边改成这样,然后那大家会发现另外一个问题,我这儿用的用的是这个stream table environment这个方法呀。
11:00
这个就不太对了,对吧?哎,所以这里边我把这个也改一下啊,这个叫blink batch,其实前面这个类型是不是都得变啊,就不能要这个当前的这个stream table environment了啊啊,那我现在其实要的是一个,大家想我难道是batch table environment吗?那就不一样了呀,对吧?诶大家看stream table environment,其实底层它就是一个。Cable environment,所以呢,我现在创建这个批处理的表环境的时候,直接用这个底层就完事了啊,所以这里边我直接用的就是一个。Table environment把这个引入,然后接下来我要传的是一个blink batch settings对吧?啊,就是直接这里边大家要注意啊,这个create这个方法里边呢,只传一个settings,不要再传环境了,大家看是不是这样,因为你这个环境还是什么环境。还是流环境对不对,对吧,我们的底层是那个流式执行环境啊,这里边还不需要了,他只要基于这个直接去把这个配置传进来,他就知道直接做这个操作就完事了,对吧?哎,所以这就是这个我们定义的blink。
12:15
Bachlink Bach。Table environment en,这就是我们讲到的啊,几种不同的场景,不同的版本啊,然后做批处理,流处理的时候,这个环境到底怎么去配。
我来说两句