1. 建立测试表,插入数据。
use test;
create table t_color (
id int unsigned not null auto_increment primary key,
color varchar(10),
create_date datetime,
last_update timestamp
) engine=myisam;
insert into t_color (color,create_date) values('black',now()),('green',now()),('red',now()),('blue',now());
select * from t_color;
2. 建立参数表存储最后一次的抽取时间。
use test;
-- 创建时间戳表
create table cdc_time (
last_load datetime,
current_load datetime) engine=myisam;
-- 初始化数据
insert into cdc_time values ('1971-01-01 00:00:01','1971-01-01 00:00:01');
select * from cdc_time;
3. 创建初始化时间戳转换
说明:
把current_load时间设置成作业的开始时间。通过“获取系统信息”完成这一功能,在这个步骤里创建一个“系统日期(变)”类型的字段,字段名是sysdate。然后创建一个“插入/更新”步骤,把“获取系统信息”步骤和“插入/更新”步骤连接起来。在“插入/更新”步骤的“更新字段”部分里,用流里的字段“sysdate”去更新表里的字段“current_load”。另外还要设置“用来查询的关键字”部分,把表的“current_load”的条件设置为“is not null”即可。
4. 创建查询变化数据的转换
说明: 从t_color表里抽取数据的查询语句使用开始日期和结束日期,左边闭区间,右边开区间。查询条件类似下面的语句: (create_date >= last_load and create_date < current_load) or (last_update >= last_load and last_update < current_load)
这里需要两个表输入步骤,一个用来从cdc_time表中抽取时间,另一个从t_color表中抽取需要的数据。另外再看查询条件,可以发现last_load和current_load分别出现两次。就是说在第一个表输入步骤中,这些时间值需要被抽取出来两次。
select
last_load last1,
current_load cur1,
last_load last2,
current_load cur2
from cdc_time;
在t_color表输入步骤里,选中“替换 sql 语句里的变量”,在“从步骤插入数据”下拉列表里选中上个表输入步骤。在select语句里写入下面的查询条件:
where (create_date >= ? and create_date < ?) or (last_update >= ? and last_update < ?) 前一个步骤传来的参数将替换上面语句里的问号,第一个问号的值是last1,第二个问号的值是cur1,等等。 通过比较create_date和last_update的值是否相等,可以判断出是新增的还是更改的数据。 case when create_date = last_update then 'new' else 'changed' end as flagfield 把变更数据输出到文本文件里。
5. 创建更新参数表的转换
说明:
如果转换中没有发生任何错误,要把current_load字段里的值复制到last_load字段里。如果转换中发生了错误,时间戳需要保持不变。把current_load字段里的值复制到last_load字段里需要“执行sql语句”步骤,脚本如下:
update cdc_time set last_load = current_load;
cdc_time表里之所以要有两个字段,是因为在加载过程中,会有新的数据被插入或更新,为避免脏读或死锁的情况,最好给create和update时间戳设定一个上限条件,也就是这里的current_load字段。 6. 创建作业
7. 测试 -- 运行作业 -- 查看diff文件
-- 查看cdc_time表
mysql> select * from cdc_time; +---------------------+---------------------+ | last_load | current_load | +---------------------+---------------------+ | 2014-12-16 11:10:05 | 2014-12-16 11:10:05 | +---------------------+---------------------+ 1 row in set (0.00 sec)
-- 修改数据
delete from t_color where id=3;
update t_color set color='Grey' where id=1;
insert into t_color (color,create_date) values('Yellow',now());
commit;
-- 运行作业 -- 查看diff文件
-- 查看cdc_time表
mysql> select * from cdc_time; +---------------------+---------------------+ | last_load | current_load | +---------------------+---------------------+ | 2014-12-16 11:16:02 | 2014-12-16 11:16:02 | +---------------------+---------------------+ 1 row in set (0.00 sec)
8. 总结
基于源数据的CDC要求源数据里有相关的属性列,ETL过程可以利用这些属性列,来判断出哪些数据是增量数据。最常见的属性列有以下两种:
这两种方法都需要一个额外的数据库表来存储上一次更新时间或上一次抽取的最后一个序列号。在实践中,一般是在一个独立的模式下或在数据缓冲区里创建这个参数表,不能在数据仓库里创建,更不能在数据集市里创建。基于时间戳和自增序列的方法是CDC最简单的实现方式,所以也是最常用的方法。但是它的缺点也是很明显的,主要如下: