Abstract: In this article, Han Fei shares a hands-on practice of extending ClickHouse support on the Dinky real-time computing platform, based on the Flink SQL connector combined with a Catalog.
Tips: earlier posts in this series:
《Dinky 实践系列之 Flink Catalog 元数据管理》
《Dinky FlinkCDC 整库入仓 StarRocks》
《打造 Flink + StarRocks+ Dinky 的极速统一分析平台》
GitHub repositories:
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
Everyone is welcome to follow Dinky's development!
1. Preface
Doris and ClickHouse are currently among the most popular OLAP engines in the big data field, and Dinky, a real-time computing platform based on Apache Flink, already supports both. In this integration practice, the Hive Catalog and Dinky's MySQL Catalog serve as metadata management while MySQL data is written into ClickHouse.
2. Environment Requirements
| Software | Version |
|---|---|
| CDH | 6.2.0 |
| Hadoop | 3.0.0-cdh6.2.0 |
| Hive | 2.1.1-cdh6.2.0 |
| Flink | 1.13.6 |
| Flink CDC | 2.2.1 |
| Dinky | 0.6.6 |
| MySQL | 5.7 |
| ClickHouse | 22.2.2.1 (standalone) |
3. Building the Flink ClickHouse Connector
The connector has branches for Flink 1.12, Flink 1.13, and Flink 1.14. The following builds the connector against Flink 1.13 as an example.
git clone https://github.com/itinycheng/flink-connector-clickhouse.git
cd flink-connector-clickhouse/
git checkout -b release-1.13 origin/release-1.13
Edit pom.xml. Change
<version>1.13.2-SNAPSHOT</version>
to
<version>1.13.6</version>
and change
<flink.version>1.13.2</flink.version>
to
<flink.version>1.13.6</flink.version>
# adjust the Scala version to match your own environment
Change
<scala.binary.version>2.11</scala.binary.version>
to
<scala.binary.version>2.12</scala.binary.version>
Once these changes are made, build the connector.
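A typical Maven invocation for this build (an assumed command, not specified by the repository; -DskipTests merely shortens the build):
mvn clean package -DskipTests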
After the build completes, flink-connector-clickhouse-1.13.6.jar is produced under the target directory.
4. Required Dependencies
# Hive dependencies
antlr-runtime-3.5.2.jar
hive-exec-2.1.1-cdh6.2.0.jar
libfb303-0.9.3.jar
flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar
hive-site.xml
# Hadoop dependency
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
# Dinky MySQL catalog dependency
dlink-catalog-mysql-1.13-0.6.6-SNAPSHOT.jar
# Dinky Hadoop dependency
flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
# MySQL driver
mysql-connector-java-8.0.21.jar
# ClickHouse dependencies
clickhouse-jdbc-0.2.6.jar
flink-connector-clickhouse-1.13.6.jar
# Flink CDC dependency
flink-sql-connector-mysql-cdc-2.2.1.jar
1. Hive dependencies go into $FLINK_HOME/lib and $DINKY_HOME/plugins
2. The Hadoop dependency goes into $FLINK_HOME/lib
3. The MySQL driver goes into $FLINK_HOME/lib and $DINKY_HOME/plugins
4. The ClickHouse dependencies go into $FLINK_HOME/lib and $DINKY_HOME/plugins
5. The Dinky MySQL catalog dependency goes into $FLINK_HOME/lib
6. The Flink CDC dependency goes into $FLINK_HOME/lib and $DINKY_HOME/plugins
7. The Dinky Hadoop dependency goes into $DINKY_HOME/plugins (download from the network drive or the group announcement)
After all of the above dependencies are in place, restart the Flink cluster and Dinky.
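For reference, the restart might look like this for a standalone Flink session cluster and Dinky's bundled control script (a sketch under those assumptions; adapt to your own deployment, e.g. a YARN session):
# restart the standalone Flink cluster
$FLINK_HOME/bin/stop-cluster.sh
$FLINK_HOME/bin/start-cluster.sh
# restart Dinky via its bundled auto.sh script
cd $DINKY_HOME && sh auto.sh restart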
5. Script Preparation
# MySQL DDL
CREATE TABLE bigdata.products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
INSERT INTO bigdata.products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
# ClickHouse DDL
DROP TABLE IF EXISTS test.test_orders;
CREATE TABLE test.test_orders (
order_id Int64 NOT NULL ,
order_date DATETIME NOT NULL,
customer_name String NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id Int64 NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
)
ENGINE = MergeTree()
ORDER BY order_id
PRIMARY KEY order_id;
6. Hive Catalog Job
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 60000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 10000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 5;
SET restart-strategy.fixed-delay.delay = 30s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 512m;
SET taskmanager.numberOfTaskSlots = 2;
SET yarn.application.queue = root.users.flink;
LOAD MODULE hive WITH ('hive-version' = '2.1.1');
CREATE CATALOG qhc_ods_catalog WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-version' = '2.1.1',
'hive-conf-dir' = '/etc/hive/conf',
'hadoop-conf-dir' = '/etc/hadoop/conf'
);
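If the qhc_ods database does not yet exist under this catalog, create it before running the table DDL (a small safeguard; the original setup presumably created it in advance):
-- ensure the target Hive database exists (assumed to be pre-created in the original setup)
CREATE DATABASE IF NOT EXISTS qhc_ods_catalog.qhc_ods;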
DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src;
CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src (
`order_id` int COMMENT ''
, `order_date` timestamp(3) COMMENT ''
, `customer_name` string COMMENT ''
, `price` decimal(12,2) COMMENT ''
, `product_id` int COMMENT ''
, `order_status` tinyint COMMENT ''
,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
'connector' = 'mysql-cdc'
,'hostname' = '192.168.0.4'
,'port' = '3306'
,'username' = 'root'
,'password' = '123456'
,'server-time-zone' = 'Asia/Shanghai'
,'scan.incremental.snapshot.enabled' = 'true'
,'scan.startup.mode'='initial'
,'scan.incremental.snapshot.chunk.size' = '20000'
,'heartbeat.interval' = '120s'
,'database-name' = 'bigdata'
,'table-name' = 'orders'
);
DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink;
CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink (
`order_id` BIGINT COMMENT ''
, `order_date` timestamp(3) COMMENT ''
, `customer_name` string COMMENT ''
, `price` decimal(12,5) COMMENT ''
, `product_id` BIGINT COMMENT ''
, `order_status` tinyint COMMENT ''
,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://192.168.0.5:8123',
'username' = 'default',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'test_orders',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
INSERT INTO qhc_ods_catalog.qhc_ods.ods_orders_sink
SELECT * FROM qhc_ods_catalog.qhc_ods.ods_orders_src;
Submit the Flink job.
After submission, the Flink job runs normally.
Check ClickHouse.
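To verify the synchronized data, query the sink table, for example from clickhouse-client (a minimal check, not part of the original job):
-- inspect the rows synchronized into the ClickHouse sink table
SELECT * FROM test.test_orders ORDER BY order_id;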
7. Dinky MySQL Catalog Job
DROP TABLE IF EXISTS ods_orders_src;
CREATE TABLE IF NOT EXISTS ods_orders_src (
`order_id` int COMMENT ''
, `order_date` timestamp(3) COMMENT ''
, `customer_name` string COMMENT ''
, `price` decimal(12,2) COMMENT ''
, `product_id` int COMMENT ''
, `order_status` tinyint COMMENT ''
,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
'connector' = 'mysql-cdc'
,'hostname' = '192.168.0.4'
,'port' = '3306'
,'username' = 'root'
,'password' = '123456'
,'server-time-zone' = 'Asia/Shanghai'
,'scan.incremental.snapshot.enabled' = 'true'
,'scan.startup.mode'='initial'
,'scan.incremental.snapshot.chunk.size' = '20000'
,'heartbeat.interval' = '120s'
,'database-name' = 'bigdata'
,'table-name' = 'orders'
);
DROP TABLE IF EXISTS ods_orders_sink;
CREATE TABLE IF NOT EXISTS ods_orders_sink (
`order_id` BIGINT COMMENT ''
, `order_date` timestamp(3) COMMENT ''
, `customer_name` string COMMENT ''
, `price` decimal(12,5) COMMENT ''
, `product_id` BIGINT COMMENT ''
, `order_status` tinyint COMMENT ''
,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://192.168.0.5:8123',
'username' = 'default',
'password' = '123456',
'database-name' = 'test',
'table-name' = 'test_orders',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
Create the initialization job script.
After running the job, query the dinky metadata database to check whether the tables now exist.
Check the metadata tables.
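For example, in the dlink metadata database (metadata_table is an assumed default name created by the dlink-catalog-mysql module; verify it against your own schema):
-- list the tables registered in Dinky's MySQL catalog (assumed table name)
SELECT * FROM metadata_table;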
Each execution of the initialization DDL updates Flink's metadata. The structure tree on the left shows the catalog's tables, views, UDFs, and other objects.
Submit the Flink job.
In the mysql_catalog_cdc_orders job, use an INSERT statement to write the data into ClickHouse.
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 60000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 10000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 5;
SET restart-strategy.fixed-delay.delay = 30s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
SET pipeline.name = mysql_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 512m;
SET taskmanager.numberOfTaskSlots = 2;
SET yarn.application.queue = root.users.flink;
INSERT INTO ods_orders_sink
SELECT * FROM ods_orders_src;
Insert data into the source database:
INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 12:12:30', 'lucy', 25.25, 10000, true);
Check ClickHouse.
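The new row should appear in the sink table shortly afterwards; for example (a hedged check in clickhouse-client):
-- look up the freshly inserted order by customer name
SELECT * FROM test.test_orders WHERE customer_name = 'lucy';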
8. Summary
This practice used Flink's native Hive Catalog and Dinky's MySQL Catalog, respectively, to synchronize data from a MySQL source database into a ClickHouse local table via Flink CDC. In this way, both Flink real-time jobs and ClickHouse ETL jobs can be handled uniformly on the Dinky real-time computing platform.