介绍
版本说明
Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持 |
1.14 | 不支持 |
1.16 | 支持(默认打开两阶段提交,即 2PC 写) |
使用范围
Flink Connector Doris 目前仅支持 Doris sink。支持的 Doris 版本为0.14.0及以上版本,并且要求开启配置
enable_http_server_v2 = true
。DDL 定义
注意:
Flink 1.13、Flink 1.16 的 DDL 参数不同,请选择对应的版本使用。
作为数据目的地 Sink(Flink 1.13)
CREATE TABLE doris_sink_table (id INT,name VARCHAR) WITH ('connector' = 'doris', -- 固定值 'doris''fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl'username' = 'root', -- 访问Doris的用户名,拥有库的写权限'password' = 'password', -- 访问Doris的密码'sink.batch.size' = '500', -- 单次写BE的最大行数'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。);
作为数据目的地 Sink (Flink 1.16)
CREATE TABLE doris_sink_table (id INT,name VARCHAR) WITH ('connector' = 'doris', -- 固定值 'doris''fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl'username' = 'root', -- 访问Doris的用户名,拥有库的写权限'password' = 'password' -- 访问Doris的密码);-- 注意: 默认打开 2PC 两阶段提交写入
作为 Catalog
CREATE CATALOG doris_catalog WITH ('type' = 'doris','fenodes' = 'FE_IP:FE_HTTP_PORT', -- Doris FE HTTP 地址'username' = 'root', -- 访问Doris的用户名,拥有库的写权限'password' = 'password', -- 访问Doris的密码'default-database' = 'default')
WITH 参数 (1.13 版本)
Sink
参数 | 说明 | 是否必填 | 备注 |
connector | 源表类型 | 是 | 固定值 doris |
fenodes | Doris FE HTTP 地址 | 是 | - |
table.identifier | Doris 表名,格式:db1.tbl1 | 是 | - |
username | 访问 Doris 的用户名 | 是 | - |
password | 访问 Doris 的密码 | 是 | - |
sink.batch.size | 单次写 BE 的最大行数 | 否 | 默认100 |
sink.max-retries | 写 BE 失败之后的重试次数 | 否 | 默认1 |
sink.batch.interval | flush 间隔时间,超过该时间后异步线程将缓存中数据写入 BE。默认值为1秒,支持时间单位 ms、s、min、h 和 d。设置为0,表示关闭定期写入 | 否 | 默认1s |
sink.properties.* | 否 | - | |
sink.enable-2pc | 是否采用事务写入 | 否 | false |
Catalog
参数 | 说明 | 是否必填 | 备注 |
type | - | 是 | 固定值 doris |
fenodes | Doris FE HTTP 地址 | 是 | - |
username | 访问 Doris 的用户名 | 是 | - |
password | 访问 Doris 的密码 | 是 | - |
default-database | 默认的database | 是 | - |
WITH 参数 (1.16 版本)
Sink
参数 | 说明 | 是否必填 | 备注 |
connector | 源表类型 | 是 | 固定值 doris |
fenodes | Doris FE HTTP 地址 | 是 | - |
table.identifier | Doris 表名,格式:db1.tbl1 | 是 | - |
username | 访问 Doris 的用户名 | 是 | - |
password | 访问 Doris 的密码 | 是 | - |
sink.enable-2pc | 是否采用事务写入 | 否 | true |
sink.properties.* | 否 | - |
类型映射
Doris 字段类型 | Flink 字段类型 |
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
| TIME |
DATE | DATE |
DATETIME | TIMESTAMP |
CHAR | STRING |
| LARGEINT |
| VARCHAR |
DECIMAL | DECIMAL |
| DECIMALV2 |
HLL | Unsupported datatype |
代码示例
CREATE TABLE datagen_source_table (id INT,name STRING) WITH ('connector' = 'datagen','rows-per-second'='1' -- 每秒产生的数据条数);CREATE TABLE doris_sink_table (id INT,name STRING) WITH ('connector' = 'doris', -- 固定值 'doris''fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址'table.identifier' = 'test.sales_order', -- Doris 表名 格式:db.tbl'username' = 'root', -- 访问Doris的用户名,拥有库的写权限'password' = 'password', -- 访问Doris的密码'sink.batch.size' = '500', -- 单次写BE的最大行数'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。);INSERT INTO doris_sink_table select * from datagen_source_table;
CREATE CATALOG doris_catalog WITH ('fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址'username' = 'root', -- 访问Doris的用户名,拥有库的写权限'password' = 'password', -- 访问Doris的密码'default-database' = 'default');CREATE TABLE datagen_source_table (id INT,name STRING) WITH ('connector' = 'datagen','rows-per-second'='1' -- 每秒产生的数据条数);INSERT INTO `doris_catalog`.`my_database`.`my_table` SELECT * FROM.datagen_source_table;
MySQL-CDC 对接 Doris 代码示例
--mysql cdc 源表CREATE TABLE `mysql_cdc_source_table` (`id` INT NOT NULL,`name` VARCHAR,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = 'YourHostName', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'YourUserName', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'YourPassword', -- 数据库访问的密码'database-name' = 'YourDatabase', -- 需要同步的数据库'table-name' = 'YourTable' -- 需要同步的数据表名);--写入doris表CREATE TABLE `print_table` (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED) WITH ('connector' = 'doris', -- 固定值 'doris''fenodes' = 'FE_IP:FE_RESFUL_PORT', -- Doris FE HTTP 地址'table.identifier' = 'dbName.tableName', -- Doris 表名 格式:db.tbl'username' = 'YourUserName', -- 访问Doris的用户名,拥有库的写权限'password' = 'YourPassword', -- 访问Doris的密码'sink.batch.size' = '500', -- 单次写BE的最大行数'sink.batch.interval' = '1s' -- flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。);insert into print_tableselect id,name from mysql_cdc_source_table;
注意事项
Upsert
若需要 Upsert ,则要求 Doris 表必须是 Uniqe 模型或者 Aggregate 模型。建表示例如下:
-- Uniqe 模型建表语句CREATE TABLE `doris_sink_table` (`id` int(11),`name` varchar(32))UNIQUE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES("replication_num" = "3");-- Aggregate 模型建表语句CREATE TABLE `doris_sink_table` (`id` int(11),`name` varchar(32) REPLACE DEFAULT '0')AGGREGATE KEY('id')DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES("replication_num" = "3"); -- 注意若 BE 节点不够,会报 `Failed to find enough host in all backends` 错误,可适当减少该值。
用户权限
用户必须拥有对应的库的写权限。
CREATE USER 'test' IDENTIFIED BY 'test_passwd';GRANT ALL ON test TO test;