本文从头开始讲述使用Flink引擎实现hudi数据湖基于commit_time的查询语义。基本使用可参考前面文章hudi时间旅行查询
基本要求:
基本流程:
假定MySQL连接信息为:
MySQL中创建hudi数据库和student表。
create database hudi;
CREATE TABLE `hudi`.`student` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
`name` varchar(20) NOT NULL COMMENT '学生名字',
`school` varchar(20) NOT NULL COMMENT '学校名字',
`nickname` varchar(20) NOT NULL COMMENT '学生小名',
`age` int(11) NOT NULL COMMENT '学生年龄',
`class_num` int(11) NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` bigint(20) NOT NULL COMMENT '电话号码',
`email` varchar(64) DEFAULT NULL COMMENT '家庭网络邮箱',
`ip` varchar(32) DEFAULT NULL COMMENT 'IP地址',
`address` text COMMENT '家庭地址',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8;COPY
新建测试数据生成工具元数据文件,保存如下数据为meta.txt
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(zhongda, beida, xidian, huagong)]
nickname||varchar(20)||学生小名[:enum(鬼泣, 高小王子, 歌神, 逗比)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
address||text||家庭地址[:address]COPY
生成10条测试数据并写入MySQL表
(在装有docker的服务器执行,由于自增ID从1开始,第二次执行需要修改自增ID起始值)
docker run -v /root/xzc/datafaker/meta.txt:/opt/meta.txt xiaozhch5/datafaker:v1.0 rdb mysql+mysqldb://root:Pass-123-root@192.168.1.2:3306/hudi?charset=utf8 student 10 --meta /opt/meta.txtCOPY
查询student表中的数据
select * from hudi.student limit 10;
COPY
删除id为1的数据以及更新id为2的数据
delete from hudi.student where id = 1;
update hudi.student set name = "BQmBbNb" where id = 2;
COPY
yarn上启动flink session
export HADOOP_CLASSPATH
bin/yarn-session.sh -s 8 -jm 2048 -tm 2048 -nm flink-hudi-test -dCOPY
启动flink sql
bin/sql-client.shCOPY
以hdfs上的/user/hudi/warehouse为目录创建catalog
create catalog hudi with('type'='hudi','catalog.path'='hdfs://bigdata:8020/user/hudi/warehouse');COPY
创建test数据库和student表:
create database hudi.test;
drop table if exists hudi.test.student;
create table hudi.test.student (
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ'
);COPY
Flink SQL中创建stu表读取数据
create table student_mysql (
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
'connector'='mysql-cdc',
'hostname'='192.168.1.2',
'port'='3306',
'username'='root',
'password'='Pass-123-root',
'database-name'='hudi',
'table-name'='student'
);COPY
将mysql数据同步到hudi表
insert into hudi.test.student select * from student_mysql;COPY
hudi每次数据写入时都会生成一个时间戳,用于表示数据写入的时间,基于该特性,在进行数据查询时可使用该时间对hudi中数据进行查询。
查看所有时间戳数据:
create table student_commit_time_view(
`_hoodie_commit_time` string,
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student'
)
COPY
select distinct _hoodie_commit_time from student_commit_time_view
COPY
查到有两个时间戳,分别为:
现在查询时间戳小于20220622152536101的数据,再次创建hudi表
create table student_view_1(
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
'read.start-commit' = 'earliest',
'read.end-commit' = '20220622152536101'
);COPY
SET 'sql-client.execution.result-mode' = 'tableau';
select * from student_view_1;
COPY
得到如下10条数据
现在查询时间戳小于20220622152707516的数据,再次创建hudi表
create table student_view_11(
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
'read.start-commit' = 'earliest',
'read.end-commit' = '20220622152707516'
);COPY
查询数据,得到:
select * from student_view_11;COPY
但是,这个_hoodie_commit_time是有保存时间限制的,保存时间由clean.retain_commits配置项和checkpoint interval确定。
clean.retain_commits表示数据写入到hudi的次数
checkpoint interval表示每次checkpoint的时间间隔
当_hoodie_commit_time距离现在的时间超过clean.retain_commits * checkpoint interval,那么数据会被清除。
比如说,现在设置checkpoint的时间间隔为3分钟,设置clean.retain_commits的次数为10次,那么通过_hoodie_commit_time最多只能查询到30分钟以前的数据,再之前的数据查不到。
在本次实验中,设置的checkpoint时间间隔为20秒,clean.retain_commits的次数为30次,也就是说,如果有数据持续写入的话,通过时间戳的方式,最多查询到600秒以内的数据。
changelog 模式下,这个参数可以控制 changelog 的保留时间.
再次往hudi表中持续插入100万条数据。我们发现在数据持续插入600秒之后,之前数据查不到了:
select * from student_view_11;COPY
select * from student_view_1;COPY
现在的时间戳是2022062216460000,我们查询这个时间之前600秒的数据
create table student_view_33(
`id` INT COMMENT '自增id',
`name` STRING NOT NULL COMMENT '学生名字',
`school` STRING NOT NULL COMMENT '学校名字',
`nickname` STRING NOT NULL COMMENT '学生小名',
`age` INT NOT NULL COMMENT '学生年龄',
`class_num` INT NOT NULL COMMENT '班级人数',
`score` decimal(4,2) NOT NULL COMMENT '成绩',
`phone` INT NOT NULL COMMENT '电话号码',
`email` STRING NULL COMMENT '家庭网络邮箱',
`ip` STRING NULL COMMENT 'IP地址',
`address` STRING COMMENT '家庭地址',
`update_time` timestamp NOT NULL COMMENT '更新时间',
`create_time` timestamp NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`) NOT ENFORCED
)
PARTITIONED BY (`school`)
with (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'path' = 'hdfs://bigdata:8020/user/hudi/warehouse/test/student',
'read.start-commit' = '20220622163600000',
'read.end-commit' = '20220622164600000'
);COPY
查询数据总量
select count(*) from student_view_33;COPY
得到:
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/bigdata/hudi/hudi-basic/6828/