00:00
好,然后接下来我们继续第二部分的这个think,给大家安排的这个内容是red,呃,那这个red我们也很幸运,这个尽管官方flink官方没有这个连接器,但是把here有对吧?啊,所以大家看这里边直接我们把这个flink connector red这样一个连接器把它引入就可以了,Po文件啊。好,呃,然后接下来大家会想到我们是不是还是得去ADDS啊?呃,在一个文件里面去ADD think,我们先把这个文件的主体先写出来啊,那这个过程我们当然还是在think test下边去new一个object。这个我们就叫做red test。啊,这个那加载可能还稍微有点慢啊。我们先把前面的这些东西都写出来,同样前面还是EV创建一个执行环境。
01:12
然后我们把这个并行度先设成一,然后接下来其实大家会想到跟跟这个流程是不是差不多啊,对吧?呃,这里我就不再用那个卡夫卡做数据源了啊,就这里边我就直接还是把这个S这里啊,从文件里边直接读取出来。接下来做转换操作的话,那是不是,呃,大家大家会想到,就是接下来我们还是要去把它map成一个sensor reading对吧?呃,这里我们把这个直接copy过来。Transport啊,这里大家稍微注意一下,就是c reading,我们是从这个还是从之前定义好的那里去引入,这里边我们是不是就不需要把它再to string了呀,对吧?哎,当时我们是因为卡通卡的那个序列化啊,这里就直接就是sens reading输出就可以了,然后接下来大家会想到后边如果要做这个think的话,那是不是还是啊这这个就不用。
02:27
直接did stream是不是可以ADD think,那这里边是不是就又要new一个东西啊,啊对,所以这里面我们先空着啊,后面大家看这个要要new一个什么样的东西,最后大家不要忘记整个的程序流程里边对执行。Really think test。好,呃,然后上边这里边我们把它已经转换成了sensor reading之后这里边要添加这个最后的这个呃S呃,S function了,那么这个s function大家可以看一眼啊,我们在文档里边给大家已经定义出来了,这个s function有点特别,这是个什么东西呢?
03:11
它是一个叫做red think的一个东西,然后啊,这个本身输入的类型是sensor reading,对不对,然后里边呢,要传两个。一个是一个配置项,另外一个是一个叫做red map的东西,那这到底又是什么东西呢?哎,我们一个一个给大家先来做一个实现啊,所以接下来给大家定义一个conflict对吧?啊,那么这样一个conflict它是个什么呢?它是flink,呃,Jealice连接对吧?Jealy po conig先要做这个连接,这个连接用的是它builder,诶。Builder。然后接下来可以给他去set一些参数,比方说host,呃,那这里面local host对不对,创建连接嘛,然后有host就有port,一般我们red默认的端口是6379对吧?啊,然后这个这个连接池builder.build对吧,大家应该熟悉它这种这种调用方式。
04:27
好,这样把这个呃,连接的这个配置创建出来啊,这个其实是一个连接啊,这个配置好,然后接下来它是不是还需要有一个这个red map啊啊这个map可能稍微麻烦一点,我们看一下,我就直接在外面去实现吧,Class my。Red。他需要去实现一个什么东西呢?就是实现一个所谓的。
05:03
Map这样一个东西,这又又没有啊,我们得去做一个引入。它的数据类型本身我们输入的数据类型是sensor reading对吧?好大家看一眼这个东西,这个东西是什么呀?Java接口对不对,它里边相当于是得定义出什么东西来呢。它其实就是要定义出我们到底用什么命令对吧,往red里边去写数据,然后还得指定你到底要写什么数据,对不对,哎,所以这就是我们的red flink的连接器给我们实现的这样的一个一个map类啊,这样的一个函数类,所以接下来我们其实就要实现这里边的这些方法,复写这些方法啊,那那这里面大家看到这个首先是这个get command command的description。
06:05
这个是什么东西啊,这是不是就是保存到里面的命令啊,定定义保存数据到red的命令。好啊,那大家会想到我们这个数据怎么往里边保存呢?大家想把它写成一个什么样子呢?我们现在是一些传感器的温度数据,那大家会想到,呃,直观的一个想法就是说这个red里边这个数据啊,有可能就是说一个传感器是不是对应一个温度值啊,然后我们。传感器的温度变化发生变化之后,我是不是实时更新这个值,然后我们如果有另外一个应用程序,把这个值读取出来,展示出来,你就可以做这个监控显示了,啊,大家能想到这样一个应用场景,对吧?所以接下来我们要往re里边保存,是不是其实相当于就可以把它保存成一个这个是不是类似于一个哈希map呀,对吧?我们可以把它保存成一个哈希表,对不对?里边有类似的这个,呃,数据结构哈希表,那大家知道哈希表的这个存储的命令是什么,在哪?
07:26
是不是,诶哈西set h set对吧,哎,对,是这样的一个命令,所以接下来我们定义的命令是H这个保存成把。呃,这个传感器ID和温度值保存成帕西表啊,所以这里边我们的命令就应该是h set,然后是不是有一个key,有一个field,然后有一个value啊,大家还记得这个命令对吧?前面的这个K是什么呢?
08:07
这是我们表的名字对不对啊,表的名字,然后后边这个字段名才是我们,哎,就是对,相当于你要存哪个值对吧,所以然后对应的它的那个值是什么?所以后面这个才是k value那一对,对吧?啊,所以前面这个K其实是我们的那个字段名,那就是整个表的名,所以大家看我这里边就去定义一个,直接new一个啊。大家看要用一个什么,它的返回类型是不是red command description啊,你有一个command description,这里边调用一个red command。Command。把它引入这里边是不是有大家看有h set对吧,H set,然后后边我们继续传它的下一个参数是大家可以看到就是一个command,然后一个一个K对不对,所以这里边就是我们把它表名叫做就叫sensor temperature吧。
09:13
好,这就是我们要保存的这个命令,然后后边这个具体的命令还没定义好呢,为什么呢?啊,因为我们是来一个数据才存一次,对不对,那是不是你得针对来的那个数据是什么,才能知道后面这个field和value到底是什么呀?啊所以大家看它是把这两个单独再去做其他地方定义了,哪里定义呢?这里get value from data,那是不是这里要。定义保存到red的。呃,这个是value对吧。那同样下边这里是不是就是保存到red的,对P我们我们理解是key啊,其实是这里边这个field对不对啊好,所以呢,这个value应该是什么呢?那就应该是当前的元素是T对吧,t.value是不是temperature啊,就是这样对吧?那当然这里面大家会发现他要的是一个string类型,所以对to string。
10:22
那这里面的K呢,当然就是它的ID哦,它本来就是string,所以不用再to string了,对吧?大家看就是这样就可以把它实现出来,这就是我们呃,这个red map的这样的一个定义,那同样我们在这里ADD ADD think的时候,就要去new一个red是map吗?不是,是red think大家还记得吧,是这样一个东西。这样一个东西,它需要的参数里边要传一个coniggu base,另外还要有一个red map,对不对啊,这个con base是我们前面已经定义好的。com,对吧?那么map是不是要去new一个my red map啊,对吧?把这个用出来,这样就实现了往red里边写数据的这样的一个功能。
11:21
好,那接下来我们这个代码已经实现了,我们就来测试一下吧。呃,这里我们先需要去把那个red先打开,对吧,So,起来好。呃,另外。命令行也打开。C Li,对吧?好,呃,我们先看一眼这里面没东西吧,好空的对吧,然后接下来我是不是就可以启动这个程序往red里边去写数据了啊,大家看一看这个能不能正常写进去啊,让一下试试。
12:04
好,大家看到我们现在从这个文件里边读取有六条数据,正常来讲的话啊,它应该都能,诶这边因为我们没有打印输出嘛,这边就直接跑完了,对不对啊,那我们去看看这个言情况吧,果然是不是已经写进去了,对吧?这个正常来讲已经是写进去了,然后接下来大家会想到我是不是得看看里边到底写进什么东西啊,哎,那我得去去看一下它这个里边的值了,怎么看呢?H是get对吧?呃,那当然了,这里边我可以是get,然后呃,对,如果我想看所有的是不是可以get all啊,然后跟上我们当前的这一个K的这个字段。诶,大家可以看到这里面一共几条数据啊,诶这里面是四条数据,大家说一下为什么是四条数据呢?啊对,你可以还要会发现我们在存的时候是不是以这个sens ID作为key去存了,存一个哈希map对吧?那么当然这里边如果是同样的K的话,取哈希之后也是一样的,那是不是这个值就会覆盖啊,所以大家看341最后存的值是35.6,就是最后一次更新的那个值对不对?而别的就是他们唯一的那个值,这个当然是没问题的,所以大家看,如果我们这里的这个数据持续不断的在变化的话,我们是不是能看到这里这个啊re,里边的数据在不停的改变啊,对吧?而且就这么四条数据里边的值在不停的改变,如果我们有一个应用去读取它的话,就能够实时的把它展示出来。这是这。
13:51
呃,这一部分啊。
我来说两句