首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Debezium和Quarkus:通过CDC模式来避免双重写入

在本系列文章的第2部分,我们学习了 Kafka Streams 与 Quarkus 的集成。我们开发了一个简单的应用程序向一个 Kafka 主题生成事件,并使用 Kafka Streams 实时消费和处理这些事件。

在那个示例中,我们模拟了一家电影流媒体公司。我们将电影信息保存在一个 Kafka 主题中,当用户停止观看电影,我们将这个事件和已播放的时间保存到另一个主题中。我们对这些事件进行实时的后续处理,计算出一部电影播放超过 10 分钟的次数。

应用程序的架构如下图所示。

所有的信息都保存在 Kafka 主题中,但这在现实的项目中是不太可能发生的。

在现实当中,电影信息可能保存在传统的数据库中,并用一些分布式缓存来加快查询速度,或者使用搜索引擎建立索引。为简单起见,我们假设电影信息保存在数据库中。

这就提出了一个问题——我们如何在两个不同的系统中维护相同的数据,数据库作为主要保存数据的位置,Kafka 主题中的数据将使用 Kafka Streams 来处理。

本文将教你如何正确地以不同形式保存相同的数据。

双重写入

要解决这个问题,我们首先想到的可能是双重写入。这是一种最为简单的方法,应用程序负责维护所有位置的数据。例如,当有新的电影信息需要插入时,会执行一个数据库插入,并发送一个事件到 Kafka 主题。

代码可能像下面这样。

代码语言:javascript
复制
@Channel("movies")
Emitter<Record<Long, String>> movieEmitter;
 
private static ObjectMapper objectMapper = new ObjectMapper();
 
public Movie dualWriteInsert(Movie movie) throws JsonProcessingException {
    // Inserts to DB
    movie.persist();
 
    // Send an event to movies topic
    final String payloadJson = objectMapper.writeValueAsString(movie);
    long id = movie.id;
 
    movieEmitter.send(Record.of(id, payloadJson));

这看起来没什么问题,很容易实现,也很有效,如果没有遇到什么奇怪的问题的话。下面让我们来看看这种方式会遇到怎样的问题。

  • 如果数据被持久化在数据库中,但发送到 Kafka 主题时失败了,你可以把这两个操作包装在一个事务块中。这可以解决事务问题,因为在出错时可以回滚。但你在性能方面付出了巨大的代价,事务范围越大,阻塞数据库的时间就越长。
  • 如果两个并发用户想同时更新一个电影信息,会发生什么情况?可能会发生这样的情况——第一个请求更新了数据库,并将事件发送给 Kafka,然后第二个请求再次更新数据库和 Kafka。在这种情况下,数据库和 Kafka 主题中的数据是对齐的。但是如果第一个请求只将数据持久化到数据库,第二个请求持久化并发送事件到 Kafka,然后第一个请求再将事件发送到 Kafka 主题,那么此时数据库和 Kafka 主题中的数据就发生了分歧,产生了不同的值,导致数据之间不一致。当然,你可以使用同步方法,但这将意味着巨大的性能损失。出现第二个问题是因为混合使用了不同的系统,数据库事务的保证范围只限于数据库本身,无法在不同的系统之间起作用。

两阶段提交

这个问题的一个可能的解决方案是使用两阶段提交协议。虽然这可能是一个很好的解决方案,但也存在两个问题。

  • 首先,并不是所有的系统都支持分布式事务和两阶段提交。
  • 这个协议的问题在于各方之间的通信需要进行额外的协调。这是一个可能的解决方案,但它不是一个通用的解决方案。在我们的示例中,Kafka 不支持分布式事务,所以让我们来看看另一个解决方案。

变更数据捕获

变更数据捕获(Change Data CaptureCDC)是一种模式,用于跟踪已更改的数据(例如,添加的新记录、更新的注册表等)并触发事件,让应用程序能够对变更作出反应。

有几种实现 CDC 的方法,例如,在使用行级别的时间戳、版本号或状态指示器,这样就可以定时从一个特定的点检查数据(例如,SELECT * WHERE status=not_read)。但这种方法有一个缺点,你需要经常访问数据库,但这些访问与业务无关,而且需要处理数据被删除的情况。

另一种方法是使用数据库触发器,即任何一个数据变更都会触发一个事件,并将事件保存在特定的事件表中。你可以捕获任何一个事件,但仍然需要定时轮询数据库。

大多数数据库都有事务日志,它记录了数据库的所有变更。日志扫描器会扫描这个日志,并以非侵入式的方式捕获变更。这种方法的好处如下所示。

  • 对数据库的影响最小。
  • 变更对应用程序来说是透明的,不需要插入特殊的列。
  • 事务完整性。
  • 不需要修改数据库 Schema。日志扫描是最好的方法,而Debezium是最流行的开源日志扫描器项目。

Debezium

Debezium 是一个通过扫描日志实现变更数据捕获的开源项目。启动数据库并配置 Debezium,用它消费数据库事务日志中的数据。对于提交给数据库的每一次插入、删除或更新,Debezium 都将触发一个事件,应用程序可以向它注册并作出相应的反应。

那么为什么说 Debezium、CDC 和 Kafka 可以帮助我们解决双重写入的问题呢?Kafka 主题由一个或多个分区组成,每个分区按照事件到达的顺序对事件进行排序(事件总是被追加到分区的末尾)。因此,如果我们想要维护并发操作的顺序问题(避免在系统之间有错位的数据),Kafka 主题可以帮我们解决这个问题。

当然,还有另外一个问题,即在并发操作的情况下,如何按照正确的顺序从数据库中读取数据。CDC 和日志扫描器可以确保事务提交后数据的顺序是正确的,并且是非侵入性的,而 Debezium 可以在这方面发挥作用。

你可以用两种不同的方式来操作 Debezium,这两种方式都是有效的,使用哪一种取决于具体情况。这两种方式分别是 Debezium 服务器或 Debezium 引擎(嵌入式)。

Debezium 服务器

Debezium 服务器将 Debezium 作为 Kafka Connect 实例运行。Kafka Connect 是一个独立的进程,由消费者和生产者启动,用于从 Kafka 读取数据。Kafka Connect 定义了不同数据系统的连接器,然后将大型数据集移入或移出 Kafka。由于连接器使用了 Kafka API,所以它们是可伸缩的,具有容错能力和较低的延迟。

在下面的例子中,假设你想将数据从一个 Kafka 主题导出到一个索引引擎,比如 ElasticSearch。你有两个选择。

使用 Kafka API 创建一个应用程序(就像我们在本系列的第1部分中看到的那样)从 Kafka 主题读取事件,然后使用 ElasticSearch 客户端将数据填充到索引中。

使用 ElasticSearch Kafka Connect,它已经实现了所有这些逻辑,你只需要配置和启动即可。

Debezium 做的是同样的事情,它从数据库读取事务日志,并将其发送到 Kafka 主题。

Debezium 最大的优点之一是它可以连接到多种数据库,如 MySQL、MongoDB、PostgreSQL、Oracle DB、SQL Server、DB 2、Cassandra 和 Vitesse。

Debezium 引擎

通常情况下我们会使用 Debezium 服务器,因为它不会干扰应用程序。它是一个用于接收数据变更并填充 Kafka 主题的服务。

但并不是所有的应用程序都需要 Kafka Connect 提供的容错能力或可伸缩性。此外,有时候应用程序必须自己捕获数据变更事件,并执行一些自定义逻辑,而不只将变更发送到消息传递系统中。

对这些情况,debezium-api 模块定义了一个 API 来将 Debezium 引擎嵌入到应用程序中。

到目前为止,我们知道了应该要尽量避免双重写入。我们的解决方案是使用 CDC 直接从事务日志中获取数据,并将其推送到 Kafka 主题,这样其他系统就可以以“事务性”的方式和顺序消费这些数据。

发件箱模式

看到这里,你可能会想:“好吧,我可以通过 CDC 对数据变更作出反应,但内部实体被暴露给了外部系统。”虽然这是真的,但请允许我向你介绍发件箱模式来避免这个问题。

发件箱模式提供了一个发件箱表,你可以在其中记录所有实体的操作(可能使用非规范化数据)。然后 CDC 系统(在我们的例子中使用的是 Debezium)对发件箱表(而不是实体表)中的变更做出反应,这样就实现了数据模型与其他系统的隔离。

需要注意的是,实体变更和发件箱必须在同一个事务中。

让我们把所有这些碎片放在一个 Quarkus 项目中,并解决我们在一开始提出的问题——如何在数据库中插入与电影相关的信息,并将其填充到外部系统(Kafka 主题)中。

我们的 Debezium 示例

我们不再为每一种场景手动编写代码,而是使用 Debezium 引擎并将它与 Quarkus 集成来解决这个问题。

创建项目

到 Quarkus起始页,选择 RestEasy Reactive 和 RestEasy Reactive Jackson 插件(用于编码/解码数据),实现 JAX-RS 端点,使用 Panache 和 MySQL 驱动程序将电影信息插入到 MySQL 数据库,使用 SmallRye Reactive Messaging 与 Kafka 发生交互。另外,取消选中 Started Code 选项,如下图所示。

你可以跳过这个手动步骤,并打开Quarkus Generator链接,这里所有的依赖项都被选中。然后按下“Generate your application”按钮,下载搭建好的应用程序 zip 文件包。

解压缩文件并在你最喜欢的 IDE 中打开项目。

开发

在开始编码之前,我们需要添加两个新的依赖项:一个用于使用 Debezium 引擎,另一个用于添加 Debezium Quarkus Outbox 插件。

Debezium 引擎

打开 pom.xml 文件并添加以下依赖项。

在 dependencyManagement 部分:

代码语言:javascript
复制
<dependency>
   <groupId>io.debezium</groupId>
   <artifactId>debezium-bom</artifactId>
   <version>1.9.4.Final</version>
   <type>pom</type>
   <scope>import</scope>
</dependency>

在 dependencies 部分:

代码语言:javascript
复制
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-ddl-parser</artifactId>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
</dependency>
<!-- We connect to a MySQL database, so we need debezium MySQL connector -->
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-mysql</artifactId>
</dependency>

这是为了使用嵌入在应用程序中的 Debezium 引擎。如果我们使用 Debezium 服务器,就不需要这些依赖项,因为它是一个独立的服务。

Debezium Quarkus Outbox 插件

Quarkus 通过Debezium Quarkus Outbox插件实现发件箱模式。

打开 pom.xml 文件并添加以下依赖项。

在 dependencyManagement 部分:

代码语言:javascript
复制
<dependency>
   <groupId>${quarkus.platform.group-id}</groupId>
   <artifactId>quarkus-debezium-bom</artifactId>
   <version>${quarkus.platform.version}</version>
   <type>pom</type>
   <scope>import</scope>
 </dependency>

请注意 BOM 的版本与 Quarkus 版本要对齐,在这里是 2.10.1.Final。

在 dependencies 部分:

代码语言:javascript
复制
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
</dependency>

实现

你可以选择不使用发件箱模式或自己实现它,如果是这样,那么这些依赖项就都不需要。但为简单起见,我们在这里使用它。

有了这些依赖项,我们就可以创建带有 JPA 注解并扩展了 PanacheEntity 类的的 Movie 实体。

代码语言:javascript
复制
import javax.persistence.Entity;
 
import io.quarkus.hibernate.orm.panache.PanacheEntity;
 
@Entity
public class Movie extends PanacheEntity {
  
   // No worries Quarkus will change them
   // to private and auto-generate getters/setters at compilation time
   public String name;
   public String director;
   public String genre;
 
}

下一步是创建一个 HTTP 端点,使用 JAX-RS 注解将电影信息插入到数据库中。

代码语言:javascript
复制
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
 
import org.jboss.logging.Logger;
 
@Path("/movie")
public class MovieResource {
 
   // Service to insert the movie data into Movie and Outbox tables
   @Inject
   MovieService movieService;
 
   // Injects the logger
   @Inject
   Logger logger;
 
   // Http Post method to insert a movie
   @POST
   public Movie insert(Movie movie) {
       logger.info("New Movie inserted " + movie.name);
       System.out.println(":)");
      
       return movieService.insertMovie(movie);
   }
}

因为我们使用的是 Debezium Quarkus Outbox 插件,所以需要创建一个用于表示发件箱表中存储的内容的实体。实体必须实现 ExportedEvent 接口,并实现接口方法来识别发件箱表中放置的事件类型。

代码语言:javascript
复制
import java.time.Instant;
 
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
 
import io.debezium.outbox.quarkus.ExportedEvent;
 
public class MovieEvent implements ExportedEvent<String, JsonNode> {
 
   private static ObjectMapper mapper = new ObjectMapper();
 
   // Set the type enclosed inside the event
   private static final String TYPE = "Movie";
   // Set the event type
   private static final String EVENT_TYPE = "MovieCreated";
 
   private final long gameId;
   private final JsonNode jsonNode;
   private final Instant timestamp;
 
   // Saves Game info in the class
   public MovieEvent(Movie movie) {
       this.gameId = movie.id;
       this.timestamp = Instant.now();
       // Saves game content in a string column in JSON format
       this.jsonNode = convertToJson(movie);
   }
 
   @Override
   public String getAggregateId() {
       return String.valueOf(this.gameId);
   }
 
   @Override
   public String getAggregateType() {
       return TYPE;
   }
 
   @Override
   public JsonNode getPayload() {
       return jsonNode;
   }
 
   @Override
   public Instant getTimestamp() {
       return timestamp;
   }
 
   @Override
   public String getType() {
       return EVENT_TYPE;
   }
  
   private JsonNode convertToJson(Movie movie) {
       ObjectNode asJson = mapper.createObjectNode()
               .put("id", movie.id)
               .put("name", movie.name)
               .put("director", movie.director)
               .put("genre", movie.genre);
      
       return asJson;
   }
 
}

在将 Debezium 逻辑添加到代码之前,我们还需要实现 MovieService 类,加入插入数据的逻辑。这个逻辑应该将电影信息持久化到 Movie 表中,并将 MovieEvent 实体持久化到由 OutboxEvent 插件管理的表中。

这个插件提供了一个特定的 CDI 事件来持久化实现了 ExportedEvent 接口的事件。我们唯一要做的事情是触发一个事件,数据将自动被持久化。

代码语言:javascript
复制
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;
 
import io.debezium.outbox.quarkus.ExportedEvent;
 
@ApplicationScoped
public class MovieService {
  
   // CDI event interface triggering Outbox entities
   @Inject
   Event<ExportedEvent<?, ?>> event;
 
   // Transaction method
   @Transactional
   public Movie insertMovie(Movie movie) {
 
       // Persists data
       movie.persist();
      
       // Persists outbox content
       event.fire(new MovieEvent(movie));
      
       return movie;
   }
}

最后一步是配置 Debezium 引擎,并将其嵌入到应用程序中。

要配置引擎,你需要设置数据库信息(主机名、端口、凭证)以及 Debezium 要监控的数据库和表。

代码语言:javascript
复制
import java.io.File;
import java.io.IOException;
 
import javax.enterprise.inject.Produces;
 
import org.eclipse.microprofile.config.inject.ConfigProperty;
 
import io.debezium.config.Configuration;
 
public class DebeziumConfiguration {
  
   // Debezium needs Database URL and credentials to login and
   // monitor transaction logs
   @ConfigProperty(name = "quarkus.datasource.jdbc.url")
   String url;
 
   @ConfigProperty(name = "quarkus.datasource.password")
   String password;
 
   @ConfigProperty(name = "quarkus.datasource.username")
   String username;

   @Produces
   public Configuration configureDebezium() throws IOException {
 
       // Custom class to get database name or hostname of Database server
       MySqlJdbcParser jdbcParser = MySqlJdbcParser.parse(url);
      
       File fileOffset = File.createTempFile("offset", ".dat");
       File fileDbHistory = File.createTempFile("dbhistory", ".dat");
 
       return io.debezium.config.Configuration.create()
           .with("name", "movies-mysql-connector")
           // configures MySQL connector
           .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
           .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
           .with("offset.storage.file.filename", fileOffset.getAbsolutePath())
           .with("offset.flush.interval.ms", "60000")
           // Configures database location
           .with("database.hostname", jdbcParser.getHost())
           .with("database.port", jdbcParser.getPort())
           .with("database.user", "root")
           .with("database.allowPublicKeyRetrieval", "true")
           .with("database.password", password)
           .with("database.dbname", jdbcParser.getDatabase())
           .with("database.include.list", jdbcParser.getDatabase())
           // Debezium only sends events for the modifications of OutboxEvent table and not all tables
           .with("table.include.list", jdbcParser.getDatabase() + ".OutboxEvent")
           .with("include.schema.changes", "false")
           .with("database.server.id", "10181")
           .with("database.server.name", "movies-mysql-db-server")
           .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
           .with("database.history.file.filename", fileDbHistory.getAbsolutePath())
       .build();
   }
 
}

DebeziumListener CDI 类会在应用程序启动时启动 Debezium。

Debezium 引擎并不是在单独的线程中运行,所以我们需要提供一个并行运行的线程,而不是阻塞应用程序的线程。在 Quarkus 中,我们可以使用 ManagedExecutor 提供执行程序线程来运行 Debezium。

然后,我们需要使用 DebeziumEngine 类来实例化 Debezium 引擎,并设置在上一步中创建的配置属性。最重要的一个步骤是注册一个在 Debezium 每次生成事件时触发的方法。notifying 方法会对这个自定义方法进行注册,在我们的示例中,这个方法叫作 handleChangeEvent。

这个方法用于接收事件,我们可以实现任何我们想要的逻辑——将事件发送到 Kafka 主题或者其他服务——任何你可以在 Java 中实现的东西。

代码语言:javascript
复制
import java.io.IOException;
 
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
 
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.reactive.messaging.kafka.Record;
 
import static io.debezium.data.Envelope.FieldName.*;
import static io.debezium.data.Envelope.Operation;

@ApplicationScoped
public class DebeziumListener {
  
   private static ObjectMapper objectMapper = new ObjectMapper();
 
   // Start the Debezium engine in a different thread
   ManagedExecutor executor;
 
   // Debezium configuration object
   Configuration configuration;
 
   private DebeziumEngine<RecordChangeEvent<SourceRecord>> engine;
 
   public DebeziumListener(ManagedExecutor executor, Configuration configuration) {
       this.executor = executor;
       this.configuration = configuration;
   }
 
   // Interface to send events to movies Kafka topic
   @Channel("movies")
   Emitter<Record<Long, JsonNode>> movieEmitter;
 
   void onStart(@Observes StartupEvent event) {
 
       // Configures Debezium engine
       this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
           .using(this.configuration.asProperties())
           // For each event triggered by Debezium, the handleChangeEvnt method is called
           .notifying(this::handleChangeEvent)
           .build();
 
       // Starts Debezium in different thread
       this.executor.execute(this.engine);
   }

   void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
 
       // For each triggered event, we get the information
       SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
       Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
 
       if (sourceRecordChangeValue != null) {
           Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
 
           // Only insert operations are processed
           if(operation == Operation.CREATE) {
 
               // Get insertation info
               Struct struct = (Struct) sourceRecordChangeValue.get(AFTER);
               String type = struct.getString("type");
               String payload = struct.getString("payload");
 
               if ("GameCreated".equals(type)) {
                   try {
                       final JsonNode payloadJson = objectMapper.readValue(payload, JsonNode.class);
                       long id = payloadJson.get("id").asLong();
 
                       // Populate content to Kafka topic
                       movieEmitter.send(Record.of(id, payloadJson));
                   } catch (JsonProcessingException e) {
                       throw new IllegalArgumentException(e);
                   }
               }
           }
       }
   }

   void onStop(@Observes ShutdownEvent event) throws IOException {
       if (this.engine != null) {
           this.engine.close();
       }
   }
 
}

运行

这个示例是自包含的,因此你不需要启动任何东西,因为 Quarkus 会为你启动它。

Panache 和 Kafka Connector 已经与 Quarkus DevServices集成,因此我们不需要启动 Kafka 集群或 MySQL 数据库,也不需要将它们配置为 Quarkus Dev 模式。电脑上需要有一个可运行的容器运行时,比如 Podman 或任何其他兼容 OCI 的工具。

为了便于跟踪,在运行应用程序之前,我们向应用程序中添加两个配置属性。在 application.properties 文件中添加下面的两行。

代码语言:javascript
复制
quarkus.hibernate-orm.log.sql=true
quarkus.debezium-outbox.remove-after-insert=false

第一行记录执行的 SQL 语句。这有助于在插入数据时对两张表(Movies 和 OutboxEvent)进行验证。

第二行避免 Debezium 在使用发件箱表后删除数据。

在终端窗口中启动服务:

代码语言:javascript
复制
./mvnw clean quarkus:dev

…
2022-07-07 11:36:22,942 INFO  [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Waiting for keepalive thread to start
2022-07-07 11:36:22,948 INFO  [io.deb.con.mys.MySqlStreamingChangeEventSource] (debezium-mysqlconnector-movies-mysql-db-server-change-event-source-coordinator) Keepalive thread is running
2022-07-07 11:37:43,889 INFO  [org.acm.MovieResource] (executor-thread-1) New Movie inserted string

几秒钟后,Kafka 集群、MySQL 实例和应用程序就启动起来了。

通过检查运行的容器来验证实例:

代码语言:javascript
复制
docker ps

CONTAINER ID   IMAGE                          COMMAND                  CREATED          STATUS          PORTS                                    
         
fa316bfae219   vectorized/redpanda:v21.11.3   "sh -c 'while [ ! -f…"   49 seconds ago   Up 45 seconds   8081-8082/tcp, 9644/tcp, 0.0.0.0:55002->9092/tcp

4c220f7ee066   mysql:8.0                      "docker-entrypoint.s…"   50 seconds ago   Up 46 seconds   33060/tcp, 0.0.0.0:60652->3306/tcp

e41cae02ff02   testcontainers/ryuk:0.3.3      "/app"                   53 seconds ago   Up 50 seconds   0.0.0.0:60650->8080/tcp

Kafka 集群运行在端口 55002 上,和 MySQL(ID 为 4c220f7ee066)运行在端口 60652 上。

注意:不同情况下端口和 ID 可能不同。

在另一个终端窗口中运行 curl 命令,插入一个新的 Movie 记录。

代码语言:javascript
复制
curl -X 'POST' \
  'http://localhost:8080/movie' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "Minions: The Rise of Gru",
  "director": "Kyle Balda",
  "genre": "Animation"
}'

