NIFI从MySql中增量同步数据_通过Mysql的binlog功能_实时同步mysql数据_根据binlog实现update数据实时同步_实际操作05---大数据之Nifi工作笔记0044
具体的,之前已经写过,如何在NIFI中实现MySQL的增量数据同步,但是写的简单了,因为,比如在插入的时候,更新的时候,仅仅是写死的某个表,也就是针对某个表,指定好字段进行插入操作,或者更新操作,这样就有些局限了,比如我想同步一整个库,注意是增量同步,那么,难道我要一张一张表的去创建好了以后,然后我再去一个的同步嘛,比较麻烦,一点点解决.
首先看这个日期同步的问题,这里:
编辑
首先先来看一下之前那个整体的,mysql到mysql增量同步的流程,可以看到这里我们主要修改insert的ReplaceText处理器就可以, 修改这里拼接sql语句的部分.
编辑
上面这个insert对应的ReplaceText处理器需要修改.
编辑
需要修改他的ReplacementValue这个属性
INSERT INTO userinfo_nifi(id,name,mobile,email,son_json,cdate) VALUES($,"$","$","$","$",STR_TO_DATE("$",'%a %b %d %H:%i:%S CST %Y'))
主要是这里的内容,可以看到有一列是日期类型的,那么,需要通过STR_TO_DATE
INSERT INTO userinfo_nifi(id,name,mobile,email,son_json,cdate) VALUES($,"$","$","$","$",STR_TO_DATE("$",'%a %b %d %H:%i:%S CST %Y'))去进行转换.
类似于:
INSERT INTO `userinfo_nifi` VALUES (9, 'sdsd', '4645', '665', 'sdfs', STR_TO_DATE('Fri Jun 16 04:40:30 CST 2023', '%a %b %d %H:%i:%S CST %Y'));
将日期格式转换可以插入到数据库中的就可以了.
当然还需要在EvaluateJsonPath处理器中添加上,cdate
编辑
然后再来看更新:update
这里也是修改一下EvaluateJsonPath处理器和ReplaceText处理器就可以了
update userinfo_nifi set id = "$" , name = "$" , email = "$" , mobile = "$" , son_json = "$" cdate = STR_TO_DATE("$",'%a %b %d %H:%i:%S CST %Y') where id ="$"
这个是ReplaceText的ReplacementValue值.
这里其实还有个问题,就是空值的时候呢,已经试过了,空值的时候上面的办法也会报错...所以
再继续解决:
先来看一下整体的解决后的流程:
编辑
编辑
然后我们看看修改的部分,注意这个流程,仅仅是用来同步一张表的,配置也麻烦,需要配置表名,表字段什么的,后面咱们看看,能不能改成自动识别字段这样.
先来看insert部分,他的这个EvaluateJsonPath处理器
编辑
先来提取字段内容,然后
再添加一个updateAttribute处理器,用来更新一下cdate字段,日期类型的字段需要处理一下,
就是判断它是否是空,如果是空就返回null,不是null就返回它自身,当然,其他的字段也可以用这种方法来处理空值.
编辑
updateAttribute处理器这里的,需要添加一个字段,这个字段就是处理后的cdate字段.
$','%a %b %d %H:%i:%S CST %Y')")}
可以看到其实就是处理cdate字段的空值的.
然后再去:
编辑 看这个ReplaceText处理器
这个是ReplacementValue的值:
INSERT INTO userinfo_nifi(id,name,mobile,email,son_json,cdate) VALUES($,"$","$","$","$",$)
这样就可以了,下面的部分都不用动,这样就可以适应null的情况,如果不是空值就可以正常插入数据了.
然后再来看update更新:
更新也是从EvaluateJsonPath处理器开始提取字段值
编辑
可以看到提取的值有点多,因为有新值和旧值
编辑
提取以后
编辑
让再来看下面的updateAttribute处理器,这里主要添加两个属性,用来修改cdate的值,判断是否为null处理
$','%a %b %d %H:%i:%S CST %Y')")}
$','%a %b %d %H:%i:%S CST %Y')")}
这两个值的内容分别为上面的内容
然后再来看:
编辑
ReplaceText处理器的值
update userinfo_nifi set id = "$" , name = "$" , email = "$" , mobile = "$" , son_json = "$", cdate = $ where id ="$"
这样就可以了,然后就是去测试,这样,新增,删除,修改,包括 对日期类型数据的空值判断,插入都可以了.
领取专属 10元无门槛券
私享最新 技术干货