适用场景
在实时湖仓的数据链路建设过程中,将传统关系型数据库(如 MySQL 等)中的整库数据低延迟、高吞吐的同步到下游的 OLAP 数据库(如 Doris、ClickHouse)或者归档到对应的文件系统中(如 HDFS、Hive 表),是一个非常普遍和强烈的需求。
Oceanus 提供了非常方便的 CDAS(CREATE DATABASE AS)SQL 语法,来支持将数据库中的整库数据全量导入、增量实时同步写入到用户指定的下游系统中。
语法说明
CREATE DATABASE IF NOT EXISTS <target_database>[COMMENT database_comment][WITH (key1=val1, key2=val2, ...)] -- 指明写入目标库的参数AS DATABASE <source_catalog>.<source_database> -- source_database 是被同步的源数据库INCLUDING { ALL TABLES | TABLE 'table_name' }-- INCLUDING ALL TABLES 表示同步数据库中的所有表-- INCLUDING TABLE 'table' 表示同步数据库中特定的表,支持正则表达式,如 'order_.*';-- 同步多张表时,可以写成 INCLUDING TABLE 'tableA|tableB|tableC'的格式[EXCLUDING TABLE 'table_name']-- EXCLUDING TABLE 'table' 表示不同步数据库中特定的表,支持正则表达式,如 'order_.*';-- 排除多张表时,可以写成 EXCLUDING TABLE 'tableA|tableB|tableC'的格式[/*+ `OPTIONS`('key1'='val1', 'key2'='val2', ... ) */]-- (可选,指明读取source的参数,如指定source serverId的范围,解析debezium时间戳字段类型等)
参数说明:
参数 | 解释 |
target_database | 待写入的目标数据库名 |
database_comment | 待写入的数据库注释 |
WITH参数 | 指明写入目标库的参数,目前会被翻译成下游 sink 表的描述参数 |
<source_catalog>.<source_database> | 声明源 catalog 中需要同步的数据库 |
INCLUDING ALL TABLES | 同步源库中的所有表 |
INCLUDING TABLE | 同步数据库中特定的表,支持正则表达式,如 'order_.*' ; 同步多张表时,可以写成 INCLUDING TABLE 'tableA|tableB'格式 |
EXCLUDING TABLE | 表示不同步数据库中特定的表,支持正则表达式,如 'order_.*'; 排除多张表时,可以写成 EXCLUDING TABLE 'tableA|tableB'格式 |
OPTIONS | 可选,指明读取 Source 时覆盖的参数,如指定 source serverId 的范围等 |
注意
第三行 [WITH (key1=val1, key2=val2, ..)] 指明写入目标库的参数中, value 值支持将 Source 表的表名进行变量替换,使用方法是使用占位符 $tableName。
如下整库同步到 Doris 的示例中,写入每一个 sink 的表中 $tableName 会替换为相对应 MySQL 库中源表表名。
create catalog my_mysql with(...);create database if not exists sink_dbwith ('connector' = 'doris','table.identifier' = 'db1.$tableName_doris'...)including all tables/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- 声明解析timestamp字段的时区类型
状态无损的加表能力(仅限 MySQL 源表)
如果您的 MySQL 数据库有较多的新增表需求,希望 Flink 可以支持新增表的数据同步,且不影响现有各表的同步状态,那么您可以使用 SET 和 OPTIONS 选项来开启该功能(本示例还开启了MySQL CDC 的多 Source 复用、自动拆分最后一个大分片等高级能力,请按需使用):
-- 必须设置 table.optimizer.deterministic-operator-uid-for-cdas=true 才可以启用无损加表能力SET table.optimizer.deterministic-operator-uid-for-cdas=true;-- 可选:开启多 Source 复用,降低资源占用,提升稳定性SET table.optimizer.mysql-cdc-source.merge.enabled=true;-- 请替换成实际的 CDAS 语句,但至少要保留 'scan.newly-added-table.enabled' = 'true' 的 OPTIONS 选项create catalog my_mysql with(...);create database if not exists sink_dbwith (...)including all tables/*+ `OPTIONS`('scan.newly-added-table.enabled' = 'true','scan.lastchunk.optimize.enable' = 'true') */;
注意:
1. SET 参数、scan.newly-added-table.enabled 的 OPTIONS 需要在作业首次启动时就加上;否则需要丢弃现有状态,重新全量同步一次数据,后续才可无损加表。
2. 只有当作业启动时,才可以感知新增表的存在。因此每当有加表操作,请对作业做一个快照,然后从该快照恢复运行,此时新表才会开始同步。
3. 当表的数量多时,我们建议使用 SET table.optimizer.mysql-cdc-source.merge.enabled=true; 语句开启多 Source 复用能力,提升整体稳定性。
4. 当表有持续的大流量写入时,我们建议开启 'scan.lastchunk.optimize.enable' = 'true' 参数,以避免最后一个分片过大导致 TaskManager OOM。
MySQL-CDC 元数据字段读取
MySQL CDC connector 除支持提取物理表的字段外,还支持了元数据字段列表可以提取。整库同步功能中也提供了一个 options 配置参数,可以用来控制同步所需的元数据字段。
参数 | 解释 |
oceanus.source.include-metadata.fields | 需要同步的 source 表的元字段,格式为 'table_name:table_name;meta.batch_id:batch_id', 元数据字段定义通过分号;分隔,每个元数据字段格式为 metadataColumn:alias, 第一部分为实际对应的元数据 column,第二部分为重命名后的值。 |
注意:元数据字段会按照声明的顺序,追加到源表之后。
使用实例:
SET table.optimizer.mysql-cdc-source.merge.enabled=true;create catalog my_mysql with ('type' = 'jdbc','default-database' = 'test','username' = 'xx','password' = 'xxx','base-url' = 'xxx');create database if not exists print_sink_dbcomment 'test_sink'with ('connector' = 'print','print-identifier' = '$tableName')as database `my_mysql`.`test`including all tables/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai','oceanus.source.include-metadata.fields'='table_name:table_name;database_name:database_name;op_ts:op_ts;meta.table_name:meta_table_name;meta.database_name:meta_database_name;meta.op_ts:meta_op_ts;meta.op_type:meta_op_type;meta.batch_id:meta_batch_id;meta.is_ddl:meta_id_ddl;meta.mysql_type:meta_mysql_type;meta.update_before:meta_update_before;meta.pk_names:meta_pk_names;meta.sql:meta_sql;meta.sql_type:meta_sql_type;meta.ts:meta_ts')*/;
使用方法
1. 首先注册 MySQL 的 Catalog,作为待同步的数据源表,示例如下:
create catalog my_mysql with ('type' = 'jdbc','default-database' = 'test','username' = 'XXX','password' = 'XXX','base-url' = 'jdbc:mysql://ip:port'-- 'jdbc.properties.tinyInt1isBit' = 'false' -- jdbc参数,是否把tinyInt识别为bool. 默认为true.-- 如果表字段包含tinyint(1), 建议把 jdbc.properties.tinyInt1isBit 设置为false.);
2. 对于不支持自动建表的下游系统,需要事先保证在下游系统中建立和上游 Mysql 表中一一对应的源表。
3. 使用整库同步语法指定需要同步的同步表,其中写入目标库的参数现在只支持填入下游 connector 的必要参数。
4. 可以通过
jdbc.properties.*
传入对应的 jdbc 参数。同步到 Hudi 示例
SET table.optimizer.mysql-cdc-source.merge.enabled=true;create catalog my_mysql with ('type' = 'jdbc','default-database' = 'test','username' = 'root','password' = 'XXX','base-url' = 'jdbc:mysql://ip:port');create database if not exists sink_dbcomment 'test_sink'with ('connector' = 'hudi','path' = 'hdfs://namenode:8020/user/hive/warehouse/$tableName_mor') as database `my_mysql`.`trade_log`including all tables/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- 声明解析timestamp字段的时区类型
同步到 ClickHouse 示例
SET table.optimizer.mysql-cdc-source.merge.enabled=true;create catalog my_mysql with ( 'type' = 'jdbc', 'default-database' = 'test', 'username' = 'root', 'password' = 'XXX', 'base-url' = 'jdbc:mysql://ip:port' );create database if not exists sink_db comment 'test_sink' with ( 'connector' = 'clickhouse://172.11.11.11:8123', -- 如果ClickHouse集群未配置账号密码可以不指定 --'username' = 'root', -- ClickHouse集群用户名 --'password' = 'root', -- ClickHouse集群的密码 'database-name' = 'testdb', -- 数据写入目的数据库 'table-name' = 'test_table1', -- 数据写入目的数据表 'sink.batch-size' = '1000', 'table.collapsing.field' = 'Sign' ) as databasemy_mysql
.test_db
including all tables /*+OPTIONS
('server-time-zone' = 'Asia/Shanghai') */;
同步到 Doris 示例
SET table.optimizer.mysql-cdc-source.merge.enabled=true;create catalog my_mysql with ('type' = 'jdbc','default-database' = 'test','username' = 'root','password' = 'XXX','base-url' = 'jdbc:mysql://ip:port');create database if not exists sink_dbcomment 'test_sink' with ('connector' = 'doris','table.identifier' = 'trade_log.$tableName','username' = 'admin','password' = 'xxx','sink.batch.size' = '500','sink.batch.interval' = '1s','fenodes' = 'ip:port') as database `my_mysql`.`trade_log`including all tables/*+ `OPTIONS`('server-time-zone' = 'Asia/Shanghai') */; -- 声明解析timestamp字段的时区类型
同步到 Hive 示例
SET table.optimizer.mysql-cdc-source.merge.enabled=true;create catalog my_mysql with ('type' = 'jdbc','default-database' = 'test','username' = 'root','password' = 'XXX','base-url' = 'jdbc:mysql://ip:port');create database if not exists sink_dbcomment 'test_sink' with ('connector' = 'hive','hive-version' = '2.3.6','hive-database' = 'test_100','hive-table' = '$tableName','sink.partition-commit.policy.kind' = 'metastore') as database `my_mysql`.`trade_log`including all tables/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;-- 因为hive sink不支持变更数据,此处的hint会把原始cdc的变更数据转成成append流下发
作业运行拓扑图展开后如下:
同步到 Hive 自动建表示例:
作业需要提前配置连接 Hive 服务的 jar 包,详细配置可以参考 获取 Hive 连接配置 jar 包。
SET table.optimizer.mysql-cdc-source.merge.enabled=true;-- 注册mysql的catalogcreate catalog my_mysql with ('type' = 'jdbc','default-database' = 'test','username' = 'root','password' = 'XXX','base-url' = 'jdbc:mysql://ip:port');-- 注册hive端catalogcreate catalog my_hive with ('type' = 'hive','default-database' = 'default','hive-version' = '2.3.5');create database if not exists `my_hive`.`trade_log`as database `my_mysql`.`trade_log`including all tables/*+ `OPTIONS`('append-mode' = 'true', 'server-time-zone' = 'Asia/Shanghai') */;-- 因为hive sink不支持变更数据,此处的hint会把原始cdc的变更数据转成成append流下发
同步到 DLC 示例
SET table.optimizer.mysql-cdc-source.merge.enabled = true;SET table.optimizer.deterministic-operator-uid-for-cdas=true;-- 注册mysql的catalogcreate catalog `my_mysql` with ('type' = 'jdbc','default-database' = 'testdb', -- 数据库名'username' = '${username}', -- mysql用户名'password' = '${password}', -- mysql密码'base-url' = 'jdbc:mysql://ip:3306');-- 整库同步create database if not exists `my_dlc_database`comment 'test db sync'with ('connector' = 'iceberg-inlong', -- 固定值'catalog-database' = 'test', -- DLC内表所在的数据库名称'catalog-table' = 'my_$tableName', -- DLC内表名称,$tableName会自动替换为要同步的表名,DLC表需要提前创建'default-database' = 'test', -- DLC内表所在的数据库名称'catalog-name' = 'HYBRIS', -- 固定值'catalog-impl' = 'org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog', -- 固定值'qcloud.dlc.managed.account.uid' = '100026378089', -- 固定值,DLC管理账号的uid'qcloud.dlc.secret-id' = '${secret_id}', -- DLC 用户的secretId从https://console.cloud.tencent.com/cam/capi中获取'qcloud.dlc.secret-key' = '${secret_key}', -- DLC 用户的secretKey,从https://console.cloud.tencent.com/cam/capi中获取'qcloud.dlc.region' = 'ap-guangzhou', -- DLC 所在地域,必须填ap-地域格式'qcloud.dlc.jdbc.url' = 'jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type=SparkSQLTask&database_name=test&datasource_connection_name=DataLakeCatalog®ion=ap-guangzhou&data_engine_name=${engine_name}', -- DLC jdbc接入url,格式见https://cloud.tencent.com/document/product/1342/61547'uri' = 'dlc.internal.tencentcloudapi.com', -- 固定值'user.appid' = '${appid}', -- DLC 用户的 appid'request.identity.token' = '100026378089' -- 固定值,DLC内表接入的token)as database `my_mysql`.`test` including table 'user.*'excluding table 'user_info_20230530|user_behavior_tmp'/* +`OPTIONS`('server-time-zone' = 'Asia/Shanghai','scan.newly-added-table.enabled' = 'true','scan.lastchunk.optimize.enable' = 'true'*/;
使用提醒
1. Flink 1.11 版本不支持整库同步(SQL)能力。
2. 目前只支持同步 MySQL 类型数据库作为整库同步的源表。
3. 目前同步到目标端时,除 Iceberg、Elasticsearch、Hudi 和 Hive(需要提前注册 Hive Catalog)作为目标表外,其它的目标端还不支持自动建表,需要事先在目标端中建立和 Mysql 库中数据表对应的表结构。
4. 推荐搭配 MySQL CDC Source 复用功能开启,一起使用,可以降低对数据库的压力。
5. CDAS 语法没有限制下游输出的类型,理论上可以同步到任意的下游类型。
6. 当同步的表的数量非常多的时候,flink 生成的单个 task 的 name 会非常长,导致 metric 系统占用大量的内存,影响作业稳定性,Oceanus 针对这种情况引入了
pipeline.task-name-length
参数来限制 taskName 的长度,能极大的提高作业稳定性和日志可读性。(适用 Flink-1.13 和 Flink-1.14 版本)。
可以在作业的配置中生效:set pipeline.task-name-length=80;