00:00
已经有了执行环境,那接下来我们就是按照流程,是不是先要有一部S任务从外部去读取数据啊,啊,那接下来我们就给大家讲一讲这个south啊,South这一部分呢,又分成了这样的,就是不同的从不同的地方去读取数据啊,又有不同的这个S啊,那最简单的大家看到就是从集合里边去读取数据,然后接下来我们给大家做的这个应用场景呢,都是同样的一个应用场景,就是大家看到这个数据是长什么样的数据,都是一个sensor reading对吧?啊大家知道sensor是传感器的意思啊,所以接下来我们就相当于应用在这个啊工业物联网这个场景里边啊,我们要收集一些传感器的温度值,当前这个传感器都是温度传感器啊,那所以接下来我们收集到的这个数据就可以做各种各样的统计计算分析,然后做输出对吧?啊,那这里边读取这个数据的时候,不同的这个。
01:00
数据源,我们最后包装起来的数据都是叫这个传感器的温度读数啊,这样一个,呃,一个一个状态,一个类型,大家看里边的字段,主要就是有三个,一个是大家看字符串类型,这应该是一个。一个名称传感器名称,或者叫传感器ID对吧?啊,就表示这到底是哪个传感器,然后另外是不是应该有一个时间戳啊,对吧?啊,当前这个收集的温度到底是什么时间的温度,最后还有一个哎,这应该是一个double类型的一个温度值,对吧?啊,所以这就是我们当前传感器温度的这样一个应用场景啊呃,那接下来大家看,如果说我想把这个数据读取进来的话,这还涉及到一个就是我是不是应该把它包装成一个对对应的想要的一个类型啊,啊方便我们后边做各种各样的转换操作,对吧?所以接下来呢,我们第一步操作先做一些准备工作,就相当于是在准备数据了啊。我们直接在Java,呃。
02:01
这个s main Java下边现在就不是这个WC了啊,我们直接在新建一个对应的包,呃,com.I的硅谷当前我们就是API test对吧?呃,然后接下来我们本身的这个,呃,当前想要做的这个操作是想要去测一下S任务对不对,呃,想要想要测这个对应的S啊,所以我这里边S。点啊,第一步我们可能测的是这个source test1这个应该是。呃,Collection从集合里边去创建啊,我先把这个先创建出来,这个类先创建出来,然后接下来呢,我其实是要在这个里边,我要去创建一个draw class,但是这个我不应该在source下边啊,因为后边transform SK可能我也要通用,对吧,所以我还是啊直接。
03:02
在这里带上包名去拗一个。com点艾硅谷,点API test test,我创建一个,哎,对大家想是不是相当于我应该有一个这个Java并啊,呃,这里边其实是,呃涉及到这个flink里边的要求啊,就flink里边假如说我们要对它去做这个呃,更加容易的这种类型管理啊,如果让flink对他有更好的这个类型支持的话,那他可以支持什么样的呢?就支持这种简单的Java类,Java对象就是pole,然后如果说后边我们还想对这个pole有一些就是更加,呃,就是丰富的操作啊,可以直接指定它里边的key啊,呃,针对它里边做一些那个转换操作啊,那这个po还必须有一些要求啊,就是类似于我们之前那个Java并的一些要求,对吧,就是必须要有一个空参的一个构造方法,对吧,必须要有这个get center对应的那些,呃,就是属性都是。
04:02
私有化的,然后必须要有那个公公有的公开的get set方法对吧?啊,这些东西都是常规的一些操作啊,啊,所以这里边我定义一个这个并这个目录,然后当前我这个呃,传感器温度值的这个类,我干脆就直接叫做sensor reading吧。跟我们文档里边的定义是一样的啊,把这个先创建出来,好,我把这个写一下啊,注释这个就是传感器。温度。呃,读数。读数的,呃,数据类型对吧。啊,所以大家知道这个抓病的话,类似于都是一些实体类型嘛,对吧,有很多这个数据我们都可以包装成这样的样子啊,那这里边我们需要有哪些,哪些是数,呃,哪哪些这个数据字段呢?其实我们现在要的这个数据字段属性啊,主要就是啊传感器ID对吧,另外还有时间戳,另外还有一个温度值,所以现在啊,我直接用一个这个私有化的属性定义出来,对吧,传感器ID是一个string类型,然后接下来这个时间戳,哎,那是一个长整型,大家看到了是吧?Time step,然后最后还有一个温度值,这是一个double,我把它定义出来temperature。
05:39
那么按照这个flink里边对于这个po类型的要求啊,啊,它跟Java并很接近,但是大家知道Java并是不是必须要实现那个呃,Sable接口啊啊,我们这里面不需要啊,这个没有要求,但是呢,别的要求它跟抓va病很像,就是必须要有一个。
06:00
啊,就是我们所说的啊,构造方法必须要有一个空参的这样一个构造方法啊,然后当然了方便大家这个创建对应的对象的话,我是不是还可以把这个带参数的也给它创建出来啊,对吧,这个也设置出来,然后另外啊,我们就直接用快捷键了啊,因为这个要手敲的话就太麻烦了啊,那我直接get set所有的这些呃,对应的这些方法是不是全全给定义出来啊,对吧?啊然后最后为了方便我们做一个呃,就是打印输出,或者说做一个这个序列化,那其实我还可以把这个to string也重写一下,对吧?这里边我直接把这个所有参数啊都写在这个里边,自动生成就完事了,这就是我们当前需要的这个基本的数据类型啊,是这样的一个pole类型。好,那接下来我们就在这个代码里边要做一个实现了,首先PSVM。还是啊,先把当前的这一个,呃,就是main方法先写出来,那没方法里边是不是首先应该要抛出一个那个异常啊,大家还记得这个吧。
07:06
好,把这个写出来,第一步其实是应该要先创建执行环境,对吧。好,这个我们快速的写一下stream流失处理execution environment get这个然后我们把它定义成叫做env。那接下来就是从。啊,集合中读取数据,所以从集合中读取数据,大家看啊,只要读取数据是不是都要基于env,诶env就去调一个,诶这里边from,诶大家看from我可以from collection对吧?啊from collection就是意思就是说从这个集合里面读取啊,那这里边我可以直接比方说集合嘛,我直接变成一个例子传进来是不是就可以啊,啊比方说我用这个ar.as list里边传对应的数据,我现在数据是不是想直接包装成一个对应的那个sensor reading那个类型啊,所以这里边我就直接new sensor reading,然后里边对应的数据传进来就完事了,对吧?啊,那这个我就不详细写了啊,我直接可以把这个copy过来对吧?大家看这里边有几条数据,就是三四十一,三四十六,三四十七,三四十,然后它的时间戳按照顺序,它们的温度值也各不相同,我可以直接把这几条数据copy过来。
08:31
好。呃,这个应该是没问题了,对吧,From collection啊,然后我可以把它做一个定义啊,当前我这个可以叫A。我可以把它叫做。No,我可以把它叫做这个data stream,对吧,Data stream。诶,这个。总会放到这里来啊。我我前面的这个,对本身的这个。
09:05
这个应该是把它哦,大家看这里边我应该是没有没有把那个env给给给给全部那个选取上,对吧,它它是把这个当前这个ara.at least把这个包装上了,所以其实我前面后边是不是还应该有一个括号啊,对吧,这个才是from collection的这个参数嘛,这样的话我们就应该可以把它选上了,对吧?所以这里边我可以把它叫做一个data stream啊,当然了,大家看它默认得到的这个类型是一个data stream source啊,那其实我们知道它本质上是不是也就是一个data stream呀,对吧?我们做转换的时候啊,直接把它定义成data stream就可以了,这个就没毛病,这就是从集合里边啊,从这个collection里边直接读取数据,刚才大家其实也看到了,我还可以怎么样呢?这from里面除了from collection还可以直接from。Elements elements的话,这个其实就更简单,这相当于是啥呀,是不是就是直接指定对应的那个元素就可以了呀,哎,所以集合的话,我们是把一堆元素包装成一个集合类型,然后传进来的,那如果直接from elements的话,那是不是就是对我就是有什么元素,我是不是直接往进传就完事了,对吧?哎,大家看就是这样的话,你直接传进来,同样也可以构建这样的一个data stream对吧?啊,那比方说这个大家看他管这个叫integering,呃,Data stream source对吧?啊,这个就是能够看到这样的一个定义啊,所以我们接下来可以把得到的data stream做一个打印输出,打印输出可以看到对应的这个结果print,然后另外这个integer stream做一个print,那另外大家可能会想到,这里边两条流我都要做打印输出,那最后输出的结果怎么区分呢?
10:59
好,这里大家可以看一下啊,这个print里边其实可以传参的。
11:03
传一个什么参数呢?一个字符串表示当前输出的这个流的名称对吧?啊,那所以这里面比方说我上面这个就叫做data啊,下面这个叫呃,Int对吧?哎,这就是我当前两条流里面的内容。流失处理,最后不要忘记还需要是不是要执行任务执行起来啊,对吧?Env execute,然后细心的同学可能也发现了,Excu里边也可以传一个传一个参数,这个参数是啥呢?又是一个又是一个string,大家看这个叫job name对吧?啊对这个job name就是表示我们提交上来之后啊,大家看提交上来之后是不是应该默认有这样一个name呀,用来区分我们不同的这个任务对吧,不同的这个job啊作业大家看默认的这个是叫什么名。Flink streaming job对吧?哎,这其实你如果点进去的话,也会发现execute里边,这里边是不是传的就是default job name啊,然后里边是不是就是flink streaming job啊,所以你只要传一个啊,把这个做做做这个更改,那那其实也是可以的,好,我们就来执行一下,看看这个效果怎么样吧。
12:17
好,大家看一下这个执行结束,我们看这里的结果,这里的执行结果是不是就是前边有一个小标,大家看,呃,在这个本来有一个小标,大家记得这是干什么的吗?这是指定当前,那就是现成的那个编号,对吧,或者说大家可以理解成就是定行子任务的那个分区号啊呃,分区的那个编号,所以呃,我们现在我没有指定并行度,那是不是默认就是。默认就是四对吧,所以大家看这个这里边是不是数据来了之后,是不是相当于一个轮群的状态啊,挨个挨个去去呃去呃对应的那个分区去输出对吧?所以这里边你看到呃,这个1234分别都有数据啊,然后两条流又按什么来区分呢?是不是前面我就追加了一个对应的名称啊,然后冒号后边才是对应的我的这个分区的数啊,那所以这里边你看就是INT124啊,一八九六十七啊,这里边是我这个对应的这个3READING对吧?Data这里边是34READING,大家看为什么顺序不一样呢?跟我们输入的这个顺序不一样呢?哎,因为并行执行嘛,对吧,并行一旦执行的话,我这里边快速读取,然后分分到了不同的区,最后输出是不是有可能就不按顺序输出了啊,所以大家就会想到,那我要假如说啊,我在这儿严格意义上要按要按照顺序输出,那怎么办?对这个其实非常简单,我直接。
13:44
诶,有同学说我在后边这个print的时候,把它那个病毒设成一。这个可以吗?大家注意,我这里边射程一的话,是不是只有是不是只有输出的这一步操作是他进来了之后,这个顺序不改变,直接输出啊,但是大家会想到前边我做处理的过程当中,读取数据是不是已经是乱的了呀,哎,所以这里边你看啊,我把这个print in这一部分,把它这个并行度已经设成一了,你看这里边这个啊,当然这里面大家看到输出的这个结果其实是按照这个顺序输出了,这个主要的原因在哪呢?大家知道吧。
14:24
这个原因啊,其实不是太快了啊,这个原因在于前面我们讲到的,大家看读取这个数据读进来之后,然后不做任何转换对吧,然后直接这里边给一个这个呃呃,Set set它的这个并行度,那大家会想到这里边是不是相当于我我我这里面就可以怎么样啊,是不是直接把这两个就直接合并在一起了呀,对吧?而而且就是说from elements,大家会想到这个并行度应该是几啊。这个并行数其实就是一对不对,你说这里你可以直接这个并行的去读取这个数据吗?不可能啊,你这里面我们我们写进来的是不是就是按照顺序写,写到这里边的一组数啊,它是不是并行度只能是一啊,所以你这里边在后边print设一,其实就相当于它的两个任务都是并行度是一,那最后是不是就相当于合并在一起了?呃,所以说这个其实是,呃,就是相当于跟这个全局设定是一样的啊,所以大家如果要是想要让他最后执行的结果严格意义上一致的话,我可以在全局这里边给一个set,这个一,这个是不是也是一样的呀?而且大家看到这个设置了之后,是不是就没有那个编号了?
15:39
就变成只有我当前print的那个名称后边加一个那个大于号那个间括号对吧,然后表示当前的这个指定的name啊,所以这个其实整体来讲啊,整体来讲还是非常的直观的啊啊大家看这里边就是124啊,这个是按照顺序,后边这里一六七十是不是也按照顺序了啊,这个就完全都是按照顺序来的,那注意它俩之间。
16:04
这里是不是就不一定有顺序啊,为什么这里边不一定有顺序?对,因为大家想这两条流是不是相当于是并行不悖啊,大家看它俩有关系吗?一点关系没有对不对,既然一点关系都没有,那是不是就分别一个线程抢占我们那那个slo资源不就完了吗?谁抢上是算谁的,对吧?所以他俩之间是本身是没有这个先后关系的,只有一条流里边的数据才有先后顺序啊,这就是从这个集合以及从这个元素里边读取数据啊。
我来说两句