版本说明
Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持 Sink |
1.14 | 不支持 |
1.16 | 不支持 |
使用范围
可以作为 Sink 使用。目前支持写入 DLC 托管的原生表。
DDL 定义
CREATE TABLE `eason_internal_test`(`name` STRING,`age` INT) WITH ('connector' = 'iceberg-inlong','catalog-database' = 'test','catalog-table' = 'eason_internal_test','default-database' = 'test','catalog-name' = 'HYBRIS','catalog-impl' = 'org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog','qcloud.dlc.secret-id' = '12345asdfghASDFGH','qcloud.dlc.secret-key' = '678910asdfghASDFGH','qcloud.dlc.region' = 'ap-guangzhou','qcloud.dlc.jdbc.url' = 'jdbc:dlc:dlc.internal.tencentcloudapi.com?task_type=SparkSQLTask&database_name=test&datasource_connection_name=DataLakeCatalog®ion=ap-guangzhou&data_engine_name=dailai_test','qcloud.dlc.managed.account.uid' = '100026378089','request.identity.token' = '100026378089','user.appid' = '1257058945','uri' = 'dlc.internal.tencentcloudapi.com');
WITH 参数
通用参数
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | 无 | connector 类型,必须填 iceberg-inlong |
catalog-database | 是 | 无 | DLC 内表所在的数据库名称 |
catalog-table | 是 | 无 | DLC 内表名称 |
default-database | 是 | 无 | DLC 内表所在的数据库名称 |
catalog-name | 是 | 无 | catalog 名称,必须填 HYBRIS |
catalog-impl | 是 | 无 | catalog 的实现类,必须填 org.apache.inlong.sort.iceberg.catalog.hybris.DlcWrappedHybrisCatalog |
qcloud.dlc.managed.account.uid | 是 | 无 | DLC 管理账号的 uid,此处固定填写 100026378089 |
qcloud.dlc.secret-id | 是 | 无 | |
qcloud.dlc.secret-key | 是 | 无 | |
qcloud.dlc.region | 是 | 无 | DLC 所在地域,必须填 ap-地域 格式 |
qcloud.dlc.jdbc.url | 是 | 无 | |
uri | 是 | 无 | DLC 接入 uri,必须填 dlc.internal.tencentcloudapi.com |
user.appid | 是 | 无 | DLC 用户的 appid |
request.identity.token | 是 | 无 | DLC 内表接入的 token,此处固定填写 100026378089 |
sink.ignore.changelog | 否 | 是 | 是否忽略 delete 数据,默认为 false,设为 true 则进入 append mode |
DLC 表配置
Upsert 模式-- DLC 建表语句CREATE TABLE `bi_sensor`(`uuid` string,`id` string,`type` string,`project` string,`properties` string,`sensors_id` string,`time` int,`hour` int) PARTITIONED BY (`time`);-- 将目标表设为 v2 表,允许 upsertALTER TABLE `bi_sensor` SET TBLPROPERTIES ('format-version'='2','write.metadata.delete-after-commit.enabled' = 'true', 'write.metadata.previous-versions-max' = '100', 'write.metadata.metrics.default' = 'full', 'write.upsert.enabled'='true', 'write.distribution-mode'='hash');-- oceanus sink DDL,dlc 的主键和分区字段必须在 flink 定义的主键字段中create table bi_sensors (`uuid` STRING,`id` STRING,`type` STRING,`project` STRING,`properties` STRING,`sensors_id` STRING,`time` int,`hour` int,PRIMARY KEY (`uuid`, `time`) NOT ENFORCED) with (...)
注意:
若您写入的 DLC 是基于元数据加速桶的,需要参考如下操作流程。
基于元数据加速桶的 DLC 操作流程
客户为 Oceanus 集群设置元数据加速桶的 DLC 权限
1. 进入 数据湖计算 DLC 控制台,单击全局配置 > 存储配置 > 元数据加速桶配置。
2. 选择需要写入的托管桶,单击配置。
3. 单击新增权限配置,网络选择创建 Oceanus 集群时绑定的子网。Oceanus 集群子网可以在集群信息中查看。
客户为作业添加配置后启动作业
首先将下列 jar 包添加到依赖文件,并在作业参数中引用:
随后在作业高级参数中添加如下参数:
flink.hadoop.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapterflink.hadoop.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapterflink.hadoop.fs.ofs.tmp.cache.dir: /tmp/chdfs/flink.hadoop.fs.ofs.upload.flush.flag: trueflink.hadoop.fs.ofs.user.appid: 客户的 appidflink.hadoop.fs.cosn.trsf.fs.ofs.user.appid: 客户的 appidflink.hadoop.fs.cosn.trsf.fs.ofs.bucket.region: cos 所在地域(例如:弗吉尼亚对应 na-ashburn,广州对应 ap-guangzhou)flink.hadoop.fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs# 注意:使用 hadoop 用户权限则添加下面参数,使用 flink 用户无需添加containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadoop