在完成了开启 MySQL 的 binlog 日志、Kafka 的搭建这些准备工作之后,我们终于可以来接着 Debezium 采集接下来的工作。首先要明确的是,debezium 支持多种模式的 MySQL,例如主从复制、高可用集群、多主模式等。我这里使用的是单点模式,是最简单模式的 MySQL。
debezium 的流程就是从数据库中读取变更的日志信息,完成 CDC 后写入 kafka,最后通过一些工具沉淀到下游生产系统中。
之前也接触过不少的采集工具,例如 oracle 的 ogg 和 flink cdc,当然 flink cdc 底层也是基于 debezium 的。今天我们就从最熟悉的 java 开始,看看如何完成一个 debezium cdc 的程序开发。
这里我们使用最新版本 3.2.0.Final 的 debezium 依赖,首先引入 debezium的依赖。
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
然后针对于不同的连接器引入依赖,这里我们采集的是 MySQL,所以这里就引入 MySQL的连接器。
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.1</version>
</dependency>
然后就可以开发一个采集程序。
官网给出了一个 debezium 连接 MySQL 的样例代码,代码基于 JDK17:
// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");
// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying(record -> {
System.out.println(record);
}).build()
) {
// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
// Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finishe
在上面代码中,你会发现创建一个 debezium engine 的代码很少,大部分代码都是配置。这个我在前几年学习 flink cdc 就发现了这个问题,各种配置很多,有些配置有很关键,所以学好 debezium 的关键就是研究明白它的这些参数。
至于如何学习这些参数呢?我的建议就是首先参考官网,其次就是分层级记忆。例如engine配置、MySQL connector 配置、kafka 配置等。
例如上面的样例代码中,一共给出了两个部分的配置,分别是 engine 和 MySQL connector。
首先 name 和 connector.class 就是定义名称和指定 connector 类型。offset 相关的就是定义读取的偏移量,这样程序才能知道读取到哪里了。
首先 offset.storage 定义了两种偏移量的存储方式:
在官网代码中使用的是文件存储 offset 的方式,Kafka 的存储方式是 Kafka Connect 模式特有的配置。剩下的配置就是关于 offset 的提交策略的一些配置。
对于 MySQL connector 的配置,就是一些常见的 MySQL 连接信息,除此之外还要通过两个参数来配置读取的 database 和 table:
然后就是记录数据库 schema 信息的配置,同样在官方代码中使用的 FileSchemaHistory 来记录 schema,也可以通过 kafka 的方式来存储。
本篇文章主要从官方给出的样例,学习了如何将 debezium 内嵌到我们的 java 程序中。同时也学了一些初级的公共配置,在上面的代码框架中,我们还需要一些更为详细的配置,以及实现一些代码逻辑,才能将数据写入到kafka中,下一篇将实现这一部分。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。