摘要:本文介绍了如何使用 Dinky 实时计算平台构建 Flink CDC 整库入仓入湖。内容包括:
Tips:历史传送门~
《Dinky 0.6.1 已发布,优化 Flink 应用体验》
《Dlink 在 FinkCDC 流式入湖 Hudi 的实践分享》
GitHub 地址
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~
一、背景
伍翀 (云邪)、徐榜江 (雪尽) 老师们在 Flink Forward Asia 2021 上分享了精彩的《Flink CDC 如何简化实时数据入湖入仓》,带了新的数据入仓入湖架构。其中第四章节 Flink CDC 在阿里巴巴的实践和改进带来了前沿的思考与实践,其 CDAS、CTAS 数据同步语法的功能非常引人注目。近日,目标要成为 FlinkSQL 最佳搭档的 Dinky 也带来了 FlinkCDC 整库入仓入湖的实践,快一起来试用和改进下吧~
二、痛点
Flink CDC 的入湖入仓的痛点由《Flink CDC 如何简化实时数据入湖入仓》总结为以下四点:
该CDC入湖架构利用了 Hudi 自身的更新能力,可以通过人工介入指定一个准确的增量启动位点实现全增量的切换,但会有丢失数据的风险。
2.手工映射表结构易出错
通过 FlinkCDC 构建同步任务时,需要手工映射 Mysql 等表结构到 Flink DDL,当表和字段数目非常多时,开发和维护的成本将线性增加。而且人工映射字段类型容易出错。
3.Schema 变更导致入湖链路难以维护
表结构的变更是经常出现的事情,但它会使已存在的 FlinkCDC 任务丢失数据,甚至导致入湖链路挂掉。
4.整库入湖
整库入湖是一个炙手可热的话题了,目前通过 FlinkCDC 进行会存在诸多问题,如需要定义大量的 DDL 和编写大量的 INSERT INTO,更为严重的是会占用大量的数据库连接,对 Mysql 和网络造成压力。
三、解决方案
阿里基于 Flink 打造了 “全自动化数据集成” 的方案:
四、Dinky 的探索实践
Dinky 无缝支持最新的 Flink CDC。Flink CDC 目前已更新至 2.2.1,自 2.+ 版本以来,Flink CDC 的功能日趋稳定与完善,详情请见
https://github.com/ververica/flink-cdc-connectors
其中,最新的 Flink CDC 已具备全增量自动切换以及 schema 变更同步的功能。
Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS 作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 Mysql 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等。
五、CDCSOURCE 原理
面对建立的数据库连接过多,Binlog 重复读取会造成源库的巨大压力,上文分享采用了 source 合并的优化,尝试合并同一作业中的 source,如果都是读的同一数据源,则会被合并成一个 source 节点。
Dinky 采用的是只构建一个 source,然后根据 schema、database、table 进行分流处理,分别 sink 到对应的表。如下图为整库入湖 Hudi (两个表)。
2.元数据映射
上文分享采用了 MysqlCatalog 来获取源库的表和 schema,来映射 Flink DDL。
Dinky 是通过自身的数据源中心的元数据功能捕获源库的元数据信息,并同步构建 sink 阶段 datastream 或 tableAPI 所使用的 FlinkDDL。
3.多种 sink 方式
上文分享是 sink 到 hudi。
Dinky 提供了各式各样的 sink 方式,通过修改语句参数可以实现不同的 sink 方式。Dinky 支持通过 DataStream 来扩展新的 sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。
六、CDCSOURCE 用法
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
-- 'database-name'='test',
'table-name' = 'test\.student,test\.score',
-- 'sink.connector'='datastream-doris',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.enable-delete' = 'true'
)
注意事项:
2.配置项
配置项 | 是否必须 | 默认值 | 说明 |
---|---|---|---|
connector | 是 | 无 | 同 Flink CDC |
hostname | 是 | 无 | 同 Flink CDC |
port | 是 | 无 | 同 Flink CDC |
username | 是 | 无 | 同 Flink CDC |
password | 是 | 无 | 同 Flink CDC |
scan.startup.mode | 否 | latest-offset | 同 Flink CDC |
database-name | 否 | 无 | 支持正则 |
schema-name | 否 | 无 | 支持正则 |
table-name | 否 | 无 | 支持正则,如果使用库名.表名的形式,需要使用 \. |
source.* | 否 | 无 | 指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。 |
checkpoint | 否 | 无 | 单位 ms |
parallelism | 否 | 无 | 任务并行度 |
sink.connector | 是 | 无 | 指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式 |
sink.sink.db | 否 | 无 | 目标数据源的库名,不指定时默认使用源数据源的库名 |
sink.table.prefix | 否 | 无 | 目标表的表名前缀,如 ODS_ 即为所有的表名前拼接 ODS_ |
sink.table.suffix | 否 | 无 | 目标表的表名后缀 |
sink.table.upper | 否 | 无 | 目标表的表名全大写 |
sink.table.lower | 否 | 无 | 目标表的表名全小写 |
sink.* | 否 | 无 | 目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名 |
七、CDCSOURCE 实践
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector'='datastream-kafka',
'sink.topic'='dlinkcdc',
'sink.brokers'='127.0.0.1:9092'
)
2.实时数据同步至对应 kafka topic
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector'='datastream-kafka',
'sink.brokers'='127.0.0.1:9092'
)
3.实时数据 DataStream 入仓 Doris
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'datastream-doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.sink.enable-delete' = 'true'
)
4.实时数据 FlinkSQL 入仓 Doris
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'dlink',
'password' = 'dlink',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = 'dw123456',
'sink.sink.batch.size' = '1',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'ODS_',
'sink.table.upper' = 'true',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.enable-delete' = 'true'
)
5.实时数据入湖 Hudi
EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'source.server-time-zone' = 'UTC',
'checkpoint'='1000',
'scan.startup.mode'='initial',
'parallelism'='1',
'database-name'='data_deal',
'table-name'='data_deal\.stu,data_deal\.stu_copy1',
'sink.connector'='hudi',
'sink.path'='hdfs://cluster1/tmp/flink/cdcdata/${tableName}',
'sink.hoodie.datasource.write.recordkey.field'='id',
'sink.hoodie.parquet.max.file.size'='268435456',
'sink.write.precombine.field'='update_time',
'sink.write.tasks'='1',
'sink.write.bucket_assign.tasks'='2',
'sink.write.precombine'='true',
'sink.compaction.async.enabled'='true',
'sink.write.task.max.size'='1024',
'sink.write.rate.limit'='3000',
'sink.write.operation'='upsert',
'sink.table.type'='COPY_ON_WRITE',
'sink.compaction.tasks'='1',
'sink.compaction.delta_seconds'='20',
'sink.compaction.async.enabled'='true',
'sink.read.streaming.skip_compaction'='true',
'sink.compaction.delta_commits'='20',
'sink.compaction.trigger.strategy'='num_or_time',
'sink.compaction.max_memory'='500',
'sink.changelog.enabled'='true',
'sink.read.streaming.enabled'='true',
'sink.read.streaming.check.interval'='3',
'sink.hive_sync.enable'='true',
'sink.hive_sync.mode'='hms',
'sink.hive_sync.db'='cdc_ods',
'sink.hive_sync.table'='${tableName}',
'sink.table.prefix.schema'='true',
'sink.hive_sync.metastore.uris'='thrift://cdh.com:9083',
'sink.hive_sync.username'='flinkcdc'
)
八、总结
Dinky 在 Flink CDC 整库入仓入湖的实践上解决了上文提到的三个痛点:全增量切换问题、手工映射表结构易出错、整库入湖,其中发现 Schema 变更导致入湖链路难以维护未进行解决,欢迎进一步讨论。
此外 Dinky 还支持了整库同步各种数据源的 sink,使用户可以完成入湖入仓的各种需求,欢迎验证。
本文没有对源码实现细节展开讨论,其实现原理理论上可以注入 FlinkSQL 的处理过程,使其可以在入仓入湖时进行数据加工处理,欢迎探索。
最后我们可以发现 Dinky 与其他开源项目相比,它更专注于 Flink 的应用体验提升,此外基于其设计原理,可以更方便地扩展各种企业级功能,如自定义语法、入湖入仓、Catalog 持久化、血缘应用等。所以,面对这么精彩的开源项目,你还在等什么,行动起来一起探索吧。
PS:全新功能新上线,大家一起来抓 bug ~