各位看官老爷,见字如面~
最近在给社区同学和商业化客户沟通交流时,发现很多同学初步接触或者学习时,遇到了不少入门时的卡点 —— 数据同步。
经常遇到想做 POC 测试,但是 Doris 数据库表的构建,对自己而言又是一个有门槛的学习过程。
同时堆积着自己的各种现有开发任务和其他工作的压力,很多时候虽然想玩一玩 Doris,或者想和已有的 OLTP 库对比一下查询上是否真的有很高的性能提升,骤然感觉鸭力山大,最后不得已而放弃……
为了防止大家勇于放弃,同时减少自我摸索的痛苦,本篇就来一起探讨一下,如何快速将业务 OLTP 库的库表结构,同步至 Apache Doris 中来。
话不多说,开搞!
本次演示环境:内网环境,无防火墙和端口开放等干扰项。
适用源库:MySQL、Oracle、PostgreSQL、SQLServer、DB2、MongoDB
适用目标库:Apache Doris 社区版、SelectDB Doris 分发版
使用工具:单节点 Flink 以及 Flink 集群
命令执行:Shell 终端、Dbeaver、SelectDB-WebUI
本方案借助 Flink 社区完善的 Source 生态,以及功能丰富强大的 Flink-Doris-Connector 组件来联合完成。
方案适用于以下几种应用情况:
本篇章不做数据实时同步演示,只做库表结构同步演示,若要做数据同步,可参考 Doris 官网 Flink-Doris-Connector 章节。
后文将给出使用 JDBC-Catalog 从外表 INSERT INTO SELECT 至内表的方式完成数据初始化同步案例。
Flink 组件版本:1.16.3
在 Flink 官网 下载相应的压缩包至内网下任意一台节点部署即可
# 下载 Flink 二进制包
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
# 解压缩
tar -zxvf flink-1.16.-bin-scala_2.12.tgz
备注
$FLINK_HOME/lib
目录下添加对应的 Flink CDC 依赖,可至 Maven 中央仓库 搜索下载,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar 等$FLINK_HOME/lib
下增加相关的 JDBC 驱动。# 进入 Flink 服务 lib 目录
cd flink-1.16./lib
# 下载相应 Flink CDC 依赖
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.16/24.1.0/flink-doris-connector-1.16-24.1.0.jar
MySQL Server 版本:8.0.40
MySQL JDBC 版本:8.0.28
Flink MySQL Connector 版本:3.2.1
Flink Doris Connector 版本:1.16-24.1.0
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.1.0.jar \
mysql-sync-database \
--database mysql_ssb \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf port=3306 \
--mysql-conf username=admin \
--mysql-conf password=123456 \
--mysql-conf database-name=ssb \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--create-table-only "true" \
--table-conf replication_num=3
# 使用 mysql_ssb 数据库
use mysql_ssb;
# 查看 tables
show tables;
java.lang.NoClassDefFoundError: org/apache/flink/cdc/debezium/DebeziumDeserializationSchema
at org.apache.doris.flink.tools.cdc.CdcTools.createMySQLSyncDatabase(CdcTools.java:)
at org.apache.doris.flink.tools.cdc.CdcTools.main(CdcTools.java:)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:)
at java.base/java.lang.reflect.Method.invoke(Method.java:)
at
请检查是否使用了 Flink-Doris-Connector 对应的 CDC jar 包,若使用了 24.0 以上的 Flink-Doris-Connector,请使用 3.1 以后的 SQL CDC Connector。
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No tables to be synchronized. Please make sure whether the tables that need to be synchronized exist in the corresponding database or schema.
该问题为诸如 --including-tables
等 mysql
端参数异常,无法捕捉到对应的表,请检查对应参数信息。
--excluding-tables
参数来做排除,如使用中文字段或其他非法字符构建的表结构,可使用 |
分隔多个表,并支持正则表达式。比如 --excluding-tables table1
使用 Doris 的 JDBC-Catalog 功能,可简单快捷的同步主流 OLTP 库的数据至 Doris 指定表中,我们接着使用上述环境完成构建。
将 MySQL-JDBC 驱动包下载同步至所有 Doris 节点下的 jdbc_drivers
目录中,该目录默认在 DORIS_HOME
根目录下。
本案例环境为 1FE 3BE 集群
# 跳至 FE 节点
ssh fe-
# 进入 DORIS_HOME 下的 jdbc_drivers 目录,案例中 DORIS_HOME 路径为 /home/doris/
cd /home/doris/jdbc_drivers
# 下载 MySQL JDBC Driver,要求最好 8.0.31 及以后版本
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.3.0/mysql-connector-j-8.3.0.jar
# SCP 分发至其他所有节点,以 BE-01 节点为例
scp root@fe-:/home/doris/jdbc_drivers/mysql-connector-j-8.3.0.jar root@be-:/home/doris/jdbc_drivers
CREATE CATALOG mysql PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="secret",
"jdbc_url"="jdbc:mysql://example.net:3306",
"driver_url"="mysql-connector-j-8.3.0.jar",
"driver_class"="com.mysql.cj.jdbc.Driver"
)
执行成功后可使用 switch mysql
命令,或者在 SelectDB-WebUI 上观察到外表数据库的层级结构。
在 MySQL 端我们往 customer
表中插入 3 条数据作为示例:
INSERT INTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(,'1','1','1','1','1','1','1');
INSERTINTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(,'2','2','2','2','2','2','2');
INSERTINTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(,'3','3','3','3','3','3','3');
连接 Doris 客户端,执行外表写入内表命令:
INSERT INTO mysql_ssb.customer
SELECT * FROM mysql.ssb.customer;
可以看到数据已成功同步至 Doris 中。
USE database
,内表表名指定格式为 database_name.table_name
,外表表名指定格式为 catalog_name.database_name.table_name
,以上述案例为例,catalog_name
为 mysql
,故此整体的查询语句的 From 后指定的表名限定名为 mysql.ssb.customer
,而内表表名为 mysql_ssb.customer
,此处易混淆,请仔细甄别,也可将 MySQL 外表 Catalog 名设置为其他易分辨的名称。按如上流程,MySQL 等主流 OLTP 库的库表结构同步工作将无需再进行大量的人工转义工作,基本的字段映射、表类型映射等工作,都可以使用该方案快速满足,在此特别感谢我迪哥 吴迪(JNSimba)三年的倾情贡献!
本文分享自 Apache Doris 补习班 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!