
Extending ClickHouse with Dinky: A Practice Share


Abstract: In this article, Han Fei shares hands-on experience extending ClickHouse on the Dinky real-time computing platform, using the Flink SQL connector together with Catalogs. It covers:

  1. Foreword
  2. Environment Requirements
  3. Compiling the Flink ClickHouse Connector
  4. Required Dependencies
  5. Script Preparation
  6. Hive Catalog Job
  7. Dinky MySQL Catalog Job
  8. Summary

Tips: earlier installments in this series:

Dinky Practice Series: Flink Catalog Metadata Management

Dinky Practice Series: FlinkCDC Whole-Database Real-Time Ingestion into Warehouses and Lakes

Dinky FlinkCDC Whole-Database Ingestion into StarRocks

Building a Fast, Unified Analytics Platform with Flink + StarRocks + Dinky

Project repositories:

https://github.com/DataLinkDC/dlink

https://gitee.com/DataLinkDC/Dinky

You are welcome to follow Dinky's development!

1. Foreword

Doris and ClickHouse are currently two of the hottest OLAP engines in the big data space, and Dinky, a real-time computing platform based on Apache Flink, already supports both. In this integration practice, we use Hive Catalog and Dinky's MySQL Catalog for metadata management and write MySQL data into ClickHouse.

2. Environment Requirements

Software       Version
CDH            6.2.0
Hadoop         3.0.0-cdh6.2.0
Hive           2.1.1-cdh6.2.0
Flink          1.13.6
Flink CDC      2.2.1
Dinky          0.6.6
MySQL          5.7
ClickHouse     22.2.2.1 (standalone)

3. Compiling the Flink ClickHouse Connector

The connector repository provides branches for Flink 1.12, 1.13, and 1.14. The steps below build the connector for Flink 1.13.

Download Flink ClickHouse

git clone https://github.com/itinycheng/flink-connector-clickhouse.git
cd flink-connector-clickhouse/
git checkout -b release-1.13 origin/release-1.13

Compile Flink ClickHouse in IDEA

Modify the pom:

<version>1.13.2-SNAPSHOT</version>
change to
<version>1.13.6</version>

<flink.version>1.13.2</flink.version>
change to
<flink.version>1.13.6</flink.version>

<!-- adjust the Scala version to match your environment -->
<scala.binary.version>2.11</scala.binary.version>
change to
<scala.binary.version>2.12</scala.binary.version>

Once these changes are made, the project can be compiled.
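With the pom updated, the connector can also be built from the command line instead of IDEA. A minimal sketch, assuming Maven 3 and a JDK are on the PATH:

# Build the connector; tests are skipped to speed up the build
cd flink-connector-clickhouse/
mvn clean package -DskipTests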

After the build completes, the connector jar flink-connector-clickhouse-1.13.6.jar is produced under target/.

4. Required Dependencies

# Hive dependencies
antlr-runtime-3.5.2.jar
hive-exec-2.1.1-cdh6.2.0.jar
libfb303-0.9.3.jar
flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar
hive-site.xml
# Hadoop dependency
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
# Dinky MySQL catalog dependency
dlink-catalog-mysql-1.13-0.6.6-SNAPSHOT.jar
# Dinky Hadoop dependency
flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
# MySQL driver
mysql-connector-java-8.0.21.jar
# ClickHouse dependencies
clickhouse-jdbc-0.2.6.jar
flink-connector-clickhouse-1.13.6.jar
# Flink CDC dependency
flink-sql-connector-mysql-cdc-2.2.1.jar

Notes

1. The Hive dependencies go under $FLINK_HOME/lib and $DINKY_HOME/plugins.

2. The Hadoop dependency goes under $FLINK_HOME/lib.

3. The MySQL driver goes under $FLINK_HOME/lib and $DINKY_HOME/plugins.

4. The ClickHouse dependencies go under $FLINK_HOME/lib and $DINKY_HOME/plugins.

5. The Dinky MySQL catalog dependency goes under $FLINK_HOME/lib.

6. The Flink CDC dependency goes under $FLINK_HOME/lib and $DINKY_HOME/plugins.

7. The Dinky Hadoop dependency goes under $DINKY_HOME/plugins (download it from the project's netdisk or the community group announcement).

After all the dependencies are in place, restart the Flink cluster and Dinky; a copy script is sketched below.
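Placing the jars can be scripted. A minimal shell sketch, assuming illustrative install paths (adjust FLINK_HOME, DINKY_HOME, and the jar locations to your environment):

# Illustrative install paths
export FLINK_HOME=/opt/flink-1.13.6
export DINKY_HOME=/opt/dlink-0.6.6

# Jars needed by both Flink and Dinky
for jar in mysql-connector-java-8.0.21.jar clickhouse-jdbc-0.2.6.jar \
           flink-connector-clickhouse-1.13.6.jar flink-sql-connector-mysql-cdc-2.2.1.jar; do
  cp "$jar" "$FLINK_HOME/lib/"
  cp "$jar" "$DINKY_HOME/plugins/"
done

# Side-specific jars
cp dlink-catalog-mysql-1.13-0.6.6-SNAPSHOT.jar "$FLINK_HOME/lib/"
cp flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar "$DINKY_HOME/plugins/"
# ...then restart the Flink cluster and Dinky so the new jars are loaded.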

5. Script Preparation

MySQL DDL

-- MySQL source table; the CDC jobs below read bigdata.orders
CREATE TABLE bigdata.orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- whether the order has been placed
);

ALTER TABLE bigdata.orders AUTO_INCREMENT = 10001;

INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

ClickHouse DDL

DROP TABLE IF EXISTS test.test_orders;
CREATE TABLE test.test_orders (
  order_id Int64 NOT NULL,
  order_date DATETIME NOT NULL,
  customer_name String NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id Int64 NOT NULL,
  order_status BOOLEAN NOT NULL -- whether the order has been placed
)
ENGINE = MergeTree()
ORDER BY order_id
PRIMARY KEY order_id;
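The table can be created and inspected with clickhouse-client. A sketch, assuming the native port 9000 and the credentials used by the sink below (test_orders.sql is a hypothetical file holding the DDL above):

clickhouse-client -h 192.168.0.5 --port 9000 -u default --password 123456 \
  --multiquery < test_orders.sql
clickhouse-client -h 192.168.0.5 --port 9000 -u default --password 123456 \
  --query "SHOW CREATE TABLE test.test_orders"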

6. Hive Catalog Job

Create the job script
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 60000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 10000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 5;
SET restart-strategy.fixed-delay.delay = 30s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 512m;
SET taskmanager.numberOfTaskSlots = 2;
SET yarn.application.queue = root.users.flink;
LOAD MODULE hive WITH ('hive-version' = '2.1.1');
CREATE CATALOG qhc_ods_catalog WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-version' = '2.1.1',
    'hive-conf-dir' = '/etc/hive/conf',
    'hadoop-conf-dir' = '/etc/hadoop/conf'
);

DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src;
CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_src (
    `order_id` int COMMENT ''
    , `order_date` timestamp(3) COMMENT ''
    , `customer_name` string COMMENT ''
    , `price` decimal(12,2) COMMENT ''
    , `product_id` int COMMENT ''
    , `order_status` tinyint COMMENT ''
    ,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
     'connector' = 'mysql-cdc'
    ,'hostname' = '192.168.0.4'
    ,'port' = '3306'
    ,'username' = 'root'
    ,'password' = '123456'
    ,'server-time-zone' = 'Asia/Shanghai'
    ,'scan.incremental.snapshot.enabled' = 'true'
    ,'scan.startup.mode'='initial' 
    ,'scan.incremental.snapshot.chunk.size' = '20000'   
    ,'heartbeat.interval' = '120s' 
    ,'database-name' = 'bigdata'
    ,'table-name' = 'orders'
    );

DROP TABLE IF EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink;
CREATE TABLE IF NOT EXISTS qhc_ods_catalog.qhc_ods.ods_orders_sink (
     `order_id` BIGINT COMMENT ''
    , `order_date` timestamp(3) COMMENT ''
    , `customer_name` string COMMENT ''
    , `price` decimal(12,5) COMMENT ''
    , `product_id` BIGINT COMMENT ''
    , `order_status` tinyint COMMENT ''
    ,PRIMARY KEY(order_id) NOT ENFORCED  
) COMMENT ''
WITH (    
    'connector' = 'clickhouse',
    'url' = 'clickhouse://192.168.0.5:8123',
    'username' = 'default',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'test_orders',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
   );

INSERT INTO qhc_ods_catalog.qhc_ods.ods_orders_sink
SELECT * FROM qhc_ods_catalog.qhc_ods.ods_orders_src;

Submit the Flink job

After submission, the Flink job runs normally.

Check ClickHouse
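A quick way to confirm rows arrived in the sink; a sketch with clickhouse-client, connection details as above:

clickhouse-client -h 192.168.0.5 --port 9000 -u default --password 123456 \
  --query "SELECT count() FROM test.test_orders"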

7. Dinky MySQL Catalog Job

Create the job script
DROP TABLE IF EXISTS ods_orders_src;
CREATE TABLE IF NOT EXISTS ods_orders_src (
    `order_id` int COMMENT ''
    , `order_date` timestamp(3) COMMENT ''
    , `customer_name` string COMMENT ''
    , `price` decimal(12,2) COMMENT ''
    , `product_id` int COMMENT ''
    , `order_status` tinyint COMMENT ''
    ,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
     'connector' = 'mysql-cdc'
    ,'hostname' = '192.168.0.4'
    ,'port' = '3306'
    ,'username' = 'root'
    ,'password' = '123456'
    ,'server-time-zone' = 'Asia/Shanghai'
    ,'scan.incremental.snapshot.enabled' = 'true'
    ,'scan.startup.mode'='initial' 
    ,'scan.incremental.snapshot.chunk.size' = '20000'   
    ,'heartbeat.interval' = '120s' 
    ,'database-name' = 'bigdata'
    ,'table-name' = 'orders'
    );

DROP TABLE IF EXISTS ods_orders_sink;
CREATE TABLE IF NOT EXISTS ods_orders_sink (
     `order_id` BIGINT COMMENT ''
    , `order_date` timestamp(3) COMMENT ''
    , `customer_name` string COMMENT ''
    , `price` decimal(12,5) COMMENT ''
    , `product_id` BIGINT COMMENT ''
    , `order_status` tinyint COMMENT ''
    ,PRIMARY KEY(order_id) NOT ENFORCED  
) COMMENT ''
WITH (    
    'connector' = 'clickhouse',
    'url' = 'clickhouse://192.168.0.5:8123',
    'username' = 'default',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'test_orders',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
   );

Create the initialization job script

After running this job, query the Dinky metadata database to confirm that the tables now exist.

Check the metadata tables

Each execution of the initialization DDL updates Flink's metadata; the catalog's tables, views, UDFs, and other objects can then be browsed in the structure tree on the left. A spot-check query is sketched below.
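For example, a hypothetical spot check (the dlink database and metadata_table names follow the DDL shipped with Dinky's MySQL catalog; the host and credentials are illustrative):

# List the tables registered in the Dinky MySQL catalog
mysql -h 192.168.0.4 -uroot -p123456 dlink -e "SELECT * FROM metadata_table;"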

Submit the Flink job

In the mysql_catalog_cdc_orders job, an INSERT statement writes the data into ClickHouse:
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 60000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 10000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 5;
SET restart-strategy.fixed-delay.delay = 30s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
SET pipeline.name = mysql_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 512m;
SET taskmanager.numberOfTaskSlots = 2;
SET yarn.application.queue = root.users.flink;

INSERT INTO ods_orders_sink
SELECT * FROM ods_orders_src;

Insert data into the source database
INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 12:12:30', 'lucy', 25.25, 10000, true);

Check ClickHouse
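To see the newly synchronized row; a sketch, connection details as above:

clickhouse-client -h 192.168.0.5 --port 9000 -u default --password 123456 \
  --query "SELECT * FROM test.test_orders WHERE customer_name = 'lucy'"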

8. Summary

This practice used Flink's native Hive Catalog and Dinky's MySQL Catalog to synchronize data from a MySQL source database into a ClickHouse local table via Flink CDC. In this way, Flink real-time jobs and ClickHouse ETL tasks can be managed uniformly on the Dinky real-time computing platform.

Originally published on 2022-08-06 via the Dinky 开源 WeChat official account.