前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用kettle来根据时间戳或者批次号来批量导入数据,达到增量的效果。

使用kettle来根据时间戳或者批次号来批量导入数据,达到增量的效果。

作者头像
别先生
发布2018-12-12 16:10:22
3.2K0
发布2018-12-12 16:10:22
举报
文章被收录于专栏:别先生

1、Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。下载图形化界面的zip包格式的,直接解压缩使用即可。安装部署模式这里不说了,自己可以根据自己的需求安装为单机模式或者集群模式。     Kettle的社区官网:https://community.hitachivantara.com/docs/DOC-1009855       Kettle的下载地址:https://sourceforge.net/projects/pentaho/files/Data%20Integration/ kettle国内镜像下载:http://mirror.bit.edu.cn/pentaho/Data%20Integration/ 2、由于这里只是演示了如何配置通过时间戳和批次号增量的导入数据,所以具体的操作不再叙述,具体的使用自己可以根据需求来使用。

Job如下所示:

代码语言:javascript
复制
思路。
批次量将一批数据从一个数据库导入到另外一个数据库,而且每批次的数据量不能重复。
这里使用时间戳,你也可以使用批次号。原理基本一样,都是确定每一批次的数据量。
job步骤:
    第一步。start,可以设置定时或者手动点击启动job。
    第二步。执行转换。
    第三步。将start_time=next_time。
    第四步。成功。        
        
1、Start,类型可以选择不需要定时,时间间隔,天,周,月。
默认不需要定时,如果需要定时的话,首先把重复的框勾选。
然后如果选择时间间隔的话,可以输入以分钟计算的间隔或者以秒计算的间隔。
如果按天,就选择天,然后选择每天几天的几分开始跑。
如果按照周,就选择每周的每天几点几分开始跑job。
如果是每月的话,就选择那一月的每天几点几分跑job。
        
2、转换的作业项名称,自己填自己的作业项名称,
在转换设置的tab然后自己填自己的转换文件名core_table_name_down。
高级tab,设置日志tab,位置参数tab,
命名参数tab,如果自己需要的话可以自己使用和研究。
        
3、作业项名称,自己填自己的,数据库连接,自己新建和编辑即可。
SQL脚本,自己填上自己的sql脚本。
这个主要是批次量导入数据,所以使用时间戳来实现批次量导入数据。
所以每次批次量导入数据结束,将start_time=next_time。这样下次
执行这个job,就是下一批的数据量了。
update 数据表名称 set start_time=next_time where table_name='数据表名称' and part=第几步

转换如下所示:

代码语言:javascript
复制
注意:
1)、由于是将上一步查询的值插入到下一步?的地方,所以一定要注意。
将带有?的步骤,替换SQL语句里面的变量,进行勾选。
从步骤插入数据,进行选择上一步的名称。
2)、步骤名称,自己起自己的名字。
    数据库连接,自己新建和编辑。
    目标模式,如果是Greenplum或者postgresql要输入自己的模式。
    目标表就是自己的数据表。
    提交记录数量,一般1000或者2000。下面主选项使用批量插入进行勾选。
    数据库字段,自己获取字段和映射,
    更新,用来查询的关键字和更新字段。自行配置。
    Switch/Case,Switch字段和Case值数据类型和Case值。
        自己根据自己的字段和类型进行填写。

change步骤:
    第一步。在数据源的库表里面查询出这批数据的最大时间或者最大的批次号。
    第二步。然后在自己的数据表里面获取到开始时间或者最小的批次号
    (此数据表自己初始化好起始时间start_time或者最小批次号和查询条件,比如第几步和那一张表)。
    将第一步获取到的最大时间或者最大的批次号传递到第二步。
    第三步。更新自己的初始化好的数据表,将自己初始化好的数据表的最大时间或者最大批次号字段修改。
        同时进行表输入进行查询出数据。然后将这一步查询的数据传递到Switch/Case。
    第四步。Switch/Case。将上一步的数据根据Switch/Case。进行传递。
    第五步。进行各种数据表的输出。        
        
第一步:
1)、select '数据表名称' as table_name, 第几步 as part, COALESCE(max(update_time), now()) as next_time from 数据表名称
2)、postgresql,COALESCE()函数 
    主流数据库系统都支持COALESCE()函数,这个函数主要用来进行空值处理,其参数格
式如下: COALESCE ( expression,value1,value2……,valuen) 。
    COALESCE()函数的第一个参数expression为待检测的表达式,而其后的参数个数不定。
COALESCE()函数将会返回包括expression在内的所有参数中的第一个非空表达式。如果
expression不为空值则返回expression;否则判断value1是否是空值,如果value1不为空值则返
回value1;否则判断value2是否是空值,如果value2不为空值则返回value3;……以此类推,
如果所有的表达式都为空值,则返回NULL。
3)、MySQL,IFNULL函数是MySQL控制流函数之一,它接受两个参数,如果不是NULL,则返回第一个参数。 
    否则,IFNULL函数返回第二个参数。IFNULL(expression_1,expression_2);
        
第二步:    
1)、select ? as table_name, ? as part, start_time, ? as next_time from 数据表名称 where table_name='数据表名称' and part=第几步
2)、将第一步的三个参数,传递到第二步的三个问好的地方。
        
第三步:查询出每个case所需要的值的数据。同时修改next_time最大时间或者最大批次号。
select *, CASE  WHEN "字段"='标识位' THEN (select to_char(to_number(COALESCE(max("Cd_batch"), to_char(now(), 'yyyyMMdd')||'00000'), '9999999999999') + 1, '9999999999999') 
from core_data.dn_data_reconciliation where "TableName"='数据表名称' and "字段"='标识位') END AS "Cd_batch", 'I' as "Cd_operation"  
from 数据表名称 where '数据表名称'=? and 第几步=?  and update_time>? and update_time<=?

待续.....

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-11-13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档