
Talking About Debezium's SnapshotChangeRecordEmitter

Original
code4it
Last modified: 2020-05-14 02:20:22
Included in the column: 码匠的流水账

This article takes a look at Debezium's SnapshotChangeRecordEmitter.

SnapshotChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/SnapshotChangeRecordEmitter.java

public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {

    private final Object[] row;

    public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) {
        super(offset, clock);

        this.row = row;
    }

    @Override
    protected Operation getOperation() {
        return Operation.READ;
    }

    @Override
    protected Object[] getOldColumnValues() {
        throw new UnsupportedOperationException("Can't get old row values for READ record");
    }

    @Override
    protected Object[] getNewColumnValues() {
        return row;
    }
}
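  • SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter. Its constructor takes the row to emit; getOperation returns Operation.READ; getOldColumnValues throws UnsupportedOperationException, since a row read during a snapshot has no previous state; and getNewColumnValues returns the row.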
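To make the READ-only contract concrete, here is a minimal, self-contained sketch that does not depend on Debezium. `Op`, `MiniRowEmitter`, `MiniSnapshotEmitter` and `beforeAndAfter` are hypothetical names for illustration only, and the envelope is reduced to a `{before, after}` pair. It suggests why throwing from getOldColumnValues is safe here: for a READ operation only the new column values are ever requested.

```java
import java.util.Arrays;

// Hypothetical stand-in for Debezium's Envelope.Operation.
enum Op { CREATE, READ, UPDATE, DELETE }

// Hypothetical, simplified base class: it only models which row states
// a record for each operation would be built from.
abstract class MiniRowEmitter {
    protected abstract Op getOperation();
    protected abstract Object[] getOldColumnValues();
    protected abstract Object[] getNewColumnValues();

    // Returns {before, after} as an envelope would carry them:
    // CREATE/READ have no "before", DELETE has no "after".
    Object[][] beforeAndAfter() {
        Op op = getOperation();
        switch (op) {
            case CREATE:
            case READ:
                return new Object[][] { null, getNewColumnValues() };
            case UPDATE:
                return new Object[][] { getOldColumnValues(), getNewColumnValues() };
            case DELETE:
                return new Object[][] { getOldColumnValues(), null };
            default:
                throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }
}

// Mirrors SnapshotChangeRecordEmitter: always READ, only new values available.
final class MiniSnapshotEmitter extends MiniRowEmitter {
    private final Object[] row;

    MiniSnapshotEmitter(Object[] row) {
        this.row = row;
    }

    @Override
    protected Op getOperation() {
        return Op.READ;
    }

    @Override
    protected Object[] getOldColumnValues() {
        throw new UnsupportedOperationException("Can't get old row values for READ record");
    }

    @Override
    protected Object[] getNewColumnValues() {
        return row;
    }
}

class SnapshotEmitterDemo {
    public static void main(String[] args) {
        MiniSnapshotEmitter emitter = new MiniSnapshotEmitter(new Object[] { 1, "alice" });
        Object[][] ba = emitter.beforeAndAfter();
        System.out.println(emitter.getOperation() + " before=" + Arrays.toString(ba[0]) + " after=" + Arrays.toString(ba[1]));
        // prints: READ before=null after=[1, alice]
    }
}
```

Calling getOldColumnValues directly on the snapshot emitter still throws, so the exception acts as a guard against a caller treating a snapshot row like an update or delete.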

RelationalChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java

public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        super(offsetContext, clock);
    }

    @Override
    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
        TableSchema tableSchema = (TableSchema) schema;
        Operation operation = getOperation();

        switch (operation) {
            case CREATE:
                emitCreateRecord(receiver, tableSchema);
                break;
            case READ:
                emitReadRecord(receiver, tableSchema);
                break;
            case UPDATE:
                emitUpdateRecord(receiver, tableSchema);
                break;
            case DELETE:
                emitDeleteRecord(receiver, tableSchema);
                break;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + operation);
        }
    }

    @Override
    protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        Object[] newColumnValues = getNewColumnValues();
        Object newKey = tableSchema.keyFromColumnData(newColumnValues);
        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
        Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
            logger.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }
        receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset());
    }

    @Override
    protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        Object[] newColumnValues = getNewColumnValues();
        Object newKey = tableSchema.keyFromColumnData(newColumnValues);
        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
        Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

        receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset());
    }

    @Override
    protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        Object[] oldColumnValues = getOldColumnValues();
        Object[] newColumnValues = getNewColumnValues();

        Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);
        Object newKey = tableSchema.keyFromColumnData(newColumnValues);

        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
        Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
            logger.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }
        // some configurations does not provide old values in case of updates
        // in this case we handle all updates as regular ones
        if (oldKey == null || Objects.equals(oldKey, newKey)) {
            Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
            receiver.changeRecord(tableSchema, Operation.UPDATE, newKey, envelope, getOffset());
        }
        // PK update -> emit as delete and re-insert with new key
        else {
            Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
            receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset());

            envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
            receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset());
        }
    }

    @Override
    protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        Object[] oldColumnValues = getOldColumnValues();
        Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);
        Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

        if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
            logger.warn("no old values found for table '{}' from delete message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }

        Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset());
    }

    /**
     * Returns the operation done by the represented change.
     */
    protected abstract Operation getOperation();

    /**
     * Returns the old row state in case of an UPDATE or DELETE.
     */
    protected abstract Object[] getOldColumnValues();

    /**
     * Returns the new row state in case of a CREATE or READ.
     */
    protected abstract Object[] getNewColumnValues();

    /**
     * Whether empty data messages should be ignored.
     *
     * @return true if empty data messages coming from data source should be ignored.</br>
     * Typical use case are PostgreSQL changes without FULL replica identity.
     */
    protected boolean skipEmptyMessages() {
        return false;
    }
}
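  • RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter with TableSchema as its type argument. Its emitChangeRecords method calls a different emit method depending on the operation, and each emit method builds the key and the envelope from the column values and then calls receiver.changeRecord. emitUpdateRecord emits a single UPDATE when the old key is null or equal to the new key; when the primary key has changed, it emits a DELETE under the old key followed by a CREATE under the new key. If skipEmptyMessages() returns true (the default is false), create, update and delete events without column values are logged and skipped.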
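The primary-key rule in emitUpdateRecord is easy to miss, so here is a sketch of just that branching. It assumes, purely for illustration, that column 0 of each row is the primary key; `Op`, `Emitted`, `UpdateSplitter`, `keyOf` and `emitUpdate` are hypothetical names rather than Debezium API, and plain arrays stand in for Kafka Connect `Struct` envelopes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for Debezium's Envelope.Operation (READ omitted: not needed here).
enum Op { CREATE, UPDATE, DELETE }

// One emitted record: operation, key and row state (Debezium would use Struct envelopes).
final class Emitted {
    final Op op;
    final Object key;
    final Object[] value;

    Emitted(Op op, Object key, Object[] value) {
        this.op = op;
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return op + "(key=" + key + ", value=" + Arrays.toString(value) + ")";
    }
}

final class UpdateSplitter {
    // Assumption for this sketch: column 0 holds the primary key; no row means no key.
    static Object keyOf(Object[] row) {
        return (row == null || row.length == 0) ? null : row[0];
    }

    // Mirrors the branching of RelationalChangeRecordEmitter.emitUpdateRecord.
    static List<Emitted> emitUpdate(Object[] oldRow, Object[] newRow, boolean skipEmptyMessages) {
        List<Emitted> out = new ArrayList<>();
        if (skipEmptyMessages && (newRow == null || newRow.length == 0)) {
            return out; // Debezium logs a warning and emits nothing
        }
        Object oldKey = keyOf(oldRow);
        Object newKey = keyOf(newRow);
        if (oldKey == null || Objects.equals(oldKey, newKey)) {
            // Old values missing or key unchanged: one regular UPDATE
            // (the real envelope also carries the old row as "before").
            out.add(new Emitted(Op.UPDATE, newKey, newRow));
        } else {
            // Primary key changed: DELETE under the old key, then CREATE under the new key.
            out.add(new Emitted(Op.DELETE, oldKey, oldRow));
            out.add(new Emitted(Op.CREATE, newKey, newRow));
        }
        return out;
    }
}

class UpdateSplitterDemo {
    public static void main(String[] args) {
        System.out.println(UpdateSplitter.emitUpdate(new Object[] { 1, "a" }, new Object[] { 1, "b" }, false));
        // prints: [UPDATE(key=1, value=[1, b])]
        System.out.println(UpdateSplitter.emitUpdate(new Object[] { 1, "a" }, new Object[] { 2, "a" }, false));
        // prints: [DELETE(key=1, value=[1, a]), CREATE(key=2, value=[2, a])]
    }
}
```

Splitting a key change into DELETE plus CREATE means anything keyed by the record key sees the old key removed before the new key appears, rather than one UPDATE whose key silently changed.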

AbstractChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/AbstractChangeRecordEmitter.java

public abstract class AbstractChangeRecordEmitter<T extends DataCollectionSchema> implements ChangeRecordEmitter {

    private final OffsetContext offsetContext;
    private final Clock clock;

    public AbstractChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        this.offsetContext = offsetContext;
        this.clock = clock;
    }

    @Override
    @SuppressWarnings({ "unchecked" })
    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
        Operation operation = getOperation();
        switch (operation) {
            case CREATE:
                emitCreateRecord(receiver, (T) schema);
                break;
            case READ:
                emitReadRecord(receiver, (T) schema);
                break;
            case UPDATE:
                emitUpdateRecord(receiver, (T) schema);
                break;
            case DELETE:
                emitDeleteRecord(receiver, (T) schema);
                break;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + operation);
        }
    }

    @Override
    public OffsetContext getOffset() {
        return offsetContext;
    }

    /**
     * Returns the clock of the change record(s) emitted.
     */
    public Clock getClock() {
        return clock;
    }

    /**
     * Returns the operation associated with the change.
     */
    protected abstract Operation getOperation();

    /**
     * Emits change record(s) associated with a snapshot.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitReadRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with an insert operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitCreateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with an update operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitUpdateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with a delete operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitDeleteRecord(Receiver receiver, T schema) throws InterruptedException;
}
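  • AbstractChangeRecordEmitter implements the ChangeRecordEmitter interface. It holds the OffsetContext and Clock (exposed via getOffset and getClock), its emitChangeRecords method wraps the dispatch for the different operations, and it declares the abstract methods emitCreateRecord, emitReadRecord, emitUpdateRecord and emitDeleteRecord for subclasses to implement.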
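AbstractChangeRecordEmitter follows the template method pattern: the base class owns the switch over the operation and subclasses only fill in the per-operation hooks. The sketch below reproduces that shape with hypothetical names (`MiniAbstractEmitter`, `LoggingEmitter`) and a `List<String>` sink in place of `Receiver`; it also illustrates what the unchecked `(T) schema` cast means in practice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Debezium's Envelope.Operation.
enum Op { CREATE, READ, UPDATE, DELETE }

// Simplified analogue of AbstractChangeRecordEmitter<T>: the base class owns the
// dispatch; a List<String> sink stands in for ChangeRecordEmitter.Receiver.
abstract class MiniAbstractEmitter<T> {

    @SuppressWarnings("unchecked")
    public void emitChangeRecords(Object schema, List<String> sink) {
        // Unchecked: T is erased, so nothing is verified here. A schema of the wrong
        // type fails with ClassCastException once a subclass hook receives it.
        T typed = (T) schema;
        Op op = getOperation();
        switch (op) {
            case CREATE: emitCreateRecord(sink, typed); break;
            case READ:   emitReadRecord(sink, typed);   break;
            case UPDATE: emitUpdateRecord(sink, typed); break;
            case DELETE: emitDeleteRecord(sink, typed); break;
            default: throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }

    protected abstract Op getOperation();
    protected abstract void emitCreateRecord(List<String> sink, T schema);
    protected abstract void emitReadRecord(List<String> sink, T schema);
    protected abstract void emitUpdateRecord(List<String> sink, T schema);
    protected abstract void emitDeleteRecord(List<String> sink, T schema);
}

// Toy subclass whose "schema" is just a table name.
final class LoggingEmitter extends MiniAbstractEmitter<String> {
    private final Op op;

    LoggingEmitter(Op op) {
        this.op = op;
    }

    @Override protected Op getOperation() { return op; }
    @Override protected void emitCreateRecord(List<String> sink, String table) { sink.add("create:" + table); }
    @Override protected void emitReadRecord(List<String> sink, String table) { sink.add("read:" + table); }
    @Override protected void emitUpdateRecord(List<String> sink, String table) { sink.add("update:" + table); }
    @Override protected void emitDeleteRecord(List<String> sink, String table) { sink.add("delete:" + table); }
}

class TemplateMethodDemo {
    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        new LoggingEmitter(Op.READ).emitChangeRecords("customers", sink);
        new LoggingEmitter(Op.DELETE).emitChangeRecords("orders", sink);
        System.out.println(sink); // [read:customers, delete:orders]
    }
}
```

Note that RelationalChangeRecordEmitter overrides emitChangeRecords with an explicit `(TableSchema) schema` cast, so in the relational case a wrong schema type fails immediately rather than inside a hook.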

ChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java

public interface ChangeRecordEmitter {

    /**
     * Emits the change record(s) corresponding to data change represented by this emitter.
     */
    void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException;

    /**
     * Returns the offset of the change record(s) emitted.
     */
    OffsetContext getOffset();

    public interface Receiver {
        void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException;
    }
}
  • The ChangeRecordEmitter interface declares the emitChangeRecords and getOffset methods, and also defines the nested Receiver interface, which declares the changeRecord method.
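Because Receiver has a single abstract method, a caller can supply it as a lambda. The sketch below mirrors the shape of ChangeRecordEmitter and its nested Receiver with simplified, hypothetical types (a String schema name, a long offset, plain objects in place of Struct and OffsetContext) plus a toy emitter for one snapshot row; it is not Debezium's own receiver implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Debezium's Envelope.Operation.
enum Op { CREATE, READ, UPDATE, DELETE }

// Hypothetical, simplified mirror of ChangeRecordEmitter and its nested Receiver.
interface MiniChangeRecordEmitter {
    void emitChangeRecords(String schema, Receiver receiver) throws InterruptedException;

    long getOffset();

    // Single abstract method, so a lambda can implement it.
    interface Receiver {
        void changeRecord(String schema, Op operation, Object key, Object value, long offset) throws InterruptedException;
    }
}

// Toy emitter for one snapshot row: it always hands the receiver a READ record.
final class RowReadEmitter implements MiniChangeRecordEmitter {
    private final long offset;
    private final Object key;
    private final Object value;

    RowReadEmitter(long offset, Object key, Object value) {
        this.offset = offset;
        this.key = key;
        this.value = value;
    }

    @Override
    public void emitChangeRecords(String schema, Receiver receiver) throws InterruptedException {
        receiver.changeRecord(schema, Op.READ, key, value, getOffset());
    }

    @Override
    public long getOffset() {
        return offset;
    }
}

class ReceiverDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> buffer = new ArrayList<>();
        // The receiver decides what happens to each record; here it only buffers a description.
        MiniChangeRecordEmitter.Receiver receiver =
                (schema, op, key, value, offset) -> buffer.add(schema + " " + op + " key=" + key + " @" + offset);
        new RowReadEmitter(7L, 1, "alice").emitChangeRecords("inventory.customers", receiver);
        System.out.println(buffer); // [inventory.customers READ key=1 @7]
    }
}
```

Keeping Receiver separate from the emitter lets the emitter focus on building keys and envelopes, while whoever supplies the receiver decides how records are queued or converted.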

Summary

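SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter. Its constructor takes the row; getOperation returns Operation.READ; getOldColumnValues throws UnsupportedOperationException; and getNewColumnValues returns the row. Because the operation is always READ, emitChangeRecords routes every snapshot row to emitReadRecord, which builds the key and a read envelope from the new column values and passes them to receiver.changeRecord with Operation.READ.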


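Original content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, contact cloudcommunity@tencent.com to request removal.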
