首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >生产级 CDC 方案:使用 Flume 封装 Debezium 采集 MySQL

生产级 CDC 方案:使用 Flume 封装 Debezium 采集 MySQL

原创
作者头像
叫我阿柒啊
发布2025-07-25 16:14:55
发布2025-07-25 16:14:55
2880
举报

前言

在写了一系列的 Debezium 的文章之后,其实最后还是要落地到生产系统中去。那么,我们如何去设计一个能够便于快速部署和开发的方案。这就让我想到了 Flume,我们将 Debezium 与 Flume 结合,每次当我们采集一个表的的时候,我们就创建一个配置文件,然后通过命令启动一个相应的进程,这样就能通过配置化快速实现多表采集的工作。

那么,如何将 Debezium 与 Flume 结合,就成了本篇文章要重点讲的东西。

程序设计

玩过 Flume 的同学都知道,Flume 主要有四个部分组成的:

  1. source:数据源采集部分
  2. interceptor:拦截器,对 source 采集的数据做处理
  3. channel:连接 source 和 sink 的管道
  4. sink:数据落地部分

在当前这个采集场景中,我们只能自定义一个 Debezium Source 来实现 MySQL 的采集。

依赖

首先我们要引入我们需要的依赖,首先是 flume-core:

代码语言:xml
复制
<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-core</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>

然后就是 debezium 采集 MySQL 需要的依赖,这里的 version.debezium 对应的版本是 3.2.0.Final

代码语言:xml
复制
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
 </dependency>

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-mysql</artifactId>
  <version>${version.debezium}</version>
</dependency>

然后使用的是 JDK17,在所有环境准备好了之后,就可以开始开发 DebeziumSource了。

Source 开发

Source 的代码很简单,我们只需要将Debezium 实战:几行代码,实现 MySQL CDC 数据采集 文章中实现的采集程序,提取一些参数之后,嵌入到 Flume Source 的模版中就可以了,实现代码如下:

代码语言:java
复制
public class DebeziumMySQLSource extends AbstractSource implements EventDrivenSource, Configurable {
    private String hostname;
    private String port;
    private String user;
    private String password;
    private String table;
    private String offsetPath;
    private String schemaHistoryPath;

    private ExecutorService executor;
    private DebeziumEngine<ChangeEvent<String, String>> engine;
    private Properties props = new Properties();

    @Override
    public void configure(Context context) {
        hostname = context.getString("mysql.host");
        port = context.getString("mysql.port", "3306");
        user = context.getString("mysql.user");
        password = context.getString("mysql.password");
        table = context.getString("mysql.table");
        offsetPath = context.getString("mysql.offsetPath");
        schemaHistoryPath = context.getString("mysql.schemaHistoryPath");

        props.setProperty("name", "engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", offsetPath);
        props.setProperty("offset.flush.interval.ms", "60000");
        /* begin connector properties */
        props.setProperty("database.hostname", hostname);
        props.setProperty("database.port", port);
        props.setProperty("database.user", user);
        props.setProperty("database.password", password);
        props.setProperty("database.server.id", "85744");
        props.setProperty("table.include.list", table);
        props.setProperty("topic.prefix", "aqi");
        props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
        props.setProperty("schema.history.internal.file.filename", schemaHistoryPath);
        props.setProperty("converter.schemas.enable", "false");
        props.setProperty("include.schema.changes", "false");

    }

    @Override
    public synchronized void start() {
        super.start();
        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    String json = record.value();
                    Event event = EventBuilder.withBody(json, StandardCharsets.UTF_8);
                    getChannelProcessor().processEvent(event);
                })
                .build();

        executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
    }

    @Override
    public synchronized void stop() {
        try {
            if (engine != null) {
                engine.close();
            }
            if (executor != null) {
                executor.shutdownNow();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        super.stop();
    }
}

1. interface

在上面的代码中,我们定义了一个 DebeziumMySQLSource,在这里我们继承了 AbstractSource 抽象类,并实现了EventDrivenSourceConfigurable 两个接口。

AbstractSource 是 Source 的子类,Source 是数据进入 Flume 的入口组件,所有 Source 类都需要实现 Source 接口,AbstractSource 已经实现了大部分通用方法,只留下 start() 和 stop() 重写,所以这里我就继承了 AbstractSource。

EventDrivenSource 是一个标记接口,没有任何的方法。它的作用是用来标识我们 DebeziumMySQLSource 是事件驱动型的,而不是轮询型。

2. 配置

Configurable 提供了 configure() ,可以让我们获取外部的参数。我们在这里将 debezium 采集的参数提取了出来,来实现 MySQL 连接、采集 table 的可配置化。同时在 configure 中初始化了 debezium 的配置。

3. 采集逻辑

然后就是在 start() 中实现采集逻辑,这里还是我们之前的代码,只不过在 Flume 中,Event 是数据传输的载体,我们需要将采集的变更数据构造成 Event,然后通过 channelProcessorprocessEvent 方法,将 event 发送到管道中,实现数据的流转。

结语

这样,我们就实现了 Debezium 与 Flume 的结合,实现了一个 Debezium 采集 MySQL 的 source,当我们想要新增一个表的采集时,只需要写一个配置启动一个进程就ok了,下一篇就会写 DebeziumMySQLSource 部署到服务器运行的教程。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 程序设计
    • 依赖
  • Source 开发
    • 1. interface
    • 2. 配置
    • 3. 采集逻辑
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档