介绍
flink-connector-starrocks 基于开源的 starrocks-connector-for-apache-flink v1.2.4 实现,支持通过 Flink 读写 StarRocks。
版本说明
Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持(批数据源、维表、目的表) |
1.14 | 支持(批数据源、维表、目的表) |
1.16 | 支持(目的表) |
作为数据目的(Sink)
flink-connector-starrocks 作为数据目的,用于导入数据至 StarRocks,相比于 Flink 官方提供的 flink-connector-jdbc,导入性能更佳,flink-connector-starrocks 的内部实现是通过缓存并批量由 Stream Load 导入。支持 cvs、json 两种数据格式。
以下为 MySQL-CDC 数据实时导入 StarRocks 的示例。
CREATE TABLE `mysql_cdc` (`user_id` bigint,`item_id` bigint,`behavior` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '9.134.34.15', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'xxx', -- 数据库访问的密码'database-name' = 'test', -- 需要同步的数据库'table-name' = 'user_behavior' -- 需要同步的数据表名);CREATE TABLE `pk_starrocks`(`user_id` bigint,`item_id` bigint,`behavior` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'starrocks','jdbc-url' = 'jdbc:mysql://172.28.28.98:9030','load-url' = '172.28.28.98:8030','database-name' = 'oceanus','table-name' = 'pk_user_behavior','username' = 'root','password' = 'xxx','sink.buffer-flush.interval-ms' = '15000','sink.properties.format' = 'json','sink.properties.strip_outer_array' = 'true', -- 需要设置为 true-- 'sink.parallelism' = '1','sink.max-retries' = '3','sink.semantic' = 'exactly-once');INSERT INTO `pk_starrocks` SELECT * FROM `mysql_cdc`;
WITH 参数
参数 | 必填 | 默认值 | 数据类型 | 描述 |
connector | YES | NONE | String | 固定值 starrocks |
jdbc-url | YES | NONE | String | fe 节点 query_port:
jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port...
,例如
jdbc:mysql://172.28.28.98:9030 |
load-url | YES | NONE | String | fe 节点 http_port:
fe_ip1:http_port;fe_ip2:http_port....
,例如
172.28.28.98:8030 |
database-name | YES | NONE | String | starrocks 数据库名 |
table-name | YES | NONE | String | starrocks 表名 |
username | YES | NONE | String | starrocks 用户名 |
password | YES | NONE | String | starrocks 用户密码 |
sink.semantic | NO | at-least-once | String | 可选值: at-least-once exactly-once ,只在 checkpoint 时写数据。注意此时 sink.buffer-flush.* 相关参数无效 |
sink.version | NO | AUTO | String | 可选值: V1 :使用 stream-load 实现 exactly-once 语义,性能比较低,适用所有的 starrocks 版本V2 :使用 transaction-load 实现 exactly-once 语义,适用 starrocks 2.4 及之后的版本AUTO :根据 starrocks 是否支持 transaction-load 自动选择 |
sink.buffer-flush.max-bytes | NO | 94371840(90M) | String | 批量写入参数, sink.semantic 为 at-least-once 时有效。当 buffer 中数据大小超过设置值后,触发写入 StarRocks。取值范围[64MB, 10GB] |
sink.buffer-flush.max-rows | NO | 500000 | String | 批量写入参数, sink.semantic 为 at-least-once 时有效。当 buffer 中数据条数超过设置值后,触发写入 StarRocks。取值范围[64,000, 5000,000] |
sink.buffer-flush.interval-ms | NO | 300000 | String | 批量写入参数, sink.semantic 为 at-least-once 时有效。间隔固定的周期(毫秒)触发写入 StarRocks。取值范围 [1000, 3600000] |
sink.max-retries | NO | 3 | String | 写 StarRocks 的 stream load 请求的重试次数,取值范围 [0, 1000] |
sink.parallelism | NO | NULL | String | 单独指定 sink 的并行度,不设置则使用全局并行度 |
sink.connect.timeout-ms | NO | 1000 | String | 连接 load-url 的超时时间(毫秒),取值范围 [100, 60000] |
sink.label-prefix | NO | NO | String | |
sink.properties.format | NO | CSV | String | 导入 StarRocks 的数据格式,可选值 CSV 和 JSON ,默认为 CSV |
sink.properties.column_separator | NO | \\t | String | |
sink.properties.row_delimiter | NO | \\n | String | |
sink.properties.strip_outer_array | NO | false | String | 用于 JSON 格式,指定是否裁剪最外层的数组结构。取值范围: true 和 false 。默认值:false 。Flink 批量导入 JSON 数据在最外层有一对表示数组结构的中括号 []。因此,需要您指定该参数取值为 true ,这样 StarRocks 会剪裁掉外层的中括号 [],并把中括号 [] 里的每个内层数组都作为一行单独的数据导入。如果您指定该参数取值为 false ,则 StarRocks 会把整个 JSON 数据文件解析成一个数组,并作为一行数据导入。例如,待导入的 JSON 数据为 [ {"category" : 1, "author" : 2}, {"category" : 3, "author" : 4} ] ,如果指定该参数取值为 true ,则 StarRocks 会把 {"category" : 1, "author" : 2} 和 {"category" : 3, "author" : 4} 解析成两行数据,并导入到目标 StarRocks 表中对应的数据行。参考 JSON 适用参数 |
sink.properties.* | NO | NONE | String | stream load 属性,例如 'sink.properties.columns' = 'k1, v1'。自 StarRocks 2.4 开始,主键模型 支持部分列更新。更多参数请参考 stream load 文档 |
说明:
sink.semantic
为 at-least-once
时,sink.buffer-flush.max-bytes、sink.buffer-flush.max-rows、sink.buffer-flush.interval-ms 任意条件满足时触发 StarRocks 写操作。sink.semantic
为 exactly-once
时,依赖 flink 的 checkpoint,在每次 checkpoint 时保存批数据以及其 label,在checkpoint 完成后的第一次 invoke 中阻塞 flush 所有缓存在 state 当中的数据,以此达到精准一次。此时 sink.buffer-flush.* 相关参数无效。类型映射(Sink)
Flink type | StarRocks type |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON STRING |
ROW<arg T...> | JSON STRING |
注意事项
StarRocks 数据模型
对于 Upsert 流,需要使用主键模型,否则 DELETE 消息无法写入 StarRocks。
相对于 Merge-On-Read 策略的更新模型,主键模型的查询性能能够提升 3~10 倍。
主键模型可利用部分列更新实现多流 JOIN。
作为数据源(Source)
WITH 参数
参数 | 必填 | 默认值 | 数据类型 | 描述 |
connector | YES | NONE | String | 固定值 starrocks |
scan-url | YES | NONE | String | fe 节点 http_port: fe_ip1:http_port,fe_ip2:http_port.... ,例如 172.28.28.98:8030 |
jdbc-url | YES | NONE | String | fe 节点 query_port: jdbc:mysql://fe_ip1:query_port,fe_ip2:query_port... ,例如 jdbc:mysql://172.28.28.98:9030 |
username | YES | NONE | String | StarRocks 用户名 |
password | YES | NONE | String | StarRocks 用户密码 |
database-name | YES | NONE | String | StarRocks 数据库名 |
table-name | YES | NONE | String | StarRocks 表名 |
scan.connect.timeout-ms | NO | 1000 | String | 网络连接超时时间(毫秒) |
scan.params.keep-alive-min | NO | 10 | String | 最大 keep alive 时间(分钟) |
scan.params.query-timeout-s | NO | 600(5min) | String | 单次查询超时时间(秒) |
scan.params.mem-limit-byte | NO | 102410241024(1G) | String | 单次查询内存限制 |
scan.max-retries | NO | 1 | String | 重试次数 |
lookup.cache.ttl-ms | NO | 5000 | Long | 维表查询的 cache 超时时间 |
批数据源
CREATE TABLE `starrocks` (`user_id` bigint,`item_id` bigint,`behavior` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'starrocks' ,'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port'scan-url' = '172.28.28.98:8030', -- http_port'database-name' = 'oceanus','table-name' = 'pk_user_behavior','username' = 'root','password' = 'xxx');CREATE TABLE `print_sink` (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'logger');INSERT INTO `print_sink`SELECT * FROM starrocks;
维表
CREATE TABLE `starrocks` (`user_id` bigint,`item_id` bigint,`behavior` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'starrocks' ,'jdbc-url' = 'jdbc:mysql://172.28.28.98:9030', -- query_port, FE mysql server port'scan-url' = '172.28.28.98:8030', -- http_port'database-name' = 'oceanus','table-name' = 'pk_user_behavior','username' = 'root','password' = 'xxx');CREATE TABLE `datagen` (`user_id` BIGINT,`ts` as PROCTIME(),PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.user_id.min' = '1','fields.user_id.max' = '20');CREATE TABLE `print_sink` (`user_id` BIGINT,`item_id` BIGINT,`behavior` STRING,`ts` TIMESTAMP,PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'logger');INSERT INTO `print_sink`SELECT a.user_id,b.item_id,b.behavior,a.tsFROM `datagen` a LEFT JOIN `starrocks` FOR SYSTEM_TIME AS OF a.ts as bON a.user_id = b.user_id;
类型映射(source)
StarRocks | Flink |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |