前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊debezium的SnapshotChangeRecordEmitter

聊聊debezium的SnapshotChangeRecordEmitter

作者头像
code4it
发布2020-05-16 14:29:34
3850
发布2020-05-16 14:29:34
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下debezium的SnapshotChangeRecordEmitter

SnapshotChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/SnapshotChangeRecordEmitter.java

代码语言:javascript
复制
public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {

    private final Object[] row;

    public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) {
        super(offset, clock);

        this.row = row;
    }

    @Override
    protected Operation getOperation() {
        return Operation.READ;
    }

    @Override
    protected Object[] getOldColumnValues() {
        throw new UnsupportedOperationException("Can't get old row values for READ record");
    }

    @Override
    protected Object[] getNewColumnValues() {
        return row;
    }
}
  • SnapshotChangeRecordEmitter继承了RelationalChangeRecordEmitter,其构造器接收row,其getOperation方法返回Operation.READ,其getOldColumnValues方法抛出UnsupportedOperationException,其getNewColumnValues返回row

RelationalChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java

代码语言:javascript
复制
public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        super(offsetContext, clock);
    }

    @Override
    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
        TableSchema tableSchema = (TableSchema) schema;
        Operation operation = getOperation();

        switch (operation) {
            case CREATE:
                emitCreateRecord(receiver, tableSchema);
                break;
            case READ:
                emitReadRecord(receiver, tableSchema);
                break;
            case UPDATE:
                emitUpdateRecord(receiver, tableSchema);
                break;
            case DELETE:
                emitDeleteRecord(receiver, tableSchema);
                break;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + operation);
        }
    }

    @Override
    protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        Object[] newColumnValues = getNewColumnValues();
        Object newKey = tableSchema.keyFromColumnData(newColumnValues);
        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
        Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
            logger.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }
        receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset());
    }

    @Override
    protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        Object[] newColumnValues = getNewColumnValues();
        Object newKey = tableSchema.keyFromColumnData(newColumnValues);
        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
        Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

        receiver.changeRecord(tableSchema, Operation.READ, newKey, envelope, getOffset());
    }

    @Override
    protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        Object[] oldColumnValues = getOldColumnValues();
        Object[] newColumnValues = getNewColumnValues();

        Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);
        Object newKey = tableSchema.keyFromColumnData(newColumnValues);

        Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
        Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

        if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
            logger.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }
        // some configurations does not provide old values in case of updates
        // in this case we handle all updates as regular ones
        if (oldKey == null || Objects.equals(oldKey, newKey)) {
            Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
            receiver.changeRecord(tableSchema, Operation.UPDATE, newKey, envelope, getOffset());
        }
        // PK update -> emit as delete and re-insert with new key
        else {
            Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
            receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset());

            envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
            receiver.changeRecord(tableSchema, Operation.CREATE, newKey, envelope, getOffset());
        }
    }

    @Override
    protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        Object[] oldColumnValues = getOldColumnValues();
        Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);
        Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

        if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
            logger.warn("no old values found for table '{}' from delete message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }

        Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset());
    }

    /**
     * Returns the operation done by the represented change.
     */
    protected abstract Operation getOperation();

    /**
     * Returns the old row state in case of an UPDATE or DELETE.
     */
    protected abstract Object[] getOldColumnValues();

    /**
     * Returns the new row state in case of a CREATE or READ.
     */
    protected abstract Object[] getNewColumnValues();

    /**
     * Whether empty data messages should be ignored.
     *
     * @return true if empty data messages coming from data source should be ignored.</br>
     * Typical use case are PostgreSQL changes without FULL replica identity.
     */
    protected boolean skipEmptyMessages() {
        return false;
    }
}
  • RelationalChangeRecordEmitter继承了AbstractChangeRecordEmitter,其泛型为TableSchema;其emitChangeRecords方法根据不同的operation执行不同的emit方法;这些emit方法主要是构造key及envelope,然后执行receiver.changeRecord

AbstractChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/AbstractChangeRecordEmitter.java

代码语言:javascript
复制
public abstract class AbstractChangeRecordEmitter<T extends DataCollectionSchema> implements ChangeRecordEmitter {

    private final OffsetContext offsetContext;
    private final Clock clock;

    public AbstractChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        this.offsetContext = offsetContext;
        this.clock = clock;
    }

    @Override
    @SuppressWarnings({ "unchecked" })
    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
        Operation operation = getOperation();
        switch (operation) {
            case CREATE:
                emitCreateRecord(receiver, (T) schema);
                break;
            case READ:
                emitReadRecord(receiver, (T) schema);
                break;
            case UPDATE:
                emitUpdateRecord(receiver, (T) schema);
                break;
            case DELETE:
                emitDeleteRecord(receiver, (T) schema);
                break;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + operation);
        }
    }

    @Override
    public OffsetContext getOffset() {
        return offsetContext;
    }

    /**
     * Returns the clock of the change record(s) emitted.
     */
    public Clock getClock() {
        return clock;
    }

    /**
     * Returns the operation associated with the change.
     */
    protected abstract Operation getOperation();

    /**
     * Emits change record(s) associated with a snapshot.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitReadRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with an insert operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitCreateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with an update operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitUpdateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with a delete operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitDeleteRecord(Receiver receiver, T schema) throws InterruptedException;
}
  • AbstractChangeRecordEmitter实现了ChangeRecordEmitter接口,其提供了emitChangeRecords方法,封装了针对不同operation的调用,同事定义了emitCreateRecord、emitReadRecord、emitUpdateRecord、emitDeleteRecord方法供子类实现

ChangeRecordEmitter

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java

代码语言:javascript
复制
public interface ChangeRecordEmitter {

    /**
     * Emits the change record(s) corresponding to data change represented by this emitter.
     */
    void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException;

    /**
     * Returns the offset of the change record(s) emitted.
     */
    OffsetContext getOffset();

    public interface Receiver {
        void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException;
    }
}
  • ChangeRecordEmitter接口定义了emitChangeRecords、getOffset方法,同时还定义了Receiver接口,该接口定义了changeRecord方法

小结

SnapshotChangeRecordEmitter继承了RelationalChangeRecordEmitter,其构造器接收row,其getOperation方法返回Operation.READ,其getOldColumnValues方法抛出UnsupportedOperationException,其getNewColumnValues返回row

doc

  • SnapshotChangeRecordEmitter
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SnapshotChangeRecordEmitter
  • RelationalChangeRecordEmitter
  • AbstractChangeRecordEmitter
  • ChangeRecordEmitter
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档