00:00
那接下来我们最后再讲一个,就是像。Red,呃,前面我们讲过了,Red ES还有卡卡里面去写入数据啊,那我们就想一下,如果说我现在要直接把数据写入到MYSQL怎么办呢?MYSQ的话,大家会发现,呃,就是首先是我们在实际生产当中还是经常会有这样的需求的,因为往往这个生产环境里边的业务数据库本身就是MYSQL,那最终我们统计出来的结果有可能也要用一些业务逻辑去访问,对吧?哎,所以接下来我们想把它再重新写回到MYSQL,这是很正常的一个需求,但是我们发现在一点十当前一点十版本里边并没有相关的依赖支持,对不对,也就是对应的这个连接器是,呃,这个MYSQL啊,是没有对应的连接器支持的啊,当然就是现在这个最新的01:11其实是已经有MYSQL的连接器了啊,但是一点十之前还是没有的,所以当前我们没办法直接用,而且呢,它不光是这个官方连接器没有,就像那个第三方的那个框架啊,把here他也没有给我们提供,那怎么办呢,现在。
01:06
啊,那就只好自己写了自定义,因为so think function的那个接口是不是已经放在这里了啊,所以接下来我们就自定义一下就完事了,那这里面大家看到我还需要再引入一个依赖。大家会看到,诶,这里面是一个连接器,对吧?MySQL connector Java,那那这个连接器已经有了,那那为什么我还要自定义实现呢?大家注意这个并不是flink的连接器,对吧?哎,这是你既然要往那个MYSQL里面写数嘛,你是不是至少得有一个客户端得连上MYSQL啊,哎,对,所以说这个其实是这个MYSQ,那个driver啊,那些那些提供的那些连接工具,所以这里边我用的这个是5.1的这个版本啊,接下来我们还是在po文件里边先把这个做一个引入。好依赖先引入。接下来。我还是先刷新一下啊。
02:00
然后接下来我们在think下边去新建一个测试的测试的类。你有一个Java class,当前是think test,呃四那这个其实就不一定非得是MYSQ了,大家知道只要是JDBC连接都一样,对吧,这个MY是比较有代表性的啊,关型数据库,呃,那所以现在我们把这个都已经引入,然后接下来。前面的流程跟前面这个ES这里边还是一样的,对吧?呃,这个创建执行环境,从文件读取数据转换成sensor reading类型,甚至就这里啊,大家如果要是想copy的话,后面完全copy过来都可以。后面我们这里边还有这a think对不对,连同这个发括号啊,我直接copy过来啊,那当然了,这里面我就不要把ES相关的那些东西引入了,这里面我并不是要去new一个ES的think方式,所以我把这个都删掉。
03:03
现在我要拗的是一个自定义的像MYSQL里边写入数据的think方式啊,所以那这个就这个整体,这个代码反而更简单了,啥都不用写,是不是直接自定义啊,啊,比方说我这个叫my JD bc think这样的一个方式,对吧?啊,那接下来我们在下边实现实现自定义的think方式。所以我们写public static class啊,然后接下来my JD bc think,好,接下来我们要实现的是一个think function,对吧,大家会想到我是直接implement一个think function,这样就可以了,然后里边的数据类型,那还是本身数据是什么就是什么sensor readingding吧,直接放在这。这是没毛病的,但是大家想一想,里边我要做这个具体实现的过程当中,是不是要跟red,呃,那个MYSQL那边去做连接啊,做连接的话,这里边think方大家知道有一个process,这个invoke方法嘛,对吧,这个invoke方法它是什么时候调用呢?
04:15
这个方法应该是。哎,它应该是每来一一个数据,是不是就会调用一次啊,哎,所以这个每来一个数据就会调用一次,这里面就有一个问题,那我如果要是创建这个red,呃,到MYSQL的连接的话,GDBC连接的话,每年一个数据都都创建一次吗?这就不好了,对吧?哎,所以接下来我们就想到是不是可以有生命周期啊对,在open里边去创建到MYSQL的连接,哎,然后我们每来一条数据在in去做操作,这不就很合适吗?啊,所以接下来我们就想到要生命周期,那生命周期方法现在并没有啊对,大家想是不是得reach function啊,所以是rich think function。
05:00
好,我把这个先直接引入对吧,然后接下来那这个reach think方式大家就知道了,不能implement,而必须要对extend,因为它是一个抽象类啊,所以这里边我就必须要实现这样的一个invoke方法,呃,那在这之前呢,我还应该去。实现一个重写一个open生命周期方法,在open生命周期里边,我们是不是主要是要创建一个连接啊,然后这里面又有一个问题是我在这如果要是创建一个连接的话,那比方说我定义这么一个一个对吧,大家看这个我可以直接用这个Java CQ里面的这个connection去创建一个connection对吧?哎,那那我在这儿去创建的话,那我在后面能用吗?能用这个connection吗?对,这是不同的方法呀,那这怎么办?哎,对,应该把它是不是声明成属性啊,对,所以在外边直接声明出来啊,所以这里边我去定义一个connection,在外边先定义成档,然后是不是在这儿才去做一个做一个赋值啊,对吧?啊,那所以这里边connection我直接用这个啊。
06:11
Driver manager对吧,直接driver manager,然后我去直接get connection,然后里边,诶这个就是我这个URL对吧,JDBC,然后呃冒号,然后MYCQ对吧,我们是连接到MYQ,然后这边叫log,诶斜杠啊。Local host3306默认端口对吧,然后后边比方说我默认连接的这个数据库就叫test啊,这就是我当前的这个urll对吧,连接方式,然后本身这个get connection呢,不光可以只传一个这个URL,也可以传property,另外还可以大家看是不是还可以直接传这个user和password呀,我这个就简单一点,因为我这有这个密码啊,我就直接。
07:00
后边传啊,当前user,呃,Username root,然后密码123456啊,当然这个从安全性上来讲,这样写的不太合适啊,就是给大家做测试,我就直接把这个就放在这儿了,好,然后接下来呢,那自然就是就是下面就可以创建连接去做操作了,对吧?哎,但是在这个过程当中,大家其实会发现每一个数据来了之后啊。假如说我要去做一个这个,呃,我我直接要往里面去插入一条数据的话。大家会想每一个数据来了之后,其实插入的这个操作是不是都差不多啊,都是色的INTO1张表里边对吧?然后比方说我现在比,呃,我现在保存什么字段呢?呃,我就想保存这个当前的ID和对应的那个温度值,就像之前我们那个那个做法一样,对吧?类似于一个key一个value那样啊,然后去实时的更新,来了新值之后就去更新之前的那个值,那大家想那你所有的数据来了之后,不就是要不是一个插入,要不是一个更新操作吗?
08:03
这个操作你每来了一次之后,再去单独写一个CQ,然后单独去,呃,生成那个执行计划,单独去执行,这个效率是不是有点低啊,可以怎么样。我们是不是直接可以一开始就定义一个预编译语句,然后它相相当于是不是我们这里就生成那个执行计划了,后边是不是来了数据,只要往里边填数就行了,直接一跑就完事了,对吧?哎,这个其实是对于这个MYSQ啊,JDBC操作的一个基本的优化啊,所以这里边我们可以声明一些预编译,预编译器啊,所以这里边就是,呃,这个就是prepare对吧,Prepare statement啊,定义一个这个这里面可能涉及到两个,一个是insert sment,首先还是啊,在这儿定义成now。然后另外还有一个是update对吧,Update statement也是先定义成道,然后诶这里大家看啊,就是这里边声明出来啊,声明啊连接和预编译语句语句。
09:15
啊,那这里边是不是我就可以把这个编译语句直接创建出来了啊,Insert statement啊,那这里边同样我是用要用这个connection啊connection.prepare statement对吧?呃,这个就相当于我直接创建出执行计划了,里边其实就是一条CQ了啊那这里边应该怎么写?刚才我应该是insert in into,诶,这里边就涉及到一个,你到底要那张表叫什么名对吧?我还是跟那个red里边叫一样吧,我叫sensor temp,然后里边要写什么字段,那应该是我要ID和temp对吧?Temperature,那另外是不是values啊,对,Values values是啥呢?哎,Values现在是不是根本不知道啊,所以我现在应该是问号,问号对吧?占位符对不对啊,我现在根本不知道它是什么什么情况啊,所以我可以直接这样把它定义好,然后接下来update s也是一样,Prepare放在这,然后接下来这里边应该就是update sensor temp啊,当然就是这个表名,你也可以作为参数传进来,对吧,我这里边就就直接写死了啊,Update一下啊,然后接下来是得set当前的temp。
10:37
等于等于什么呢?哎,这是问号他不知道对吧,你传进来之后我才知道嘛,然后是where查询对吧?ID等于问号,这是不是就是我们要执行的这个预边语句啊,先把它写好,然后接下来VO,那就是是不是每来一条数据,那我们是不是就调用连接执行CQ啊,调用连接执行CQ啊,那所以这里边其实执行也非常简单,就是只要把那个预编语句里边对应的那个占位符塞上东西了,执行就完事了,对吧?啊,所以执行的我们也不用单独去写那个CQ啊,直接执行这个语言语句就就就完事了,那这里边首先我们想到的是执行,对到底是执行更新还是执行插入呢?
11:30
哎,我们这里的想法是可以,哎,大家想到这个,我把这个放在上面啊。就是这个本身invoke要做的事情就是这个,那我们具体来做的其实就是直接执行更新语句。更新语句,那大家想,那是不是如果要是没有更新成功的话,那是不是就应该要插入啊,对吧,所以就是能更新成功的话,就直接更新成功对吧,如果没有更新,那么就插入。
12:03
好,所以接下来我直接非常简单啊,先把这个update state statement啊先做一个填写,那我接下来是不是要先填它的那个字段,第一个是什么呢?Update。大家注意第一个是不是应该是那个temp呀,所以是set。Set double对吧,Set double,然后这里边给的位置是不是应该是一,然后后边的这个值就应该是value.get temperature对吧?然后同样后边那个是ID,那是不是set string啊,然后后边我们这个是二,然后value.get ID,接下来update是不是直接做一个execute直接执行啊好啊,那对应的我们接下来是不是判断它是不是到底有没有更新成功啊啊,那这个可以直接用预编译语句里边它有一个方法叫做大家看啊,叫做get update count。
13:01
就是你到底更新了多少条对吧,那大家知道如果说它等于零的话,是不是就是没更新啊,哎,所以呢,接下来是不是就插入啊,所以接下来是insert这个预边语句要去set啊,大家想现在是不是就反过来了,它就是ID和那个stream,呃,和那个double类型的temperature嘛,所以这里边我们要的是1value.get ID后边是insert statementt set double,二接下来是value.get temperature,最后insert做一个执行,是不是这样啊,所以这个其实就是。就是非常简单的一个过程,我们就直接把它搞定了,对吧?啊,然后接下来,哎,这还没完,最后是不是我还应该你得有始有终,有头有尾,对最后是不是还应该再做一个close啊,啊,那在close里边这个非常简单,是不是就是预编语句可以可以清掉了,对吧?呃,这个执行计划都不需要了啊,Close掉,最后connection close连接关闭,我们完整的操作就做完了。
14:12
啊,这就是整个处理的流程。啊,那接下来我们还是在代码里边这个完整的来测试一下啊,看看这个效果怎么样,那当然这个要测试的话,前提也是先得去连接到MYSQL了,对吧,我当前这个机器是默认就就启着MYSQ的,所以我直接先连接上吧,杠P对吧。123456,我把这个进去,然后现在啊,我我要去连接的这个test,大家大家还记得当时我指定了那个数据库的对吧?好,现在我收一下tables。哦,这里边啊有一堆表,但是其实没有我想要的那个表,对吧?那大家想这个没有这张表的话,是不是我还得先新建一下这张表啊,对吧?所以是create table sensor temp,呃,Sensor temp来,那接下来还得有对应的字段对吧?这里边我那个叫DD,这个是叉。
15:15
呃,我随便给一个吧,比方说20差不多了对吧,Not now,然后另外呃,这个temp double not now,对吧?哎,直接给一个这个定义,把定义出来show一下tables现在有了对吧?那当然现在肯定里边啥东西都没有,刚创建出来嘛,接下来我们就来执行一下代码,看看最终能不能把它写入到这个里面去。好,大家看到,诶,这边我们已经完整的把这个数据写完了啊,已经写进去了,现在我们看一下select芯对吧,From sensor temp,诶大家看现在是不是就这个数据都已经写进去了,而且都是最后一个数据,最新的数据对不对?诶这就是跟我们之前那个red里边写入的那个状态是一样的啊,然后如果说大家还想测一些更加有意思的场景的话,还可以怎么干呢?哎,就是大家可以把这个源做一个调整,我们现在直接从那个测试文件里边读取的数据源,对吧,我可以直接把这个注掉啊。
16:21
我直接怎么样呢?我定义一个这个,呃,就是env直接ADD source,大家还记得我们之前有过一个自定义的那个模拟测试数据源吗?对吧?诶,我们之前是有过那个my sensor source的,然后我把它定义出来这个叫data stream,对吧。Data stream把这个定义好,然后大家会想到这里边直接我去ADD ADD think,直接把它添加进去之后,那是不是接下来是一个什么样子,是不是它会不停的更新啊,里面的数据就按照我们定义的那个十个哎,不同的传感器,然后那个当前的那个传感器温度值是不停的在跳动,不停的在更新,对吧?好,那我现在再给大家测一下,看看这个效果怎么样。
17:13
大家看现在已经跑起来了,跑起来其实正常来讲是不是这边就应该在不停的输出啊,就应该有东西了,对吧,我们看一眼啊,哎,大家看这就是当前所有呃,就是十个传感器我们的那个温度值,因为我们生成的那个double类型的数据随机生成,没有没有截取对吧?所以这里边看的有点奇怪啊,但其实还是一样的,然后接下来大家看我快速的一次一次,这就相当于我写了一个脚本做轮询,对不对,大家看这里边的这个温度值是不是在之前那个温度值基础上在不停的变变化呀。对吧,三四十一,你看大概就是40度左右,对吧,有时候大有时候小啊啊,那后边这个三四二十大概是80度左右的样子啊,79度多啊,所以它都在不停的变化,这就是比较符合我们真实的这种应用场景,你如果要是现在真的是连接到一个,呃,真实生产环境里边。
18:06
卡夫卡那边去消费它的实时数据,然后写入到最后的结果,写入到这张表里的话,另外我们用一个用一个程序去查询轮讯,查询这张表结果直接比方说在一个大屏幕上显示出来,大家想这是不是就是一个实时的监控信息展示啊,所以这就是一个非常简单直观的应用场景。
我来说两句