首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Debezium CDC 数据写入 Kafka,为什么需要单分区...

Debezium CDC 数据写入 Kafka,为什么需要单分区...

原创
作者头像
叫我阿柒啊
发布2025-07-21 14:46:23
发布2025-07-21 14:46:23
3070
举报

前言

在上一篇Debezium 实战:使用 Gson 解析 CDC 变更数据 文章中,我们对 debezium 采集到的 json 数据进行解析,通过对三种操作类型的分析,了解到了数据的结构和含义。那么,本篇文章就是对数据进一步的处理,然后写入到我们之前搭建的 Kafka 中。

docker 一键安装 KRaft 模式的 Kafka 脚本可以参考本篇文章:MySQL Binlog 实时同步 Kafka:Debezium 实战笔记

程序开发

在程序中引入 kafka 3.9.0 的依赖:

代码语言:xml
复制
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.0</version>
</dependency>

初始化一个 KafkaProducer 对象,用于写入 Kafka 中。

代码语言:java
复制
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

上面是一个最简单的配置,只配置了 bootstrap servers 和 序列化方式,像小吞吐量也不需要配置高级的写入参数,例如 linger.msbuffer.size 等。然后我们就可以在 notifying 中构造 ProducerRecord 对象,启动程序就可以将数据写入到 Kafka 中。

代码语言:java
复制
ProducerRecord<String, String> msg = new ProducerRecord<>("aqi", record.value());
producer.send(msg);

Kafka 操作

接着我们在云服务器上执行命令创建 topic。

代码语言:bash
复制
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic aqi

这样在 Kafka 中就创建了一个单分区、单副本的 topic。

因为使用的是单节点的 Kafka,所以这里创建再多的副本都只会在这一个基点上,没有什么意义。至于单分区,是因为要保证数据的顺序性,假如你有两个分区的话,在很短的时间间隔采集了同一条数据的两次 update,假如这两条数据分别写入到两个分区,在某些情况下就会出现第二个 update 的数据就会出现在第一个 update 的前面。

  1. 采集写入的时候,因为线程繁忙或者网络抖动
  2. 下游数据消费的时候,无法保证分区间的顺序性

所以最简单的方式就是创建单分区保证顺序性,如果单分区吞吐量不够非要使用多分区,就只能从下游程序去做处理。例如在flink的程序中,可以通过 watermark 的方式,等待出现的消费延迟的数据,然后再窗口内根据采集时间来进行排序。

在程序启动之后,在 MySQL 中执行了几个 insert 语句,然后我们执行消费命令,看看是否已经有数据写入:

代码语言:bash
复制
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic aqi --from-beginning

可以看到已经消费到了 debezium 采集到的数据,也就是说已经成功写入到了 Kafka 中。

结语

本篇文章完成了 cdc 采集数据写入 Kafka 的过程。如果对吞吐量要求不是很大的情况下,建议使用单分区的 topic 完成数据的接入,这样对于下游的 cdc 还原业务是极度友好的。下一篇文章将开始对程序进行优化,并与 flume 进行集成,可以不修改代码实现不同表的定制化开发。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 程序开发
  • Kafka 操作
  • 结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档