首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Debezium 实战:使用 Gson 解析 CDC 变更数据

Debezium 实战:使用 Gson 解析 CDC 变更数据

原创
作者头像
叫我阿柒啊
发布2025-07-20 23:54:12
发布2025-07-20 23:54:12
3130
举报

前言

在上一篇MySQL CDC 实战:Debezium 变更数据格式详解 文章中,探究了一番 debezium 采集到的变更数据格式。在原始格式的基础上,我们可以只去其中的部分有效字段,构建成一个新的 json 数据写入到 Kafka 中。

解析

首先我们需要解析采集到的变更数据,这里我选择 gson 解析数据,在 maven 中我引入 gson 依赖:

代码语言:xml
复制
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>

然后我们回顾一下之前采集到的数据格式,明确一下数据字段。

然后根据上面的字段和层级,我们定义实体类来完成字段映射:

代码语言:java
复制
public class DebeziumEvent {
    public RowData before;
    public RowData after;
    public Source source;
    public String op;
    public long ts_ms;
    public long ts_us;
    public long ts_ns;

    static class RowData {
        public int id;
        public String name;
        public int age;

        @SerializedName("updated_at")
        public String updatedAt;
    }

    static class Source {
        public String version;
        public String connector;
        public String name;
        public long ts_ms;
        public String snapshot;
        public String db;
        public String sequence;
        public long ts_us;
        public long ts_ns;
        public String table;
        public long server_id;
        public String gtid;
        public String file;
        public long pos;
        public int row;
        public long thread;
        public String query;
    }

我们定义了 DebeziumEvent 主类和几个静态内部类,其中 RowData 这个类的属性,是会随着采集的表结构改变而改变的。所以后续会在这里通过反射来实现这个类的动态创建。

然后就是在 notifying 中使用 gson 完成解析,代码如下:

代码语言:java
复制
.notifying(record -> {
  Gson gson = new Gson();
  DebeziumEvent event = gson.fromJson(record.value(), DebeziumEvent.class);
  System.out.println(record);
  System.out.println("操作类型: " + event.op);
  System.out.println("数据库: " + event.source.db);
  System.out.println("表名: " + event.source.table);
  System.out.println("变更前 ID: " + (event.before != null ? event.before.id : "null"));
  System.out.println("变更后 ID: " + (event.after != null ? event.after.id : "null"));
}).build()

结果验证

当插入一条数据之后,在控制台我们就看到了采集到的 op 为 c 的变更数据。

在采集的插入数据中,我们可以看到变更前的id为null,但是可以看到新的id。当我们删除了一条数据之后,我们在看控制台打印的采集数据。

可以看到删除触发的变更数据里面,变更前有值,变更后为null,因为已经删除了。所以,到这里聪明的你应该想到了,当采集更新触发的变更数据时,变更前变更后都会有值。当我执行 update 的时候,结果如下:

问题

在测试的时候,我遇到了一个问题,就是每次采集 delete 的变更数据的时候,就提示我线程池被关闭了。刚开始我还不知道什么原因,后来在 notifying 中添加了 try-catch 之后,才发现

event.op 是空指针异常,也就是说我采集的数据变更数据 record 的 value 为null。

后来我就打印出来了,确实执行了一个 delete 语句,结果采集到了两条数据。一个是正常的采集的变更数据,一个是只有 key 有值,value 为 null 的数据,如下图:

后来我搜索了一下,debezium 默认会为 DELETE 操作生成一条 数据变更消息 + 一条 Tombstone 消息(墓碑消息)。tombstone 消息用于 Kafka 流式处理场景,表示 "此键对应的数据已删除"。所以在处理的时候,需要对 event 为 null 进行过滤处理。

结语

本篇文章主要定义了一个实体类去解析采集到的数据,并解决了在采集过程遇到的一些问题,从容优化了我们的处理逻辑,在下篇文章中,我们将会完成 Kafka 的写入,实现 debezium 与 Kafka 的交互。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 解析
  • 结果验证
  • 问题
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档