00:00
好,那接下来呢,我们说一下我们FCDC的这个代码啊,这边呢,我已经从文档当中把我们所需要的内容已经粘过来了,给大家说一下啊,那首先呢,这一块我们要做一个什么事啊,大家还记得。现在呢?我们已经完成了my circle。搭建,然后呢,产生b log,哎这边呢,我们这样啊。已经完成了,咱们买S的搭建,然后呢,可以产生我们的b log,刚才我们也测试了对不对,好,那接下来呢,我们要使用弗Li CC程序将数据写到哪卡夫卡,而卡夫卡呢,我们也已经搭建好了,所以我们接下来要写的是这个程序啊,弗Li CC呢,官网给我们提供了它的代码,我们可以拉取到这个blo,但是呢,你要写到卡不卡,这是你自己的事,因为人家可不管你把数据写到哪对不对,好,那我们首先要写一个工具类,把数据呢可以输出到咱们的这个。卡夫卡。OK吧啊,所以第一步呢,写个工具类,然后接下来我们写弗C代码就好了,呃,那这边呢,有个卡瓦的工具类,其实最核心的就是这行代码。
01:09
创建一个flink卡夫卡的一个producer OK吧,那这块通讯呢,我相信大家都非常了解了,因为都是有这个flink基础的,对吧?好,那这是第一个我们所写的一个工具类啊,那接下来呢,是我们核心的这个弗CC代码,我把它放在APP里面,因为我们未来要启动的这个代码哈,那我们看一下,首先创建这个流的执行环境,第二块呢,我把它注释掉了,因为这个是开启开泡的,那生态环境当中,虽然我注释掉了,但是这个代码我给它放在这儿,未来大家工作的时候,这个东西切记一定要加上我们这块呢,学习阶段我们就。不要这个了,对吧,因为他会给我们任务带来额外的压力啊,没必要对吧?好,那接下来是我们完成flink。CDC整个过程的一个创建啊,构建代码呢,在这儿。首先flink CDC呢,它通过因为点from source,然后呢,在这边传一个MYS,那这个source怎么来的呢?是通过flink CDC刚才我们导入的那个。
02:07
家暴。对吧,啊,导入那个价包,然后呢,这里边就有一个MYSO,你们可以看一下啊,它是属于我们这个CDC包的,OK吧。好,那这边呢,首先填主机名啊,那我们云。地址啊,用外网地址,因为我们刚好装在这个外网上的,对吧,如果不在外网上的这个机器,那么你只能打包到集群上去做测试,那用外网机器呢,有一个好处在于我们可以在本地做测试啊,端口号,然后呢是我们的数据库名称啊,这边呢是写表,如果说你不写任何东西的话,那么就代表所有的表你都要,然后用户名密码啊,你写自己的密码就好了,然后杰森解析格式,因为blog blog log它这个二进制,所以要解析,解析成杰森格式啊,那格式呢,在这块给大家看一下,它默认拉取进来的文件格式。之前我们也说了,它里边这个字段对不对,那这边呢,可以给大家看一下它的一个格式在这儿。
03:05
Before after south。Operator。然后传三个,如果不没有15,那就那我们这边提到了对吧?好,那这边呢,我们的格式用它啊,然后呢,这个点我要给大家说一下。它指的是options对吧,那这个options有几种呢?第一有这个,第2LATEST,还有这个specific,还有一个step,总共有这五种,那各有什么区别呢?首先第一个initial initial呢,它是这样子的。看一下上面的一个注释,它对于我们监控的数据库表而言呢,在第一次启动的时候,它先做一个初始化的。快照啊,然后接下来continue to read the latest blog,也就是说比如说我们这个数据这里边啊,假如说是I硅谷,刚才我们写的I硅谷对吧?啊,这个无所谓好,那比如说这里边已经有数据了,那我们想把这个数据先拉取过来,然后再你有新增或者购改,我再去读取最新的。
04:07
啊,那这就一定它第一个阶段,它又分两个阶段,第一个阶段呢,先读历史数据,第二个阶段再读我们的最新的从blog开始读,对吧?好,那除了这个之外,第二个呢叫list,那它是从最老的blog开始读,呃,第三个latest,它只读最新的这个数据,也就是说它跟initial区别在于它不做那一次初始化,这个是指定off,也就可以指定位置去消费,然后这个是指定时间戳,也从哪个时间点开始消费,诶就比较灵活,比较方便,对不对?呃,那未来呢,我们要想导全量数据,那肯定是一定是,那毋庸置疑,对吧?那现在我们等会儿呢,做一个测试,在本地做一个测试,所以呢,我们用这个latest就好了。对吧,等会儿呢,我们也可以用给大家做一个测试好吧。那这块呢,我们就构建好了我们的source啊,然后接下来把数据结构做一个转换,因为它默认的这个数据结构啊,呃,杰森套杰森未来的不好用,而且这里边有一些字段呢,我们其实没有什么用,看这些东西呢,可能没有用,对吧,像这个什么搜ID啊,还有这些东西对于我们来说,未来分析数据没有用,我们用到的就是这里面标红的before。
05:17
那他说before we闹怎么还要用,现在这个是闹,那是因为我们是新增的一条数据,那我们之前提到了,如果是更新数据呢,那before有没有用,有用对吧,特别还有删除数据呢,有没有用有用对吧?所以before呢,保留after保留啊嗯,S里边取时间DB table啊,那这两个时间呢,嗯,可以取一个,因为你看这两个时间怎么样一样,对吧,取一个就好了啊呃,这操作。增删改对吧?啊,时间戳transaction,那我们也不需要,比如我们数据分析的时候,并不需要这个什么事物的ID对吧?啊,那这些东西呢,我们只保这些内容,所以接下来这一块呢,就对这个数据呢做一个加工,好吧,啊,没有什么太多可说的,就对于这个阶层数据做一个加工,最后呢,我们只保留这几个数据就好了。
06:03
OK吧啊,这样未来呢,我们好处理啊,Data呢就after old呢,就是我们的before数据好吧。那最后呢,我们通过这个卡法的工具类,将数据呢,写到我们的卡夫卡里边啊,是一个卡巴生产者,呃,那那这个时候呢。我们未来写到这个topic DB里边,好吧。那现在呢,我们不着急写,我们在本地做一个测试啊。就是map。点print对吧。我们做一个简单的测试来看一下,呃,那这边呢,我已经写成了自己的公网地址了,那如果说你要做本地测试,你用这个,如果说你装的不是在这个有公网地址的服务器上面,那么你只能打包到集群上去做测试,那等会儿呢,我也会给大家说这个怎么去做测试,好吧。呃,那本地呢,我们把它运行起来。注意啊,我现在用的是这个latest对吧,最新的啊,尺度最新的,看它直接连接到了我们的blog这个位置叫1856来看1856是什么意思呢?呃,还记得这边进到哇。
07:12
MY。1856最新的latest对吧?好,那接下来呢,我到这里边,比如说我把这条数据删掉,删除,我们看这边打印,那这样的数据格式呢,我们就更舒服一点,对吧,它是一个删除操作啊,那b before数据呢,我没有保留,那就算了啊呃,这边呢是表明啊,数据库呢只有一个,那就无所谓了,比如说我们新增一条数据给大家看一下啊。是吧?啊,再来一个爱的硅谷。保存。保存好以后呢,我们这个里边看到有这个data,诶,这是我们刚刚新增的数据。啊呃,那就说这个b four数据就不要了,那无所谓对吧,因为我们后面呢,其实对这个b four数据呢,并没有做任何的一个加工,所以我们就不需要了啊,我们只要有after就行了,但是生产环境当中呢,如果你需要这个before,那你就把这个before保留就好了。
08:09
OK吧,啊,这个其实是吧,这是新增,那如果说我们做一个修改给大家看一下啊呃,把这个艾特硅股呢,改成这个第一个字母首字母小写。诶,修改数据呢,我们保留着这个old啊,它可能需要的,因为对于删除数据而言啊,第一在生产环境当中呢,一般来说我们很少会有这种。删除的一个操作,其实在生产机当中,我们要想删除数据,顶多打一个标记,不会真正的去删除数据,对吧?啊,所以呢,其实删除数据不多啊,那我们这边保留呢,是因为我们自己做测试的时候,未来是有会产生这个删除数据的。懂吧啊,所以呢,我们就不要啊,直接就给它干掉,所以什么都没保留,主要是一个delete这个标记啊好,那这个增删改我们都看到了,这是没有问题的啊,那接下来呢,我们给大家演示一下,打包到集群上去运行啊,同时呢,我们把这个数据呢,上传到卡夫卡这个主题里边啊,那这个主题呢,你可以不必去创建,因为卡夫卡的生产者会自动给我们创建这个。
09:10
主题好吧,好,那这边呢,我们是弗林CDC代码的一个讲解与本地的一个测试。
我来说两句