首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >零帧起手,Debezium 实现 MySQL CDC 有多简单...

零帧起手,Debezium 实现 MySQL CDC 有多简单...

原创
作者头像
叫我阿柒啊
修改2025-07-16 09:52:34
修改2025-07-16 09:52:34
3670
举报

前言

在完成了开启 MySQL 的 binlog 日志、Kafka 的搭建这些准备工作之后,我们终于可以来接着 Debezium 采集接下来的工作。首先要明确的是,debezium 支持多种模式的 MySQL,例如主从复制、高可用集群、多主模式等。我这里使用的是单点模式,是最简单模式的 MySQL。

debezium 的流程就是从数据库中读取变更的日志信息,完成 CDC 后写入 kafka,最后通过一些工具沉淀到下游生产系统中。

之前也接触过不少的采集工具,例如 oracle 的 ogg 和 flink cdc,当然 flink cdc 底层也是基于 debezium 的。今天我们就从最熟悉的 java 开始,看看如何完成一个 debezium cdc 的程序开发。

debezium

这里我们使用最新版本 3.2.0.Final 的 debezium 依赖,首先引入 debezium的依赖。

代码语言:xml
复制
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
 </dependency>

然后针对于不同的连接器引入依赖,这里我们采集的是 MySQL,所以这里就引入 MySQL的连接器。

代码语言:xml
复制
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.1</version>
</dependency>

然后就可以开发一个采集程序。

程序开发

官网给出了一个 debezium 连接 MySQL 的样例代码,代码基于 JDK17:

代码语言:java
复制
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {
    // Run the engine asynchronously ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

    // Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finishe

在上面代码中,你会发现创建一个 debezium engine 的代码很少,大部分代码都是配置。这个我在前几年学习 flink cdc 就发现了这个问题,各种配置很多,有些配置有很关键,所以学好 debezium 的关键就是研究明白它的这些参数。

至于如何学习这些参数呢?我的建议就是首先参考官网,其次就是分层级记忆。例如engine配置、MySQL connector 配置、kafka 配置等。

配置

例如上面的样例代码中,一共给出了两个部分的配置,分别是 engine 和 MySQL connector。

engine

首先 nameconnector.class 就是定义名称和指定 connector 类型。offset 相关的就是定义读取的偏移量,这样程序才能知道读取到哪里了。

首先 offset.storage 定义了两种偏移量的存储方式:

  1. FileOffsetBackingStore:存储在本地文件中,然后 offset.storage.file.filename 定义存储路径和文件名称
  2. KafkaOffsetBackingStore:存储在kafka中,这样就需要其他三个参数配合:offset.storage.topic、offset.storage.partitions、offset.storage.replication.factor,玩过 kafka 的应该知道这三个是创建 topic 的参数。

在官网代码中使用的是文件存储 offset 的方式,Kafka 的存储方式是 Kafka Connect 模式特有的配置。剩下的配置就是关于 offset 的提交策略的一些配置。

MySQL connector

对于 MySQL connector 的配置,就是一些常见的 MySQL 连接信息,除此之外还要通过两个参数来配置读取的 database 和 table:

  1. database.include.list:指定数据库
  2. table.include.list:指定表

然后就是记录数据库 schema 信息的配置,同样在官方代码中使用的 FileSchemaHistory 来记录 schema,也可以通过 kafka 的方式来存储。

结语

本篇文章主要从官方给出的样例,学习了如何将 debezium 内嵌到我们的 java 程序中。同时也学了一些初级的公共配置,在上面的代码框架中,我们还需要一些更为详细的配置,以及实现一些代码逻辑,才能将数据写入到kafka中,下一篇将实现这一部分。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • debezium
    • 程序开发
  • 配置
    • engine
    • MySQL connector
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档