在上一篇Debezium 实战:使用 Gson 解析 CDC 变更数据 文章中,我们对 debezium 采集到的 json 数据进行解析,通过对三种操作类型的分析,了解到了数据的结构和含义。那么,本篇文章就是对数据进一步的处理,然后写入到我们之前搭建的 Kafka 中。
docker 一键安装 KRaft 模式的 Kafka 脚本可以参考本篇文章:MySQL Binlog 实时同步 Kafka:Debezium 实战笔记
在程序中引入 kafka 3.9.0 的依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
</dependency>
初始化一个 KafkaProducer 对象,用于写入 Kafka 中。
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
上面是一个最简单的配置,只配置了 bootstrap servers 和 序列化方式,像小吞吐量也不需要配置高级的写入参数,例如 linger.ms 和 buffer.size 等。然后我们就可以在 notifying 中构造 ProducerRecord 对象,启动程序就可以将数据写入到 Kafka 中。
ProducerRecord<String, String> msg = new ProducerRecord<>("aqi", record.value());
producer.send(msg);
接着我们在云服务器上执行命令创建 topic。
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic aqi
这样在 Kafka 中就创建了一个单分区、单副本的 topic。
因为使用的是单节点的 Kafka,所以这里创建再多的副本都只会在这一个基点上,没有什么意义。至于单分区,是因为要保证数据的顺序性,假如你有两个分区的话,在很短的时间间隔采集了同一条数据的两次 update,假如这两条数据分别写入到两个分区,在某些情况下就会出现第二个 update 的数据就会出现在第一个 update 的前面。
所以最简单的方式就是创建单分区保证顺序性,如果单分区吞吐量不够非要使用多分区,就只能从下游程序去做处理。例如在flink的程序中,可以通过 watermark 的方式,等待出现的消费延迟的数据,然后再窗口内根据采集时间来进行排序。
在程序启动之后,在 MySQL 中执行了几个 insert 语句,然后我们执行消费命令,看看是否已经有数据写入:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic aqi --from-beginning
可以看到已经消费到了 debezium 采集到的数据,也就是说已经成功写入到了 Kafka 中。
本篇文章完成了 cdc 采集数据写入 Kafka 的过程。如果对吞吐量要求不是很大的情况下,建议使用单分区的 topic 完成数据的接入,这样对于下游的 cdc 还原业务是极度友好的。下一篇文章将开始对程序进行优化,并与 flume 进行集成,可以不修改代码实现不同表的定制化开发。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。