检查 Quarkus 终端窗口,可以看到数据库运行的 SQL 语句。

代码语言:javascript
复制
:)
Hibernate:
    select
        next_val as id_val
    from
        hibernate_sequence for update



Hibernate:
    update
        hibernate_sequence
    set
        next_val= ?
    where
        next_val=?

// Insert into Movie 

Hibernate:
    insert
    into
        Movie
        (director, genre, name, id)
    values
        (?, ?, ?, ?)

// Automatically OutboxEvent table receives an insert

Hibernate:
    insert
    into
        OutboxEvent
        (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id)
    values
        (?, ?, ?, ?, ?, ?, ?)

为了验证 Debezium 可以检测到变更并将其推送到 Movies Kafka 主题,我们将运行kcat工具来查询 Kafka 主题。

代码语言:javascript
复制
kcat -b localhost:55002 -C -t movies

{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
% Reached end of topic movies [0] at offset 1

结论

我们实现了一种解决方案,通过使用 Debezium 读取事务日志并为每一个变更触发一个事件,解决了数据库和外部系统之间的双重写入问题。

在本例中,我们使用了 Debezium 引擎,并实现了在触发事件时执行的逻辑。

嵌入式方式可能在某些场景中有用,但在其他场景中(特别是在需要高可伸缩性和容错能力的项目中),Debezium 服务器可能更适合。如果使用 Debezium 服务器(作为一个 Kafka Connect 进程),你的代码就不需要做出修改(没有依赖项),因为 Debezium 是一个独立的进程,它会自己连接到数据库事务日志,检测变更,并将它们发送到 Kafka 主题。由于事件是有序的,所以任何系统都可以消费主题中的变更事件。

尽管在使用 Debezium 时,发件箱模式并不是必需的(到最后,Debezium 可以监听任何一张表中的变更),但隔离数据是一个很好的实践,发件箱模式可以帮助你做到这一点。

集成(微)服务架构最初看起来可能很容易,但当你开始集成数据时,事情就变复杂了,而 Debezium 项目可以帮助你完成这项任务。

源代码可以在GitHub上找到。

作者简介:

Alex Soto 是 Red Hat 的开发者体验总监。他对 Java 和软件自动化充满热情,并相信开源软件模型。Soto 是《Testing Java Microservices》(Manning)和《Quarkus Cookbook》(O'Reilly)的合著者,也是几个开源项目的贡献者。自 2017 年以来,他获得 Java Champion 的称号,也是 Salle URL University 的国际演讲者和教师。你可以在 Twitter 上关注他(Alex Soto),了解 Kubernetes 和 Java 世界正在发生的事情。

原文链接

https://www.infoq.com/articles/change-data-capture-debezium/

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/V4kqPgrQlqqPgWQuMmDP
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券