00:00
接下来我们最后再讲一个啊,直接把数据写入MYSQL的一种方法啊,那这里边大家会想到是不是就必须得建立JDBC连接了呀,啊对吧,所以这个过程我们还是新建一个单立对象啊,这个因为我们是以MYSQL作为例子,那大家知道只要是JDBC连接的,其他的关系数据库是不是都一样啊,啊对吧,你如果是其他的数据库其实是一样的,所以我们这里边直接把这个叫做JD bc s test。好,还是把main函数间定义出来,呃,那大家会想到这个实现里边我需要引入什么依赖吗?哎,有同学说那这个不需要啊,你连官方的那个,呃,就是这个这个连接器都没有,你还实现什么呢。诶,但是大家会想到我要创建这些PC连接,你你能直接就是随便手写一条就就把这个相关的连接创建出来吗?我们可可能还是需要一些这个连接器,对吧?哎,所以这里边我们直接去引入一个,大家看这就不是flink跟MYSQ的连接器了,因为没有对不对啊,直接就是MYSQL发GDBC连接的一个连接器,Connector抓va啊,我们把这个引入。
01:29
好,那同样这里面的代码大家会想到前面的部分非常的类似,我还是把这一部分copy过来。前面创建这个创建执行环境,设置并行度是一,然后我们把这个数据从文件里边读取出来,接下来做一个map,把它转换成s ready对吧?好,呃,那同样接下来我们做的这个S操作,是不是还是data stream要去ADD一个S啊,那接下来这里边的这个ADD think,这真的没有什么好方法啊,是不是必须得去自定义你有一个自己的啦啊,所以这里边定一个my gd bc think。
02:17
方式啊,那等下我们要自己去实现了,另外大家不要忘记Env.execute。Think test啊,这是我们的程序的一个主体流程,那接接下来我们其实就是要把这个my j dbc think,这个think function真正的实现就好了,那大家看一下怎么实现啊。Class my gdbc think啊,那它要继承一个什么呢?实现一个什么接口呢?哎,这里边我们给大家一个例子,就是我可以去实现一个think function式,对吧?但是大家会想到think方式里边能做的事情是不是比较少啊,因为我们在这里边可能还涉及到干什么呢?是不是涉及到跟MYSQL那边要建立连接啊,那那你说我们这个think的时候,它是来一条数据,是不是调一次我们这里边的这个方法,你难道是来一条数据我就去建立一个连接吗?重新建立一个连接吗?显然不应该,对不对?我们是不是应该把它统一放在。
03:23
哎,对,是不是应该在初始化的过程当中把连接建立出来,后面我就直接去,呃,调用这个连接去直接那个写Co就可以了呀,执行Co就可以了,所以我们的一个想法,那怎么样去继承一个什么东西呢?对,大家会想到什么函数类可以有这个open这样的一个初始化的过程呢?对,生命周期在rich函数里边有,所以我们这里实现的是一个reach think方式,好,那当然这里边我们输入的类型还是sensor reading对吧?大看这个就是大家看它连那个一定要实现的东西都都没了,呃,为什么呢?因为你是think嘛,对吧?呃,对,本身你就是往外面去写,所以他并没有强制说你一定要去实现什么东西,好,那既然我们在这里边要去初始化的时候建立连接,那我们先把这个定义出来吧,对吧,定义。
04:23
这个CQ的连接还还要定义一些什么东西呢?我还准备定义一些,比方说我我们在再往里边写的时候,是不是我可以预先把那个Co先写好,最后再往里边填对应的那个数啊,来一条数据去填对应的数对不对?如果我预先把那个C口写好的话,就相当于提前就可以做预编译,然后就可以生成对应的那个执行计划了,那后续我在执行的过程当中就会非踌对吧?啊,这个大家是知道的,所以我们定义这个预编译器。
05:08
啊,所以当然他们后面就这里面只是先把它定义出来,我们具体赋值是要在那个open的时候才赋值,对吧,这里面定义一个connection,它应该是。啊,这个我们它是一个connection类型。呃,这个我们直接选这个Java CQ就可以啊,然后我们可能还要有两个预编译的语句,比方说有一个插入的预编译语句insert,我们要往那个,呃,MYSQL里边插入一条数据的时候是一个语句,另外是不是还有可能是更新啊,呃,大家想到就是我如果比方说我里边存的的还就像我们red里边存的数据一样,因为我不能像ES那样把所有的数据都往里塞,对吧?MYSQL这个还是比较重的一个操作,所以说我预计是还是把这个对应的3ID和它对应的那个温度值写进去就好了,那么随着时间的变化,那个温度值是不是可以不停的更新啊啊,所以还会有update,对吧?有insert,有update,那我这里定义一个inser statement啊,一个预编译语句。
06:21
它应该是prepared,这个大家用过吧,Prepared statement对吧?一编译语句。同样还会有update statement prepared statement,好,先把它定义出来。接下来接下来是不是就要有这个open生命周期了,对吧?呃,初始化过程。创建连接和预编译语句啊,所以这里边大家就会想到我肯定是可以去复写某个方法对不对,诶,这里边有这个open对吧?啊所以诶,这里边大家会看到我其实还是必须要去实现一个VO invo方法的,对不对?哎,但是大家会看到这是这个think方式需要去实现的一个东西,所以我先把这个open先写入。
07:29
好,然后呃,大家看你可以把这个S写进来对不对啊,就把它这个呃,负负函数里边的负类里边的那些实现直接调用一下,这里边我们的connection是不是就可以给他一个赋值了啊,这里边我们创建连接啊,用这个driver manager。是不是可以get connection啊,这里边直接传一个JDBC连接,比方说我这里边是JDBCMYCQL。
08:02
啊,这里边logo host默认端口3306。呃,我要连接的数据库,比方说就叫test吧,对吧,这个是可以的,然后后面还可以跟上那个这个username和password,我这里边直接写死吧,但是这个不安全啊,大家实际生产应用肯定不会这么写,Root,我的密码123456,好,这是我的创建的连接,然后接接下来是我们的那个预编译语句,对吧?Insert statement,那它是不是要调用这个连接里边的prepare statement呀?啊,那这里边我们就要写QL了啊,这里边的这个CQL怎么写呢?Insert in into对吧?这个表名,表名我们叫就叫temperature吧。Temperatures,好,然后接下来这里面字段,我们定义两个字段,一个就是Sen ID对吧,另外一个就是它temperature,啊,这个我们简单一点就是sens和temp吧,就表示当前它的这个ID和对应的那个温度值。
09:16
啊,Insert into这个,然后大家会想到是不是还得有values啊,Values现在大家知道吗?现在还不知道对不对,所以对我是不是预编译语句里边可以有这个占位符啊,对吧,用问号作为占位符,等一下有数据来的时候,是不是往这里面填充就可以了啊,然后同样的啊,Update statement。Prepared statement,那这里边这个写的就是update temperatures,呃,后面是set对吧?Set我们要set什么呢?哦,如果要更新的话,是不是查询对应的那个S4ID,然后把它对应的那个temp temp要改变掉啊,诶所以这里边我们set set什么是不是tap要等于对,等于问号,然后对,然后查询要where。
10:21
是不是SS等于问号对吧?啊,这就是我们的这个预变语语句啊,先把它写好。然后接下来这就是我们要去执行了,那这个执行过程我们是不是得得去实现一下这个evoke呀,对吧?啊或者是大家这个。大家看上面这个空的这个invoke已经被被弃用了啊,所以我们选这个好,呃,Invoke的过程当中,我们就不要去加这个S了,我直接这里边调用连接去执行我们的那个CC口命令,对吧,调用连接执行CL。
11:05
那首先首先我们直接上来就执行更新语句,诶为什么我们直接上来执行更新语句呢,来了一个数据直接就更新呢。因为大家会想到这个更新语句是要先查where,这个sensor等于对应的那个ID的对不对,那假如说之前没有的话,是不是没查到就相当于更新没更新啊,然后我们就能判断你如果要是没更新的话,是不是就去插入一条啊,对吧,就相当于我们不用再去单独做一个select啊,这个where有没有了对吧?省了一条语句,所以我直接执行更新语句,那这里边就是update statement是不是先要set set对应的值啊,因为里边有占位符set什么呢?那大家会想到这里边来的数据,我是要给这个,首先是temp temp是不是一个double类型啊,温度是一个double类型,所以我要set double。
12:09
一对吧,这是它的那个位置,第一个占位符,然后我要塞,塞什么呢?Value里边的temperature啊,这个大家知道对吧?所以接下来同样我们还要塞第二个位置,第二个位置是它的是不是sensor ID啊,所以这里边要set的是string,对STRING2VALUE的ID把它塞进去。然后接下来是不是可以执行了execute对吧?啊,这就是我们这个执行更新语句的过程啊,那如果update没有查到数据,那么执行插入对吧?诶,所以这里边我们可以判断一下,怎么判断它没有更新呢?让大家知道这个右边语句里边它有一个方法叫get。
13:13
Get update count对吧?啊,就是看我们之前的这个到底修改更改了,更新了几条数据,那假如说它等于零的话,是不是没有查到数据啊,没有更新到数据,那么接下来我们就去insert对吧?呃,Insert set,呃,它的这个第一个是sensor,第二个是temple,所以是不是要先set string啊一。value.id然后后边set不是set double2value.temperature来看是不是这样,然后接下来去insert execu把它执行啊,所以其实就是这样的一个过程,这就是来一条数据,按照我们已经定义好的那个预编译语句,对吧?把对应的那个字段填充进去,执行这条语句就可以了,来一条数据执行一次,来一条数据执行一次。
14:13
啊,那最后大家会想到你有开就有关对吧,有open就有close,对,所以我们最后再实现一个close方法,这把连接和这个预编译语句是不是都得关掉啊,对吧,所以insert这个语句。Close update语句。Close啊,最后连接对close啊,这样做一些收尾工作就可以了,关关闭时做清理工作,哎,这就是我们去实现了一个rich think方式,然后在这个过程当中实现了到这个数据库的连接和写入啊,这就是我们这部分代码啊啊那接下来我们给大家还是做一下测试,那这个测试的过程当中,呃,这个就就得去起那个mexicoq了,对吧?我我本身这个机器上面就已经跑着有这个MySQL server啊呃,那就服务是本身起着的,那我现在就直接去连一下吧,MYSQL-u root对吧,杠P。
15:26
好,输入密码,我们先看一眼啊啊,然后看一下当前的那个database,来这个test,我们现在是要用test对吧,用test。看一下当前的这个表,诶,啥都没有,啥都没有的话,这这个就不像我们那个之前一看到啥都没有很开心对吧,省得删了你这个啥都没有能直接往里面插东西吧,对,连表都没有,那这个显然不行,对吧?所以这里面我们还麻烦一点啊,是不是先得create啊,Create table我们那名字叫什么来着,在哪哦,在在这这里对吧,叫temperatures,这个大家一定要记得啊,你说这里改的名字,这里肯定要写的不一样才行,好啊,那这里边我们可能有两个字段,一个叫sensor对吧?啊,它是一个string,我们八差20应该够了啊啊note纳。
16:27
然后还有一个ta,它应该是一个double类型对吧?脑存档好,我们直接把它创建出来,看一眼这个table啊啊,这个刚创建出来肯定里面没数,对不对啊,接下来我们就运行一下这个代码。呃,大家觉得这个。大家看到我们这里边程序已经跑完了,那正常的话我们应该把数据写入了,对不对?大家看一眼,这里边select sir对吧?诶大家看,果然这里边的数据是不是都已经写入了,写入进来之后的状态是什么呢?我们一共不是六条数据吗?现在写入进来对只有四条,为什么是四条呢?对,跟当时redis的状态一样,我们是不是假如说这个Sen ID是相同的话,我们是直接更新啊,Update操作对不对?哎,所以大家看到如果我们另外有一个有一个程序去读取这个MYSQL的内容,去实时展示的话,也可以给我们展示一个,呃,当前的一个监控信息,对不对?把这个大家可以把这个再去做一做,这就是我们今天的这一部分内容。
我来说两句