
在上一篇Debezium CDC 数据写入 Kafka,为什么需要单分区 文章中,我们将 debezium 采集到的 json 数据实时写入到了 Kafka 中,这样就实现了采集到存储的整个流程。在之前开发的采集程序中,我们只是对一个表进行了解析,这个表中的字段都包含在了采集数据中。
假如我们要对很多的表进行采集,不论是在解析 json 定义类的时候,还是下游程序读取了 kakfa 的数据需要解析的时候,是不是每个表都要自己手动去定义一个实体类去映射,这样是不是太麻烦了。所以我就想到了通过 javassist 来实现自动创建实体类。
在去年的时候,我当时遇到一个解析多个种类json的需求,为了方便我就开发了一个工具类,通过 javassist 自定义实现动态创建实体类,具体实现可以参考这篇文章:我宣布,Java Json再也不用定义实体类了
javassist提供了动态生成class的功能,在项目中首先引入依赖:
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.29.2-GA</version>
</dependency>然后我们就开始关注 javassist 中的核心类:
可以看到,一个类的 class、字段、方法都被抽象成了类,这里我们就可以构造一个生成 class 的函数。
在这里我们将在函数中定义类名、字段和方法,这一些都需要从外部传入。代码如下:
public static CtClass generateClass(String className, String fields, String connector) throws CannotCompileException {
ClassPool pool = ClassPool.getDefault();
// 初始化CtClas
CtClass ctClass = pool.makeClass(className);
for (String field : fields.split(",")) {
// 添加字段
ctClass.addField(CtField.make("private String " + field + ";", ctClass));
}
// 添加toString方法
ctClass.addMethod(CtMethod.make("public String toString() { String result = " + fields.replace(",", "+ \"" + connector + "\" +") + "; return result;}", ctClass));
return ctClass;
}其中在构造 CtField 字段的时候,我考虑因为会有多个字段,所以索性就用逗号分隔,然后在函数内分割之后进行构造。这里当然也可以传入一个字段的 list 直接遍历。至于 CtMethod,这里只构造了一个 toString 方法。
然后我们就开始结合 debezium 采集的变更数据,来看看是否可以动态生成实体类。这里再来复习一下数据格式:

首先我们要明确的是,采集不同的表,像after、before、source等这一层级节点都是一样的,只是 before 和 after 节点是每个表的字段内容,是独一无二的,这一部分也是我们需要自动生成的部分,之前我们定义的实体类如下:
public class DebeziumEvent {
public RowData before;
public RowData after;
public Source source;
public String op;
public long ts_ms;
public long ts_us;
public long ts_ns;
static class RowData {
public int id;
public String name;
public int age;
@SerializedName("updated_at")
public String updatedAt;
}
static class Source {
public String version;
public String connector;
public String name;
public long ts_ms;
public String snapshot;
public String db;
public String sequence;
public long ts_us;
public long ts_ns;
public String table;
public long server_id;
public String gtid;
public String file;
public long pos;
public int row;
public long thread;
public String query;
}
}我们要动态生成的就是就是上面的 RowData。
我们使用 CtClass 创建的时候,需要获取表的这些字段名来创建 CtField,因为这个程序我最后会集成到 Flume 中,所以我有两种方案获取这些字段:
我们先试用第二种方法创建 RowData。
private static CtClass rowData;
static {
rowData = generateClass("RowData", "id,name,age,update_at", "|");
}通过调试,我们就创建了一个 rowData 类。

可以看到这个类有我们指定的四个字段和 toString 方法。

本篇文章解决了 debezium 程序中,解析不同表的 json 数据需要手动创建实体类的问题。在上面的代码中,我们发现其实使用的都是 javassist 的基本知识,通过拆解需求,构造一些必须的参数,就完成了动态创建实体类的实现。下篇文章就会通过反射使用 ctClass 来完成类的加载调用。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。