00:00
啊,接下来我们再看一看,往其他的一些数据库里面,卡夫卡的话,它是消息队列嘛,所以我们这个流逝的一个输出就非常的顺畅,那假如说我们想要写入到其他的一些,就是做数据存储的这样的数据库里边怎么办呢?比如说大家非常常见的常用的这个怎么去写呢?哦,Red的话,大家会发现在之前我们提供的这个官方连接器里边,好像没有见到red的身身影,对吧?但是诶,对比较幸运,我们在这个把here这个框架里边提供了对应的这个连接支持,所以接下来我们要引入的依赖是,诶大家看是这个阿帕奇把here里边提供的一个flink connector red,对吧?啊,这里边要需要解释的一点是,这里边啊,它这个连接器把here提供的里边只有skyla2.11的这个版本,没有2.12的版本。啊,所以这里边呢,可能还会有一点问题,但是好在呃,就是当前它的这个没有用到2.12新特性的一些,没有受到这些影响,所以说你直接用这个版本也是可以的,对吧,问题不大啊,因因为大家知道它底层其实就是跟阿卡相关嘛,对吧,本身我们这个代码里边并不涉及到太多啊,然后另外这里面用的用到它这个连接器的版本是1.0。
01:17
这个就跟前面这个卡夫卡这里边官方连接器这个就不一样了,对吧,官方连接器的版本就跟着flink的版本走的,而这里边它直接就是一个1.0啊,所以我们可以把这个先引入一下啊。把依赖直接引入到泡文件下。好,这是当前这个red的一个连接器,然后接下来我们就可以在目录下边新建一个。测试文件。当前是THINK2。好,那呃,接下来我可以把这个先刷新一下啊,把这个对应的依赖要引入。
02:04
好,我们把这个已经引入之后,那接下来整体代码里边,前面还是一样啊,把这个那方法先写进来。啊,我们把这个还是直接从文件里面读取数据就好了,不用再去复杂的做这个测试了啊,Data stream对应的这个依赖依赖引入,最后是env execute,执行起来还有一个划括号。呃,那中间还应该有这个对应的转换操作,对吧,我们应该把它转换成一个对应的这个sensor reading这样一个类型。呃,这里大家要注意一下,最后我们要的是3READING对吧,那就最后不要to string了,这是我们前面做的这个常规操作,然后接下来如果要是想写入到这个red里面去的话啊,那自然我们想到是不是就是data stream at think,然后这里边应该new一个think function,这个think function应该是。
03:08
呃,这个对应的这个连接器帮我们实现的,对吧?哎,那这个think function应该长什么样呢?叫什么呢?哎,这里边这个本身的这个就叫做大家看就叫做red think对吧。就叫这么个东西,如果你要点进去的话,就会发现ready think它本身。是不是就继承了rich thinkk function啊,啊,这是think function的那个负函数版本对吧?啊,富有的那个版本啊,那本身它是不是又是继承自这个obstract function,另外实现了s function接口啊,跟我们前面那个所有的那个函数自定义函数类是不是一样的呀,负函数是一样的对吧?好,所以接下来我们就要看这个think function里边,那那有同学可能想到这个think function里边主要是什么东西呢?Think function里面最核心的是一个invoke方法,Invoke方法invoke本身是一个,呃,调用唤醒的意思对吧,唤醒调用的意思啊,所以它这个调用调用一次是要干啥呢?就是要。
04:11
向外部是不是要调用一次,是不是要写入一条数据啊,至于怎么写,那就看你怎么样去定义了,对吧?所以这里面的那个核心,所有的那些处理逻辑,其实都是自己去定义出来的。好,那接下来我们就看一看这个red到底怎么去写吧,里边我们看一下它的这个构造方法啊。构造器,诶大家会看到这里边这个构造器有两个参数,一个是一叫做flink je con base,另外还有一个叫red map。这两个参数分别表示什么含义呢?呃,一个这个con base,大家知道gene con base嘛,那是不是要对建立一个gene连接啊,做一些基本的配置,对吧?你总得告诉我那个red服务器,那个主机名,端口号是什么嘛,这些总要总要配置的,所以这就是一个配置项,然后后面这个red呢。
05:06
这就是后面我们要向那边去写入数据,那大家想是不是相当于你要调用那边的一个写入命令啊。里面写入命令,那那是五花八门的对吧,本身是k value的这样的一个,呃,类似于内存的一个k value的存储存存储的数据库嘛,那我们直接写操作的时候可以直接set一个K对吧?啊可以去设置它的值,那另外假如说我们保存的是一个比方说像一个哈希map一样的东西,对吧,一个哈希,那是不是得h set呀。对吧,或者你如果要是其他的一些东西,你就是一个数据集啊,或者说其他的那个,比方说是一个list的话,那是不是还得l push啊啊对吧,可以做各种各样其他的这个复杂的操作啊,那所以这里边这个red map指的就是要明确的定义出来,我要往red里边写数据的时候,调用的命令是什么,然后你要写的那个数据又是什么,对吧?啊,所以这就是我们想实现的这两个东西啊,所以我在这儿就单独的去做一个实现吧,比方说前面我这个就叫做一个一个config,对吧。
06:14
那前面我来写一下啊,当前我要去定义。呃,JA连接配置,呃,那这里面我需要去new的是一个,对前面大家看到这里边要的是那个叫做flink jeice con base对吧,但是这里边大家点进去之后会发现它是一个抽象类。那他有没有具体实现呢?诶,这里边我们可以看一眼啊,这里边哎,大家看有一个这个flink jeice po con连接池的一个配置项,它是不是就直接继承自这个gene con base啊,那所以这里边我们直接用它是不是就完事了,所以这个也是比较简单的啊,这里边我直接写一个new,一个flink jeice po config对吧,我把这个引入,然后这个jeice po conig啊点进去大家看一眼,它本身的构造方法又比较特殊。
07:15
这稍微有点绕啊,它的构造方法是一个private的私有的构造方法,那这怎么办呢?这没没办法直接拗,对不对,那怎么办呀?哎,它对它是不是一定后边是有对应的那个调用构造方法,生成对应对象的那个方式啊,所以大家看下边它是不是有一个内部类叫builder啊,哎,所以这个builder里边正常来讲大家看有各种各样的参数,Host port对吧,Timeout,什么database password啊,都是可以去做配置的,然后里边会有一个,当然它可以有各种set方法对吧?啊,那最后怎么样去生成对应的那个,呃,当前的那个配置项的那个对对象实例呢,是不是有一个build的方法呀,这就是一个公开的可以返回一个flink je po f的一个方法,所以我们当前要调的其实就是它的builder下边的build,对吧,其实就是这个过程。
08:09
啊,那我把这个先定义出来啊,这个就叫一个conig对吧,然后这里边中间是不是还需要加入一些东西啊,对吧,你光这么写没用啊,我接下来是不是要set一些东西啊,这里边最关键的其实就是set host,当前我这个是local host,大家按自己的主机名来写,另外还有port,这个很重要,对吧?呃,这个给一个int类型啊,6379嘛啊,那当然这里边这个database我也不用设啊,用用那个默认就完完事了,对吧?直接把它定义出来就完事,这就是这个con定义。然后后面还有一个red map的话,可能这里边稍微的会麻烦一点点,对吧,所以这里边我new一个。Red red map,然后大家看这个red map里边需要什么东西呢?如果大家直接这么这么实现,觉得这个有点太复杂的话,我也可以稍微就是写写的分开一点啊,我这里自定义一个my red map,对吧,我不要用那个匿名类的方式,下边我再去写对吧,Public static my red map。
09:23
然后这里边诶写错了是吗。Red my red map对吧?Class没写啊。下边诶,下边后边这里边要直接去实现一个red map,对,这是一个接口,然后这个接口里边大家看有一个数据泛型啊,那这个类型当然就是当前要处理的数据类型,对不对,我当前处理的是不是都是s reading啊啊,所以我先把这个传进来啊。Sensor reading这个一写上面这个类型就不报错了,对吧?正确了啊,然后我们看一下它必须要实验的方法有三个。
10:07
啊,这里边接口里面的方法啊,一个叫做get command description,它返回的是一个red command description,从这个名字大家就看出来了,它是一个什么。操作red的一个命令的描述对不对,哎,所以那就是你要描述我这个操作red到底是一个什么操作对不对啊,到到底是发一个什么指令过去啊,那所以大家要想一下了,这里边我到底要发什么指令过来呢?这里边我们要定义自定义对吧,自定义。Red map,所以这里边的第一步其实我是要看啊,定义保存数据到red的命令,来大家想一下我现在保存成什么样呢?现在这个数据当然有最简单的操作,就是说来一个数据,直接直接set一个一个值就完了,对吧,然后我随便给一个K啊,然后直接set一个值就完了,但这样不好,对吧,到时候你这个像日志一样乱的对吧?啊,所以接下来我们其实大家能想到我是不是直接可以按照当前的这个。
11:19
因为现在是温度值嘛,那我是不是就是来一个sensor reading啊,对应的那个数据,我就把它的ID提出来,当前的温度值提出来,一个ID一个温度值,一个ID一个温度值,然后诶时间戳我不保存了,怎么样呢?来了新的那个温度数据的时候,我是不是直接更新这个数据就可以啊,诶那家想这是不是我直接写到里边去之后,我在另外有一个应用程序把那个release的数据读出来,是不是就相当于一个实时更新的监控信息啊。哎,所以这个其实在实际应用的时候很很常见,对吧?哎,所以那这个保存我应该怎么保存一个ID,一个一个对应的温度值,这个怎么这应该是个什么东西。
12:04
哎,这其实啊,当然大家可以把它放在一个set里边,但是这个一个ID1温度值的这种方式是不是更像一个map呀,哎,更像一个哈希map对吧?在re里边管这个叫哈希对吧?哎,所以接下来我们就直接把它保存成一个哈希就可以了,所以存成。哈希表,那所以这里边这个表是不是也得有一个表的名字啊,哎,所以我们存这个哈希表的时候应该是呃什么操作是不是h set呀,对吧,H set,然后另外呃另外我们应该有一个表名,当前这个呃这个表名的话,我们干脆就直接叫做这个这个呃叫做sensor temp吧,对吧。然后我我加个下划线啊,叫做sensor temp,然后后边是不是对应的就是当前的ID和那个temperature啊,对吧,这就是key value对吧?啊所以当前我这个return的时候,那就要new一个red command description,然后里边啊,大家看这个里边要指定什么呢?是不是就要指定一个red command呀?啊所以这里边啊,呃,这这里面给大家看一眼,这个red到底是啥啊,Red command是一个枚举列型。
13:26
然后这个枚举类型里边,你看哎是不是各种l push r push对吧,ADD对吧,Set啊这个这个publish对吧,The ADD对吧,各种这个操作就来了,所以接下来我这里边是不是要给一个red command.set呀,啊,它直接都放在这儿了是吧?然后接下来另外我这个h set大家注意是不是还得给这个对应的那个表名啊,它是得得对应的有那个追加的那个表名的,它不像平常的一个set,是不是直接直接有set就够了,后边再指指定k value6就完事,对吧,那现在这个表名我给一个这个叫sensor temp,好,这就是基本的这个命令。
14:08
然后接下来下一个大家一看就明明白了,这是get key from data,这是不是你后面要保存的时候,后面这个key是啥呀?那最后一个是get value,那是不是就是最后那个value是啥呀?呃,这个非常简单,Get key是不是直接return一个当前的data.get ID是不是就完事了?那这里边的value就是data.get temperature,注意要返回的是一个string对吧?啊,因为red里边是不是底层都是按照这个string来保存的呀?啊,所以都是按照字符来保存的,做一个to string这就完事了。这就是我们整个向这个red里边写入的,写入数据的这样一个过程啊,啊,那所以接下来我们可以执行一下看看效果,那要测试的话,需要先去启动一个red,好,我把它写起来,然后同样把这个red的客户端打开看一下,我先看一眼当前状态啊啥都没有好,那接下来我们运行一下代码。
15:12
看看效果怎么样啊,能不能正常的写入进去。好,这边执行执行起来。来看这边已经直接就执行完毕,直接退出了,对吧?呃,因为我们这里边在控制台没有任何输出嘛,大家看现在之前我们的之前我们的那些代码有没有输出呢。大家想之前这个代码也有输出对吧,只不过这个输出是控制台对不对?所以大家看这个print,这其实就是我们的一个特殊的thinkk任务,现在只是把这个think任务直接用这个ADD think来做一个输出而已,不在控制台了,直接放到了其他的这个外部数据库啊,所以接下来我们看一下re里边有没有东西了呢?哎,有了对吧?Sensor temp要看里边的这个数据,那得h get对不对?哎,那当然这个如果h get sensor temp,这里边我们可以来一个SEN41。
16:12
大家看37.1这是什么数据?大家看我当前是按照啊,不是这个啊三数据我是按照顺序一个一个输入的,它这里边是不是就更新的是最后一个数据啊,因为我们数据都已经处理完了嘛,他是他当然是来一个处理一个一次一次更新的,但是我们这里面最终看到的是不是就是最后一个结果啊啊你如果要想看所有的数据的话,大家也知道是对h get2对吧,Sensor temp,大家看是不是所有的数据SENSOR12呃一六七十都放在里边,然后都是它的最最新当前最新的那个温度值啊。你如果要是外部有一个处理程序的话,读取里边的值就可以实时的看到当前的温度监控信息了啊,这就是一个写入到re里边比较常见的应用啊。
我来说两句