首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >主流 OLTP 库表结构快速同步至 Apache Doris

主流 OLTP 库表结构快速同步至 Apache Doris

作者头像
苏奕嘉
发布2025-07-14 18:28:58
发布2025-07-14 18:28:58
14300
代码可运行
举报
运行总次数:0
代码可运行

引言

各位看官老爷,见字如面~

最近在给社区同学和商业化客户沟通交流时,发现很多同学初步接触或者学习时,遇到了不少入门时的卡点 —— 数据同步。

经常遇到想做 POC 测试,但是 Doris 数据库表的构建,对自己而言又是一个有门槛的学习过程。

同时堆积着自己的各种现有开发任务和其他工作的压力,很多时候虽然想玩一玩 Doris,或者想和已有的 OLTP 库对比一下查询上是否真的有很高的性能提升,骤然感觉鸭力山大,最后不得已而放弃……

为了防止大家勇于放弃,同时减少自我摸索的痛苦,本篇就来一起探讨一下,如何快速将业务 OLTP 库的库表结构,同步至 Apache Doris 中来。

话不多说,开搞!

适用场景

本次演示环境:内网环境,无防火和端口开放等干扰项。

适用源库:MySQL、Oracle、PostgreSQL、SQLServer、DB2、MongoDB

适用目标库:Apache Doris 社区版、SelectDB Doris 分发版

使用工具:单节点 Flink 以及 Flink 集群

命令执行:Shell 终端、Dbeaver、SelectDB-WebUI

方案说明

本方案借助 Flink 社区完善的 Source 生态,以及功能丰富强大的 Flink-Doris-Connector 组件来联合完成。

图片
图片

方案适用于以下几种应用情况:

  1. 1. 源库有大量的库表结构需要批量同步,需要一个快速的工具
  2. 2. 对 Doris 本身的库表结构构建熟悉程度低,希望先快速搭建一个可测试环境,在后续学习中逐步完善表结构
  3. 3. 对 Doris 本身的库表结构熟悉程度高,希望快速做完库表结构转换后,自行微调参数、类型以贴合自己业务达到最佳实践效果

本篇章不做数据实时同步演示,只做库表结构同步演示,若要做数据同步,可参考 Doris 官网 Flink-Doris-Connector 章节。

后文将给出使用 JDBC-Catalog 从外表 INSERT INTO SELECT 至内表的方式完成数据初始化同步案例。

MySQL 实操示例

Flink 组件快速部署

Flink 组件版本:1.16.3

在 Flink 官网 下载相应的压缩包至内网下任意一台节点部署即可

代码语言:javascript
代码运行次数:0
运行
复制
# 下载 Flink 二进制包
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
# 解压缩
tar -zxvf flink-1.16.-bin-scala_2.12.tgz

备注

  1. 1. 同步时需要在 $FLINK_HOME/lib 目录下添加对应的 Flink CDC 依赖,可至 Maven 中央仓库 搜索下载,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar 等
  2. 2. Connector 24.0.0 之后依赖的 Flink CDC 版本需要在 3.1 以上,如果需使用 Flink CDC 同步 MySQL 和 Oracle,还需要在 $FLINK_HOME/lib 下增加相关的 JDBC 驱动。
代码语言:javascript
代码运行次数:0
运行
复制
# 进入 Flink 服务 lib 目录
cd flink-1.16./lib
# 下载相应 Flink CDC 依赖
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.16/24.1.0/flink-doris-connector-1.16-24.1.0.jar
图片
图片

MySQL 快速同步

MySQL Server 版本:8.0.40

MySQL JDBC 版本:8.0.28

Flink MySQL Connector 版本:3.2.1

Flink Doris Connector 版本:1.16-24.1.0

使用 SSB 库表初始化 MySQL
图片
图片
配置和执行 Flink JOB 任务
代码语言:javascript
代码运行次数:0
运行
复制
<FLINK_HOME>bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-24.1.0.jar \
    mysql-sync-database \
    --database mysql_ssb \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=admin \
    --mysql-conf password=123456 \
    --mysql-conf database-name=ssb \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --create-table-only "true" \
    --table-conf replication_num=3
图片
图片
检查 Doris 同步情况
代码语言:javascript
代码运行次数:0
运行
复制
# 使用 mysql_ssb 数据库
use mysql_ssb;
# 查看 tables
show tables;
图片
图片

Flink 任务常见问题

  1. 1. Flink 任务执行失败,提示如下信息:
代码语言:javascript
代码运行次数:0
运行
复制
java.lang.NoClassDefFoundError: org/apache/flink/cdc/debezium/DebeziumDeserializationSchema
           at org.apache.doris.flink.tools.cdc.CdcTools.createMySQLSyncDatabase(CdcTools.java:)
           at org.apache.doris.flink.tools.cdc.CdcTools.main(CdcTools.java:)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:)
           at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:)
           at java.base/java.lang.reflect.Method.invoke(Method.java:)
           at 

