A Look at debezium's RecordMakers

Original article by code4it, last modified 2020-05-21 09:53:07. Part of the column 码匠的流水账.

This article takes a look at debezium's RecordMakers.

RecordMakers

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

public class RecordMakers {
​
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final MySqlSchema schema;
    private final SourceInfo source;
    private final TopicSelector<TableId> topicSelector;
    private final boolean emitTombstoneOnDelete;
    private final Map<Long, Converter> convertersByTableNumber = new HashMap<>();
    private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>();
    private final Map<Long, TableId> tableIdsByTableNumber = new HashMap<>();
    private final Schema schemaChangeKeySchema;
    private final Schema schemaChangeValueSchema;
    private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger);
    private final Map<String, ?> restartOffset;
​
    //......
​
    public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector<TableId> topicSelector,
                        boolean emitTombstoneOnDelete, Map<String, ?> restartOffset) {
        this.schema = schema;
        this.source = source;
        this.topicSelector = topicSelector;
        this.emitTombstoneOnDelete = emitTombstoneOnDelete;
        this.restartOffset = restartOffset;
        this.schemaChangeKeySchema = SchemaBuilder.struct()
                .name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.SchemaChangeKey"))
                .field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
                .build();
        this.schemaChangeValueSchema = SchemaBuilder.struct()
                .name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.SchemaChangeValue"))
                .field(Fields.SOURCE, source.schema())
                .field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA)
                .field(Fields.DDL_STATEMENTS, Schema.STRING_SCHEMA)
                .build();
    }
​
    public RecordsForTable forTable(TableId tableId, BitSet includedColumns, BlockingConsumer<SourceRecord> consumer) {
        Long tableNumber = tableNumbersByTableId.get(tableId);
        return tableNumber != null ? forTable(tableNumber, includedColumns, consumer) : null;
    }
​
    //......
​
}
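  • RecordMakers provides the forTable method, which creates a RecordsForTable: it looks up the table number registered for the given TableId and returns null if the table is not known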
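
The lookup in forTable can be tried out in isolation. What follows is a minimal sketch, not Debezium's code: the class name TableRegistrySketch, the assign helper, and the use of plain strings in place of TableId and Converter are all assumptions made so that the example runs without Debezium on the classpath. The overload taking a table number is elided in the excerpt above, so its body here is a guess as well.

```java
import java.util.HashMap;
import java.util.Map;

class TableRegistrySketch {
    // Hypothetical stand-ins: String replaces both TableId and Converter.
    private final Map<String, Long> tableNumbersByTableId = new HashMap<>();
    private final Map<Long, String> convertersByTableNumber = new HashMap<>();

    /** Registers a table under a table number (hypothetical helper). */
    void assign(long tableNumber, String tableId) {
        tableNumbersByTableId.put(tableId, tableNumber);
        convertersByTableNumber.put(tableNumber, "converter-for-" + tableId);
    }

    /** Resolves by table id; returns null when the table was never registered. */
    String forTable(String tableId) {
        Long tableNumber = tableNumbersByTableId.get(tableId);
        return tableNumber != null ? forTable(tableNumber) : null;
    }

    /** Resolves by table number (assumed body; elided in the excerpt). */
    String forTable(long tableNumber) {
        return convertersByTableNumber.get(tableNumber);
    }

    public static void main(String[] args) {
        TableRegistrySketch registry = new TableRegistrySketch();
        registry.assign(42L, "inventory.orders");
        System.out.println(registry.forTable("inventory.orders"));
        System.out.println(registry.forTable("inventory.unknown"));
    }
}
```

Callers of the real forTable therefore have to be prepared for a null result when a table has not been assigned a number.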

RecordsForTable

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

    public final class RecordsForTable {
        private final BitSet includedColumns;
        private final Converter converter;
        private final BlockingConsumer<SourceRecord> consumer;
​
        protected RecordsForTable(Converter converter, BitSet includedColumns, BlockingConsumer<SourceRecord> consumer) {
            this.converter = converter;
            this.includedColumns = includedColumns;
            this.consumer = consumer;
        }
​
        public int read(Object[] row, Instant ts) throws InterruptedException {
            return read(row, ts, 0, 1);
        }
​
        public int read(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {
            return converter.read(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);
        }
​
        public int create(Object[] row, Instant ts) throws InterruptedException {
            return create(row, ts, 0, 1);
        }
​
        public int create(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {
            return converter.insert(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);
        }
​
        public int update(Object[] before, Object[] after, Instant ts) throws InterruptedException {
            return update(before, after, ts, 0, 1);
        }
​
        public int update(Object[] before, Object[] after, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {
            return converter.update(source, before, after, rowNumber, numberOfRows, includedColumns, ts, consumer);
        }
​
        public int delete(Object[] row, Instant ts) throws InterruptedException {
            return delete(row, ts, 0, 1);
        }
​
        public int delete(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {
            return converter.delete(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);
        }
    }
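  • RecordsForTable provides read, create, update and delete methods, all of which delegate to the corresponding converter method (create delegates to converter.insert); the overloads without rowNumber and numberOfRows pass 0 and 1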
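
The delegation pattern can be sketched with a much-reduced interface. This is an illustration under stated assumptions, not the Debezium classes: DelegationSketch, the one-method Converter, and the List<String> used as an output sink are hypothetical, and only the create path is shown.

```java
import java.util.ArrayList;
import java.util.List;

class DelegationSketch {
    /** Hypothetical, much-reduced stand-in for the Converter interface. */
    interface Converter {
        int insert(Object[] row, int rowNumber, int numberOfRows, List<String> out);
    }

    /** Hypothetical stand-in for RecordsForTable: holds a converter and forwards calls. */
    static final class RecordsForTable {
        private final Converter converter;
        private final List<String> out;

        RecordsForTable(Converter converter, List<String> out) {
            this.converter = converter;
            this.out = out;
        }

        // Single-row convenience overload: row 0 of 1, as in the excerpt.
        int create(Object[] row) {
            return create(row, 0, 1);
        }

        // Note the naming: create(...) forwards to insert(...).
        int create(Object[] row, int rowNumber, int numberOfRows) {
            return converter.insert(row, rowNumber, numberOfRows, out);
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        Converter converter = (row, rowNumber, numberOfRows, sink) -> {
            sink.add("insert row " + rowNumber + " of " + numberOfRows + ": " + row[0]);
            return 1;
        };
        RecordsForTable records = new RecordsForTable(converter, out);
        int emitted = records.create(new Object[]{ "alice" });
        System.out.println(emitted + " -> " + out);
    }
}
```

The wrapper adds no logic of its own; it only binds a converter, a column set and a consumer together so that callers pass rows and a timestamp.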

Converter

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

    protected static interface Converter {
        int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
                 BlockingConsumer<SourceRecord> consumer)
                throws InterruptedException;
​
        int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
                   BlockingConsumer<SourceRecord> consumer)
                throws InterruptedException;
​
        int update(SourceInfo source, Object[] before, Object[] after, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
                   BlockingConsumer<SourceRecord> consumer)
                throws InterruptedException;
​
        int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
                   BlockingConsumer<SourceRecord> consumer)
                throws InterruptedException;
​
    }
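  • The Converter interface defines the read, insert, update and delete methods; each takes a BlockingConsumer<SourceRecord>, returns an int and may throw InterruptedException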
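
Every method above declares InterruptedException because handing a record to the consumer may block. The sketch below illustrates that shape with standard-library types only; BlockingSinkSketch, BlockingSink and emitAll are hypothetical names, and treating the returned int as the number of records emitted is an assumption based on the implementation shown later in this article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingSinkSketch {
    /** Hypothetical stand-in for BlockingConsumer: accept may block and be interrupted. */
    @FunctionalInterface
    interface BlockingSink<T> {
        void accept(T value) throws InterruptedException;
    }

    /** Hands each row to the sink and returns how many were handed over. */
    static int emitAll(String[] rows, BlockingSink<String> sink) throws InterruptedException {
        int count = 0;
        for (String row : rows) {
            sink.accept(row); // may block, for example while a bounded queue is full
            ++count;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
        // BlockingQueue.put has the same shape as BlockingSink.accept.
        int emitted = emitAll(new String[]{ "r1", "r2" }, queue::put);
        System.out.println(emitted + " records queued: " + queue);
    }
}
```

A java.util.function.Consumer would not fit here, since its accept method cannot throw a checked exception.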

RecordMakers.Converter

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

        Converter converter = new Converter() {
​
            @Override
            public int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
                            BlockingConsumer<SourceRecord> consumer)
                    throws InterruptedException {
                Object key = tableSchema.keyFromColumnData(row);
                Struct value = tableSchema.valueFromColumnData(row);
                if (value != null || key != null) {
                    Schema keySchema = tableSchema.keySchema();
                    Map<String, ?> partition = source.partition();
                    Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
                    source.tableEvent(id);
                    Struct origin = source.struct();
                    SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                            keySchema, key, envelope.schema(), envelope.read(value, origin, ts));
                    consumer.accept(record);
                    return 1;
                }
                return 0;
            }
​
            @Override
            public int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
                              BlockingConsumer<SourceRecord> consumer)
                    throws InterruptedException {
                validateColumnCount(tableSchema, row);
                Object key = tableSchema.keyFromColumnData(row);
                Struct value = tableSchema.valueFromColumnData(row);
                if (value != null || key != null) {
                    Schema keySchema = tableSchema.keySchema();
                    Map<String, ?> partition = source.partition();
                    Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
                    source.tableEvent(id);
                    Struct origin = source.struct();
                    SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                            keySchema, key, envelope.schema(), envelope.create(value, origin, ts));
                    consumer.accept(record);
                    return 1;
                }
                return 0;
            }
​
            @Override
            public int update(SourceInfo source, Object[] before, Object[] after, int rowNumber, int numberOfRows, BitSet includedColumns,
                              Instant ts,
                              BlockingConsumer<SourceRecord> consumer)
                    throws InterruptedException {
                int count = 0;
                validateColumnCount(tableSchema, after);
                Object key = tableSchema.keyFromColumnData(after);
                Struct valueAfter = tableSchema.valueFromColumnData(after);
                if (valueAfter != null || key != null) {
                    Object oldKey = tableSchema.keyFromColumnData(before);
                    Struct valueBefore = tableSchema.valueFromColumnData(before);
                    Schema keySchema = tableSchema.keySchema();
                    Map<String, ?> partition = source.partition();
                    Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
                    source.tableEvent(id);
                    Struct origin = source.struct();
                    if (key != null && !Objects.equals(key, oldKey)) {
                        // The key has changed, so we need to deal with both the new key and old key.
                        // Consumers may push the events into a system that won't allow both records to exist at the same time,
                        // so we first want to send the delete event for the old key...
                        SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                                keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts));
                        consumer.accept(record);
                        ++count;
​
                        if (emitTombstoneOnDelete) {
                            // Next send a tombstone event for the old key ...
                            record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum, keySchema, oldKey, null, null);
                            consumer.accept(record);
                            ++count;
                        }
​
                        // And finally send the create event ...
                        record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                                keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));
                        consumer.accept(record);
                        ++count;
                    }
                    else {
                        // The key has not changed, so a simple update is fine ...
                        SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                                keySchema, key, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts));
                        consumer.accept(record);
                        ++count;
                    }
                }
                return count;
            }
​
            @Override
            public int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
                              BlockingConsumer<SourceRecord> consumer)
                    throws InterruptedException {
                int count = 0;
                validateColumnCount(tableSchema, row);
                Object key = tableSchema.keyFromColumnData(row);
                Struct value = tableSchema.valueFromColumnData(row);
                if (value != null || key != null) {
                    Schema keySchema = tableSchema.keySchema();
                    Map<String, ?> partition = source.partition();
                    Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
                    source.tableEvent(id);
                    Struct origin = source.struct();
                    // Send a delete message ...
                    SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                            keySchema, key, envelope.schema(), envelope.delete(value, origin, ts));
                    consumer.accept(record);
                    ++count;
​
                    // And send a tombstone ...
                    if (emitTombstoneOnDelete) {
                        record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                                keySchema, key, null, null);
                        consumer.accept(record);
                        ++count;
                    }
                }
                return count;
            }
​
            @Override
            public String toString() {
                return "RecordMaker.Converter(" + id + ")";
            }
​
            private void validateColumnCount(TableSchema tableSchema, Object[] row) {
                final int expectedColumnsCount = schema.tableFor(tableSchema.id()).columns().size();
                if (expectedColumnsCount != row.length) {
                    logger.error("Invalid number of columns, expected '{}' arrived '{}'", expectedColumnsCount, row.length);
                    throw new ConnectException(
                            "The binlog event does not contain expected number of columns; the internal schema representation is probably out of sync with the real database schema, or the binlog contains events recorded with binlog_row_image other than FULL or the table in question is an NDB table");
                }
            }
        };
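  • RecordMakers creates an anonymous Converter implementation whose methods build Kafka Connect SourceRecord instances and pass each one to consumer.accept(record). A record is emitted only when the key or the value is non-null. update emits a single update event when the key is unchanged; when the key has changed it emits a delete for the old key, an optional tombstone (if emitTombstoneOnDelete is set) and a create for the new key. delete emits a delete event, followed by a tombstone when emitTombstoneOnDelete is set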
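
The branching in update is the least obvious part, so here is a minimal sketch of it. It is not the Debezium implementation: UpdateEventsSketch is a hypothetical name, strings stand in for SourceRecord, and the outer check that the new value or key is non-null is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class UpdateEventsSketch {
    /** Returns the events an update would produce, described as plain strings. */
    static List<String> update(Object oldKey, Object newKey, boolean emitTombstoneOnDelete) {
        List<String> events = new ArrayList<>();
        if (newKey != null && !Objects.equals(newKey, oldKey)) {
            // Key changed: remove the old key first, then create the new one.
            events.add("delete " + oldKey);
            if (emitTombstoneOnDelete) {
                events.add("tombstone " + oldKey);
            }
            events.add("create " + newKey);
        }
        else {
            // Key unchanged: a single update event is enough.
            events.add("update " + newKey);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(update(1, 2, true));  // [delete 1, tombstone 1, create 2]
        System.out.println(update(1, 2, false)); // [delete 1, create 2]
        System.out.println(update(1, 1, true));  // [update 1]
    }
}
```

The size of the returned list corresponds to the count that the real update method returns, which is why a single row change can yield one, two or three records.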

Summary

RecordMakers provides the forTable method, which creates a RecordsForTable. RecordsForTable provides read, create, update and delete methods, all of which delegate to the corresponding converter method.

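Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.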
