首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >MySQL CDC 实战:Debezium 变更数据格式详解

MySQL CDC 实战:Debezium 变更数据格式详解

原创
作者头像
叫我阿柒啊
发布2025-07-18 23:56:19
发布2025-07-18 23:56:19
2260
举报

前言

在上一篇Debezium 踩坑实录:我在MySQL CDC 遇到的3个问题. 文章中,讲述了我在 debezium 程序中遇到的问题和解决方法,本篇文章就按照主线任务继续往下走,来研究一下如何处理捕获的变更数据。

程序开发

我们通过之前的调试,以及根据自己的需求完善了配置,保证程序只采集 insert、update、delete 中 dml 操作的变更数据,完整配置如下:

代码语言:java
复制
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "cvm");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "root");
props.setProperty("database.password", "123456");
props.setProperty("database.server.id", "85744");
props.setProperty("table.include.list", "debezium.test");
props.setProperty("topic.prefix", "aqi");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "schemahistory.dat");
props.setProperty("converter.schemas.enable", "false");
props.setProperty("include.schema.changes", "false");

数据处理

接下来就开始关注我们的 notifying 数据处理的部分。

代码语言:java
复制
    try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
            .using(props)
            .notifying(record -> {
                System.out.println(record);
            }).build()
    ) {
        // Run the engine asynchronously ...
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);

        for (;;) {}
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
  }
}

那么如何解析采集到的数据呢,关键就在于 record,这是一个 EmbeddedEngineChangeEvent<String, String> 对象,我们可以看一下关于这个类的定义。

这个类一共有四个成员变量,其中我们只需要关注 key 和 value 即可,key 可以理解为采集到变更数据的唯一表示,value 就是采集到的数据变更信息。然后我在 nitifying 打印出来(这种内部函数断点是真不好加)。

可以看到 key 和 value 都是 json,record 的 headers 和 partition 都为空,因为这里不是 Kafka Connect 模式,最后的 destination 是 Kafka Connect 模式下,自动生成的 topic 信息,这里也是印证了 topic 生成规则是:topicPrefix.databaseName.tableName

json格式化

key 的话就一个 id 字段,这里可以先不关注,我们只关注 value 即可,我们复制 value 格式化一下 json,查看其中的字段。

其中json的一级节点是 before、after、source、transaction、op 以及三个类似于时间戳的字段,我觉得 op 字段枚举值比较关键,因为通过 op 我们才能知道执行了什么类型的 sql:

  1. c = create
  2. u = update
  3. d = delete
  4. r = read (applies to only snapshots)

上面是官方文档给出的解释,如上图所示是我执行 insert 语句捕获到的数据,所以 op 为 c,表示新插入的数据。

然后就是 before 和 after,before 是表示变更前的数据,因为 insert 是新插入的数据,所以 before 就为 null,如果是 update 语句的话,before 就是有值的。after 就是变更后的数据,这里包含表中的字段。

至于 source,打眼一看就是之前配置上的东西,包括 debezium 的配置、database、table 的配置等等。如果你看了官网文档,你应该会发现应该还会输出一个 schema 节点,但是这里明显没有输出,因为我在配置中,设置 converter.schemas.enable 为 "false",这样就不会打印表的schema信息了。

结语

本篇文章主要了解到了 debezium 采集到的数据的格式,这样更有利于我们做后续的处理,下一篇文章就会从数据格式化入手,使用第三方的 json 库来解析数据,方便后续的入库。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 程序开发
    • 数据处理
    • json格式化
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档