请检查是否使用了 Flink-Doris-Connector 对应的 CDC jar 包,若使用了 24.0 以上的 Flink-Doris-Connector,请使用 3.1 以后的 SQL CDC Connector。

  1. 1. Flink 构建任务失败,提示如下信息:
代码语言:javascript
代码运行次数:0
运行
复制
   The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No tables to be synchronized. Please make sure whether the tables that need to be synchronized exist in the corresponding database or schema.

该问题为诸如 --including-tables 等 mysql 端参数异常,无法捕捉到对应的表,请检查对应参数信息。

  1. 1. 整库同步时出现部分表名、字段名解析异常,无法正常同步:请使用 --excluding-tables 参数来做排除,如使用中文字段或其他非法字符构建的表结构,可使用 | 分隔多个表,并支持正则表达式。比如 --excluding-tables table1

INSERT 数据同步

使用 Doris 的 JDBC-Catalog 功能,可简单快捷的同步主流 OLTP 库的数据至 Doris 指定表中,我们接着使用上述环境完成构建。

环境配置

将 MySQL-JDBC 驱动包下载同步至所有 Doris 节点下的 jdbc_drivers 目录中,该目录默认在 DORIS_HOME 根目录下。

本案例环境为 1FE 3BE 集群

代码语言:javascript
代码运行次数:0
运行
复制
# 跳至 FE 节点
ssh fe-
# 进入 DORIS_HOME 下的 jdbc_drivers 目录,案例中 DORIS_HOME 路径为 /home/doris/
cd /home/doris/jdbc_drivers
# 下载 MySQL JDBC Driver,要求最好 8.0.31 及以后版本
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.3.0/mysql-connector-j-8.3.0.jar
# SCP 分发至其他所有节点,以 BE-01 节点为例
scp root@fe-:/home/doris/jdbc_drivers/mysql-connector-j-8.3.0.jar root@be-:/home/doris/jdbc_drivers
图片
图片
构建 MySQL JDBC-Catalog
代码语言:javascript
代码运行次数:0
运行
复制
CREATE CATALOG mysql PROPERTIES (
"type"="jdbc",
"user"="root",
"password"="secret",
"jdbc_url"="jdbc:mysql://example.net:3306",
"driver_url"="mysql-connector-j-8.3.0.jar",
"driver_class"="com.mysql.cj.jdbc.Driver"
)
图片
图片

执行成功后可使用 switch mysql 命令,或者在 SelectDB-WebUI 上观察到外表数据库的层级结构。

同步数据
MySQL 插入初始数据

在 MySQL 端我们往 customer 表中插入 3 条数据作为示例:

代码语言:javascript
代码运行次数:0
运行
复制
INSERT INTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(,'1','1','1','1','1','1','1');
INSERTINTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(,'2','2','2','2','2','2','2');
INSERTINTO ssb.customer
(c_custkey, c_name, c_address, c_city, c_nation, c_region, c_phone, c_mktsegment)
VALUES(,'3','3','3','3','3','3','3');
图片
图片
执行同步语句

连接 Doris 客户端,执行外表写入内表命令:

代码语言:javascript
代码运行次数:0
运行
复制
INSERT INTO mysql_ssb.customer
SELECT * FROM mysql.ssb.customer;
图片
图片

可以看到数据已成功同步至 Doris 中。

注意事项
  1. 1. 经过测试,JDBC-Catalog 同步速度大致在 10-30MB/s,若需要进行大规模数据同步(TB级),可以使用其他方式,如 Datax、Spark、StreamLoader 等工具并发导入。
  2. 2. 查询时若未 USE database ,内表表名指定格式为 database_name.table_name ,外表表名指定格式为 catalog_name.database_name.table_name ,以上述案例为例,catalog_name 为 mysql,故此整体的查询语句的 From 后指定的表名限定名为 mysql.ssb.customer,而内表表名为 mysql_ssb.customer此处易混淆,请仔细甄别,也可将 MySQL 外表 Catalog 名设置为其他易分辨的名称。

小结

按如上流程,MySQL 等主流 OLTP 库的库表结构同步工作将无需再进行大量的人工转义工作,基本的字段映射、表类型映射等工作,都可以使用该方案快速满足,在此特别感谢我迪哥 吴迪(JNSimba)三年的倾情贡献!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-12-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Apache Doris 补习班 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • 适用场景
  • 方案说明
  • MySQL 实操示例
    • Flink 组件快速部署
    • MySQL 快速同步
      • 使用 SSB 库表初始化 MySQL
      • 配置和执行 Flink JOB 任务
      • 检查 Doris 同步情况
    • Flink 任务常见问题
    • INSERT 数据同步
      • 环境配置
      • 构建 MySQL JDBC-Catalog
      • 同步数据
      • MySQL 插入初始数据
      • 执行同步语句
      • 注意事项
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档