
在写了一系列的 Debezium 的文章之后,其实最后还是要落地到生产系统中去。那么,我们如何去设计一个能够便于快速部署和开发的方案。这就让我想到了 Flume,我们将 Debezium 与 Flume 结合,每次当我们采集一个表的的时候,我们就创建一个配置文件,然后通过命令启动一个相应的进程,这样就能通过配置化快速实现多表采集的工作。
那么,如何将 Debezium 与 Flume 结合,就成了本篇文章要重点讲的东西。
玩过 Flume 的同学都知道,Flume 主要有四个部分组成的:
在当前这个采集场景中,我们只能自定义一个 Debezium Source 来实现 MySQL 的采集。
首先我们要引入我们需要的依赖,首先是 flume-core:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>然后就是 debezium 采集 MySQL 需要的依赖,这里的 version.debezium 对应的版本是 3.2.0.Final。
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>然后使用的是 JDK17,在所有环境准备好了之后,就可以开始开发 DebeziumSource了。
Source 的代码很简单,我们只需要将Debezium 实战:几行代码,实现 MySQL CDC 数据采集 文章中实现的采集程序,提取一些参数之后,嵌入到 Flume Source 的模版中就可以了,实现代码如下:
public class DebeziumMySQLSource extends AbstractSource implements EventDrivenSource, Configurable {
private String hostname;
private String port;
private String user;
private String password;
private String table;
private String offsetPath;
private String schemaHistoryPath;
private ExecutorService executor;
private DebeziumEngine<ChangeEvent<String, String>> engine;
private Properties props = new Properties();
@Override
public void configure(Context context) {
hostname = context.getString("mysql.host");
port = context.getString("mysql.port", "3306");
user = context.getString("mysql.user");
password = context.getString("mysql.password");
table = context.getString("mysql.table");
offsetPath = context.getString("mysql.offsetPath");
schemaHistoryPath = context.getString("mysql.schemaHistoryPath");
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetPath);
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", hostname);
props.setProperty("database.port", port);
props.setProperty("database.user", user);
props.setProperty("database.password", password);
props.setProperty("database.server.id", "85744");
props.setProperty("table.include.list", table);
props.setProperty("topic.prefix", "aqi");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", schemaHistoryPath);
props.setProperty("converter.schemas.enable", "false");
props.setProperty("include.schema.changes", "false");
}
@Override
public synchronized void start() {
super.start();
engine = DebeziumEngine.create(Json.class)
.using(props)
.notifying(record -> {
String json = record.value();
Event event = EventBuilder.withBody(json, StandardCharsets.UTF_8);
getChannelProcessor().processEvent(event);
})
.build();
executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
}
@Override
public synchronized void stop() {
try {
if (engine != null) {
engine.close();
}
if (executor != null) {
executor.shutdownNow();
}
} catch (Exception e) {
e.printStackTrace();
}
super.stop();
}
}在上面的代码中,我们定义了一个 DebeziumMySQLSource,在这里我们继承了 AbstractSource 抽象类,并实现了EventDrivenSource 和 Configurable 两个接口。
AbstractSource 是 Source 的子类,Source 是数据进入 Flume 的入口组件,所有 Source 类都需要实现 Source 接口,AbstractSource 已经实现了大部分通用方法,只留下 start() 和 stop() 重写,所以这里我就继承了 AbstractSource。
EventDrivenSource 是一个标记接口,没有任何的方法。它的作用是用来标识我们 DebeziumMySQLSource 是事件驱动型的,而不是轮询型。

Configurable 提供了 configure() ,可以让我们获取外部的参数。我们在这里将 debezium 采集的参数提取了出来,来实现 MySQL 连接、采集 table 的可配置化。同时在 configure 中初始化了 debezium 的配置。

然后就是在 start() 中实现采集逻辑,这里还是我们之前的代码,只不过在 Flume 中,Event 是数据传输的载体,我们需要将采集的变更数据构造成 Event,然后通过 channelProcessor 的 processEvent 方法,将 event 发送到管道中,实现数据的流转。
这样,我们就实现了 Debezium 与 Flume 的结合,实现了一个 Debezium 采集 MySQL 的 source,当我们想要新增一个表的采集时,只需要写一个配置启动一个进程就ok了,下一篇就会写 DebeziumMySQLSource 部署到服务器运行的教程。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。