00:00
好,接下来我们已经了解flink是什么样的东西了,那就需要理论联系实际啊,我们首先要敲一个代码来看一看这个flink到底怎么用,这样才能对它有更深刻的认识。呃,那接下来呢,我们还是常规的啊,用I idea作为作为这个IDE工具,然后接下来呢,开发一个美稳项目,然后接下来我们要用到的这个语言是Java啊,对于这个flink而言,它给我们提供了Java和SKYLA2套不同的API,我们现在是以Java为例来给大家做一个讲解啊。啊,那首先我们创建一个当前的project,当前的group ID com.at硅谷写出来,然后当前的RI,我这个就叫做flink教程啊,就叫flink tutorial。把这个先创建出来。
01:00
好,这个创建出来之后,首先啊,我们需要去面对的就是这样一个po文件,那po文件大家知道在vvi项目里边主要就是做相关依赖和插件的管理的,那现在我们要写link程序,需要引入什么样的插件呢?什么样的依赖呢?诶,这里给大家看一下文档里边的写法啊,啊首先上面是基本的自动生成的这样一些跟这个相关项目相关的一些基本信息,下边当然就是依赖就是dependency啊dependency我们看一下需要引入的是flink Java啊,这就是我们当前要开发用Java开发flink的时候必须要引入的一个包,另外还有一个叫大家看叫flink streaming Java 2.12,诶这里面可能大家会看到有一个问题啊,这里面这个2.12是什么意思呢?对,大家可能想到2.12,这个看起来比较像scla的版本,对吧?诶这个确实是啊,这是scla版本,那为什么这明明这是这是Java的这个流处理对吧?弗link streaming嘛,Java的流处理为什么又有SKY的版本呢?
02:13
这里面给大家稍微的解释一下就是啊,我们先看这里边把这个po文件先先这个一些解决掉啊,当前的一些设置先解决掉,然后我们把这个对应的就直接可以copy过来,这里边的dependence is。里边主要就是一个Java flink Java,另外一个是flink stream Java。好,现在我们把对应的依赖引入,大家可以看一下啊,啊,当前我们的这个项目里边。我们把这个刷新一下。当前的这个dependency下边啊,呃,大家看这里边有这个flink steming Java和flink java2个,然后后边的这个一点十点一,这是当前我们这个本身模块,呃,就是当前引入依赖的版本,我们当前用到的flink版本是一点十点一,所以直接写这个啊,那后面的这里这个stream Java后面的2.12,它指的是。
03:14
Skyla的版本啊,那为什么这里边我们会用到这个skyla的版本呢?啊,这主要就是因为大家看到在这个flink的这个wrong,呃,Runtime里边啊,运行时这个组件里边,它是用到了一个工具阿卡对吧?啊其实大家知道阿卡的底层是用什么写的呢?阿卡底层是用LA写的啊,Skyla最最经典的应用其实就是一个啊,Spark的这个底层啊,我们大数据处理引擎啊,是用skyla写的,另外一个就是阿卡啊,我们做这个高并发处理的这个这个框架啊,它底层是用skyla写的,所以这里边这个即使我们用的是这个Java版本的啊,我们用的是这个Java版本的API,那这里边也需要指定一个当前SKY的版本啊,这主要是。
04:03
它依赖的那些组件决定的,那这两个引入之后,我们接下来就可以直接写代码了,呃,我们接下来首先要写的是一个world count,大家知道对于大数据处理的这种呃程序来讲啊,World count就像一个hard word一样,我们整个的流程其实就是。想要有这么一组这个呃字词,然后呢,呃,我们可能把不同的词要提取出来,按照它出现的次数做一个统计,最后我们输出每一个词有几次,每一个词有几次,对吧?啊,其实就是这样的一个需求啊,那我们来看一下在代码里边怎么实现呢?当然我们要把这个代码写在SW Java下边,对吧?啊,那接下来我们在Java下边去新建一个。抓到class当前我们带上带上包名啊,com.at硅谷点这个包我叫做WC啊,大家不要想歪这个WC count的缩写啊。
05:06
Word count直接把这个创建出来,新建这样一个类啊,然后现在首先我们先做一个,对于弗林而言,我们不是说它是批流统一,可以做批处理,可以做流处理吗?哎,我们这里边先给大家举一个简单的例子,就是做一个批处理,所以这是一个批处理word count程序啊。那对于这个批处理而言,是不是我们的那个数据应该是现成的呀,应该是已经一批到齐的对吧?呃,然后我们把这个数据读取出来,然后直接做处理啊,读成这个数据集不就完了吗?啊,我们这里边要调用的当然就是那个data set API了,那现在我们没有数据啊,啊,没没关系,我们自己来定义一些测试数据吧。你有一个file啊,这个file我就叫叫hello吧哈,点TXT里边我们随便写一些几行几行这个字符啊,Hello word啊,然后我可以啊,Hello flink。
06:11
当然我也可以hello Spark对吧,各种hello,呃,哈哈,Hello skyla,这这哈,什么都可以啊啊,那那当然我们还可以how are you,多写几行,Fine thankyou啊,And you是吧?啊,大家这个英语都学的很好,肯定没问题啊呃,多多写几行,接下来我们就要用这些数据来作为我们做word count的这个数据源,好,那来看一下这个当前这个类里边怎么去写这个代码呢?哎,整体来讲肯定先是把那个对对应的那个main方法先写出来,对吧?呃呃,PSVM我们先写出来,然后接下来首先第一步,这里边要注意的第一步啊,是要创建执行环境,诶这个听起来这个概念有点奇怪,其实大家结合这个Spark来看的话,也就不奇怪,Spark里面我们首先是不是要有那个上下文啊,对吧,Spark context嘛啊,你先获取到当前这个SC,然后基于它再去定义接下来我们的处理操作,那所以这里面也一样啊,只不过这里边我们不叫上下文,而是叫执行环境啊,所以整体来讲这个概念是接近的啊,类似的,那这个执行环境怎么写呢?大家看是叫做execution environment,就这个东西啊,Execution environment,然后怎么样去创建点get execution environment。
07:45
调用他的这个get方法。这里面得到的这个东西,我可以把它叫做env,对吧,大家看它的类型就是一个execution environment。然后接下来有了它之后,那我们是不是就可以从文件里面去读取数据了,对吧?从文件中读取数据,那首先我定义一下当前那个文件的路径吧,啊,当前的那个,我把这个叫做input pass。
08:18
呃,这个我就直接copy一下这个全路径了啊哈,文件的这个全路径copy pass。直接复制到这里。然后接下来啊,那我们是不是就直接基于这个前面定义的env,然后怎么样读取数据呢?哎,这里面有一个方法叫做大家看到read可以read csv file read file对吧,还可以read text file,我们现在TXT嘛,我直接来一个read text file。然后把当前的input pass传进来就可以了,好,那当然这里面大家会看到我得到的是个什么呢?对,大家看到啊,得到的这个类型叫一个data source,也就是说这是一个数据源,那这个data source本身又是个什么东西呢?
09:08
我们可以点进去看一眼啊,大家看data source,它其实是一个。Operator对吧?是一个这个叫运算符,或者叫一个算子,然后这个算子呢,它又是一个。Data set,所以大家看最本质我们当前啊,把数据读取进来之后,最后处理的其实是什么呢?就是一个data set,一个数据集,对吧?所以整个批处理的这套API在flink里边就叫做data set API,我们接下来调用到的这个方法啊,这里边调用到的这些所有的方法,大家看都是在这个data set这里面,就在这里边可以调用各种各样的方法。啊,所以接下来我们就是主要是针对它来做操作啊,那当然这个我可以改一个名啊,呃,这个,那那对于这个data source而言,大家会想到,那我直接比方说我不写这个data source,我直接写成一个data set可以吗?这个是可以的,对吧,因为本身这个data set是它的父类嘛,所以直接这么写是没有问题的啊,那这里边我也可以直接给一个其他的名称,我们把这个叫做input data set,这就是我们当前读进数据源来之后的得到的这个数据集,那接下来要针对它进行处理了,对吧?对数据集。
10:35
进行处理,怎么处理呢?大家想想当前我们这个数据啊,这个读进来之后,这里面的每一条数据长什么样啊,我们现在是read text file,那其实大家想到你读这个文本文件基本的一个想法,是不是就应该是对一行就是一条数据啊,哎,所以那接下来我们针对这这所有的数据要去做一个做一个word count啊,那是不是第一步应该要先把它。
11:06
是不是要把它拆开啊,要把它打散,拆成一个一个的word对吧?哎,那后面拆成word之后怎么样去做统计呢?怎么样count呢。哎,对,那另外之前我们也做过这样类似的方法,是不是我可以把它在后边再追加上一个一啊,也就是相当于我把它转换成一个元组,诶,那有一个word,我就把它转换成一个当前这个word,然后一这样一个元组有一个啊,又是这个word一个一对吧,那后边我是不是直直接把对应word后边那个一加起来,就是当前它的个数啊,那所以整体来讲这个还是非常简单的啊啊这跟这个呃,Skyla本身的那个数据类型里,呃里边定义的这些操作,以及这个呃,Spark里边啊,大家熟悉的这个world com的写法基本上都是一致的,所以接下来我要做的操作其实是按照按照什么来分词呢?哎,空格对吧?呃,所以接下来我是按空格啊分词展开,然后转。
12:18
换成转换成WORD1这样的一个二元组进行统计。所以接下来啊,那那大就想到是我是基于这个input data set去做操作是吧?哎,那首先我是不是直接既然要打伞嘛,大家能联系起来可以调什么操作吗?对,是不是直接可以做一个flat map呀,因为flat map大家想在那个scla本身那个语法里面,它的含义是,呃,就是map之后做一个flat,做一个扁平化,对吧?哎,那这里边flink里边定义的概念其实跟那个差不多,它也是你根据里边的那个map操作啊,做一个转换,然后呢,把它直接打散全部输出对吧?啊就是每一个都当成一个元素,所以这个flat map它有一个特点啊,就是最后是不是相当于我一条数据可以输出多个结果啊。
13:15
对吧,要打散嘛,所以有可能会输出多个结果,好,那呃,接下来我们其实就是要在这个flat map里边要定义这样的一个操作,哎,其实就是把里边的内容要打散,然后呃,就是得到的那个每一个word再转换成一个WORD1这样的一个二元组,最后把它输出,可以输出多个,那大家看一下这个flat map啊,这个API调用它里边要传一个什么东西。这个稍微有点麻烦啊,传的是一个。Flat map function。这是一个类对吧,大家看到这是一个类啊,然后呢,这个类里,呃,这个类这又是什么呢?啊,大家看这是个接口对吧?所以我们要传的是一个类,然后要实现这样的一个接口,那么这个接口里边又必须实现什么方法呢?
14:05
就是一个叫做flat map的方法。然后我们看一下这个方法里边,首先它没有返回值对吧,没有返回值,那大家想我这里边要输出的结果,你最后靠什么来返回呢?哦,大家看这里边它的参数,一个是value,另外一个是out,诶所以这里边value比较好理解,Value是不是就是我们当前。我当前不是做这个数据处理吗?是不是每来一条数据之后,都应该对应的那个value就会调到这里来啊,作为当前的参数传给这个Fla map方法对吧?哎,那当前的数据是这个value,那我现在要把它做打散,要输出的时候用,用什么输出呢?就用后边的这个out out本身是一个collectoror,是叫收集器。那大家想这个收集器是干啥呢?简单来讲就是你把想要输出的那些结果都收集起来,都放到我这儿来,然后大家想最后是不是我就会把这些全部都发出去啊,所以整体来讲,从概念上来讲啊,它设计这个收集器的时候,Collector它是对应着什么呢?对应着那个所谓的eer迭代器,大家想迭代器是什么,迭代器是不是一个集合,已经这个数据都到那儿了,然后我是一个一个拿出来啊。
15:29
大家想想是不是这样一个过程,一个一个拿出来,一个一个迭代对吧?而现在呢,这个好,那这个collector的话,它其实从定义上来讲,它是跟itator就刚好是一个反面,所以就是itator是从一个地方去拿,一个一个去便利拿数据,而它呢是收集,就是数据一个一个来,我把它一个一个都放到一起去,对吧?哎,所以它是这样的一个一个概念啊,啊那所以最后我们要去调这个要要去输出的时候,调什么方法呢?就调这个collector的一个collect的方法收集,对吧?啊直接把它收集起来,这就相当于最后我们这些数据都要发出去了啊,所以这是这个应用的一些比较特殊的地方啊,那所以这里边我们写这个代码怎么写呢?
16:22
我可以直接就去定义一个,因为我要实现那个那个interface嘛,那个接口吗?我直接定义一个my flat map,直接把它定义好,然后接下来我就需要在外边是不是要去做一个实现了,对吧,这里边我自定义啊,自定义类,然后实现的是flat map function接口。哦,那所以现在啊,Public static啊,因为这个我直接调用的话,需要给一个这个static类,然后my flat flat map上面这个我也用一个大写啊好,然后接下来implement一个flat map function对吧?好,Flat map function。
17:18
然后里边大家会看到啊,这个后边它是有这样的一个尖括号定义的,这个在Java里边是什么?对,这是泛型,而且大家看到这里面有两个泛型,这表示什么意思呢?对,大家想到了,你这显然就是说我当前要做Fla map嘛,那是不是我输入的是一类的数据对吧?最后经过转换计算之后,输出是不是数据结构有可能不一样啊,那所以这里边你看到输入的数据叫做T,输出的数据叫做O,对吧?所以当前我定义一下输入的数据是什么类型呢?对,是string,每一行就是一个string,那我输出想要什么东西呢?
18:02
我想要的是大家注意不是string了,我想要的是二元组对不对?哎,那这里面就有问题了,哎,我们在那个skyla里边直接括号括起来就是二元组类型,对吧?Skyla自带有元组类型,那Java里面没有元组类型怎么办呢?啊,当然我们可以用map对吧?哎,但是这里边大家注意啊,就是在这个flink里边,它其实是给我们提供了flink自己实现的一套元组类型的啊,所以我们在这这里如果用到元组类型的话,最好就直接用flink的这种元组类型,Flink Java里边给我们提供的元素类型啊,那这里边就直接叫我们现在二元组对吧?啊,这个TEMPLE2,然后这里大家要注意啊,你不要选这个scla的版本啊,大家看,因为因为我们那个flink stream Java里边是有那个skyla相关的依赖的,对吧?啊,所以这里边其实它那个你直接引这个skyla的元组也可以引啊,但我们现在用的是这个呃,Java对吧,大家看这个这个包啊,看清楚上面这个啊。
19:07
这里用到的是flink API Java temp.TA2。然后接下来大家知道本身这个啊,这个二元组它就是这里边是不是也应该要有泛型啊,对吧?因为元组类型里边本身你可以包含各种不同类型的数据啊,所以本身二元组也只是一个外边包装好的这个数据类型,里边还要定义它具体的数据类型,String,这是word,然后COUNT1啊,那是inte对吧?哎,直接把这个定义好,这就是我们当前你看定义完这个之后,上边这里就不报错了,对吧?上边是这个语法通过了,下边还没过下边因为必须得去实现这个flat map方法,我把它直接做一个实现啊,那对于这样一个flat map方法而言,里边其实核心就是拿到了当前这个value,那我是不是先要把它做一个分词啊,对吧?我们首先按空空格分词,所以我首先应该得到的是一个。
20:11
String类型的数组words啊,我直接用这个value.split去做一个切分。然后得到的这个word,那接下来是不是每一个word都要加上,后面加上一个一包装成一个二元组,然后直接输出啊,哎,所以这里边就涉及到一个问题,那每一个word都要输出,那怎么办呢?For循环便利对吧?哎,所以像之前那个scla语法里边的话,大家可能想到我可以直接针对一个集合类型啊,直接去做一个转换操作,那现在Java语法不支持,那你就只好一步一步来了,对吧?所以这里边就是我们遍历啊,遍历所有word包成二元组输出,所以我们接下来是一个for循环,这个for循环里边每针对当前的每一个word,在words里边的每一个word啊。
21:14
做一个包装,我现在是不是要包装成一个new,一个TOP2里边string integer啊,当然这个就是其实我不写大家知道也可以了,对吧?当前这个字段是已经定定死的嘛,就是word和一对吧,那另外要输出你直接能这么这么这么写,或者是直接return吗?当前这个方法并没有,并没有这个返回类型,对吧,我现在是要对用out点,大家还记得吧,Out有一个collect方法,我用这个方法就可以直接把想要输出的数据做一个。输出数据啊,这这就做完了,对吧,大家看这就是这样的一个输出数据的过程。
22:03
啊,那所以上边这一步啊,我如果做完了这个flat map之后,得到的就是一个word一个一个一一个word一个一个这个过程了,哎,那后边如果我要想继续继续做计算的话,怎么办呢。那是不是我得按照word做一个分组啊,对吧?哎,所以接下来我得做这个group by,诶大家想到group by嘛,有有这个API对吧,Group by那里边我go by什么呢。诶,之前你像Spark里边我们直接可以group by对吧?呃,我默认前面那第一个字段就是K嘛,那现在怎么办呢?Flink里边没有默认的K,那所以group里边大家看啊,你可以传一个K的选择器key select。啊,那当然这你得实现又得实现这个接口了,那另外更简单的方法是什么呢?你可以直接传一个整形的位置,就当前到底是第几个位置,直接传一个这个就完事了。
23:04
那或者也可以传一个string,就是当前那个字段啊,到底叫什么名字,你直接传进来也是可以的,但是现在我们是原组类型,我当然不知道这个字段叫什么名字了,对吧?那所以是不是传一个位置就很合适啊,那我现在怎么传这个位置呢?就直接GOODBYE0,这就是第一个位置,大家看啊,就是按照。第一个位置的word分组。然后接下来是不是就可以做统计了啊,那统计的时候是不是直接sum就可以了,因为后面都是一嘛,当前这一组里边所有的数都算起来,那当然就是啊所有的值了,大家看这个sum里边是不是也可以传一个int类型啊,是不是就表示当前的那个个就是位置啊,所以是SUM1对,大家要注意这里的SUM1可不是,呃,就是这个一表示的不是我们后面的那个一对吧,不是表示这个一这个值啊,这里说的一是位置,第二个位置对吧,所以是。
24:12
将第二个位置。上的数据。呃,求和,所以这就是我们做这个计算的一个完整的流程,就这么简单啊,就这样就把它直接搞定了,那当然这个这个还没算完,因为这只是把这个算算出来了,对吧,那大家就会想到接下来我是不是应还应该对它做一个打印输出啊。对吧,所以这里边我可以再定义一下啊,我定义一个,因为都是data stream在做转换嘛,所以呃,不是啊data set啊,说错了,我们现在是批处理,所以是data set。那这里边我是不是应该得到的是一个什么,还是一个二元组类型的呀,对吧,二元组类型string integer。
25:07
然后啊,这个我可以随便叫一个名字啊,比方说这个我叫做result stream,呃,Result set,对吧,Data set,基于这个input data set去做这样的一个转换,最后是不是直接把它做一个打印输出就完事了,对吧?Result set做一个print。诶,大家看这里边报错了,这是因为print这里边大家看它是要throw这个抛出异常的是吧,所以这就要求我们在上面做实线的时候,对。Main这里边是不是就必须也要去throw一个exception啊,对吧,这样的话就不报错了。这是当前我们代码的实现,好,那接下来我们来运行一下这个代码,看一看运行效果怎么样吧。
26:01
好,我们看一下这个执行的结果。好,这个结果已经输出,大家看一下当前我们得到的输出结果是不是就是一个word一个count值,一个word一个count值啊。当前最大的应该是hello,这个出现了四次,哎,所以大家看HELLO4啊,那么第二出现次数多的是you,出现了三次,别的词都只出现了一次,这就是我们要的结果,对吧?哎,所以这个完全没有问题啊,一个批处理的wordout。
我来说两句