本文主要研究一下 Debezium 的 SnapshotChangeRecordEmitter。
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/SnapshotChangeRecordEmitter.java
/**
 * Change record emitter for a single row that is reported as a {@link Operation#READ}.
 *
 * <p>The row handed to the constructor is exposed as the "new" column values; there is no
 * "old" row state, so asking for it fails with {@link UnsupportedOperationException}.
 */
public class SnapshotChangeRecordEmitter extends RelationalChangeRecordEmitter {

    /** Column values of the emitted row; the array is stored by reference, not copied. */
    private final Object[] snapshotRow;

    /**
     * @param offset the offset context passed on to the parent emitter
     * @param row the column values of the row to emit
     * @param clock the clock passed on to the parent emitter
     */
    public SnapshotChangeRecordEmitter(OffsetContext offset, Object[] row, Clock clock) {
        super(offset, clock);
        snapshotRow = row;
    }

    /** Always {@link Operation#READ}. */
    @Override
    protected Operation getOperation() {
        return Operation.READ;
    }

    /**
     * Not supported: a READ record carries no previous row state.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected Object[] getOldColumnValues() {
        throw new UnsupportedOperationException("Can't get old row values for READ record");
    }

    /** Returns the very array that was passed to the constructor. */
    @Override
    protected Object[] getNewColumnValues() {
        return snapshotRow;
    }
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java
/**
 * Base emitter for row-level changes of a table described by a {@link TableSchema}.
 *
 * <p>Subclasses supply the operation and the old/new column values; this class derives keys,
 * values and envelopes from the table schema and hands the resulting records to the receiver.
 */
public abstract class RelationalChangeRecordEmitter extends AbstractChangeRecordEmitter<TableSchema> {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    public RelationalChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        super(offsetContext, clock);
    }

    /**
     * Dispatches to the emit method matching {@link #getOperation()}.
     *
     * @param schema the schema of the changed table; cast to {@link TableSchema}
     * @param receiver the handler the emitted record(s) are passed to
     * @throws IllegalArgumentException if the operation is none of CREATE, READ, UPDATE, DELETE
     */
    @Override
    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
        final TableSchema table = (TableSchema) schema;
        final Operation op = getOperation();

        switch (op) {
            case READ:
                emitReadRecord(receiver, table);
                break;
            case CREATE:
                emitCreateRecord(receiver, table);
                break;
            case UPDATE:
                emitUpdateRecord(receiver, table);
                break;
            case DELETE:
                emitDeleteRecord(receiver, table);
                break;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }

    /**
     * Emits one CREATE record built from the new column values, unless empty messages are
     * skipped and there are no new values.
     */
    @Override
    protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        final Object[] after = getNewColumnValues();
        final Object afterKey = tableSchema.keyFromColumnData(after);
        final Struct afterValue = tableSchema.valueFromColumnData(after);
        // The envelope is built before the emptiness check, so it is created even for skipped rows.
        final Struct createEnvelope = tableSchema.getEnvelopeSchema()
                .create(afterValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

        if (skipEmptyMessages() && hasNoValues(after)) {
            logger.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }

        receiver.changeRecord(tableSchema, Operation.CREATE, afterKey, createEnvelope, getOffset());
    }

    /**
     * Emits one READ record built from the new column values. Unlike the other emit methods,
     * this one performs no empty-message check.
     */
    @Override
    protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        final Object[] after = getNewColumnValues();
        final Object afterKey = tableSchema.keyFromColumnData(after);
        final Struct afterValue = tableSchema.valueFromColumnData(after);
        final Struct readEnvelope = tableSchema.getEnvelopeSchema()
                .read(afterValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

        receiver.changeRecord(tableSchema, Operation.READ, afterKey, readEnvelope, getOffset());
    }

    /**
     * Emits the record(s) for an update.
     *
     * <p>If the old key is absent or equal to the new key, a single UPDATE record is emitted.
     * Otherwise the key changed, and the update is emitted as a DELETE for the old key followed
     * by a CREATE for the new key.
     */
    @Override
    protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
            throws InterruptedException {
        final Object[] before = getOldColumnValues();
        final Object[] after = getNewColumnValues();

        final Object beforeKey = tableSchema.keyFromColumnData(before);
        final Object afterKey = tableSchema.keyFromColumnData(after);

        final Struct afterValue = tableSchema.valueFromColumnData(after);
        final Struct beforeValue = tableSchema.valueFromColumnData(before);

        if (skipEmptyMessages() && hasNoValues(after)) {
            logger.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }

        // Some configurations do not provide old values for updates; with no old key to compare
        // against, the change is handled as a regular update.
        if (beforeKey == null || Objects.equals(beforeKey, afterKey)) {
            final Struct updateEnvelope = tableSchema.getEnvelopeSchema()
                    .update(beforeValue, afterValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
            receiver.changeRecord(tableSchema, Operation.UPDATE, afterKey, updateEnvelope, getOffset());
            return;
        }

        // Primary key update: emit a delete for the old key, then re-insert under the new key.
        final Struct deleteEnvelope = tableSchema.getEnvelopeSchema()
                .delete(beforeValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(tableSchema, Operation.DELETE, beforeKey, deleteEnvelope, getOffset());

        final Struct createEnvelope = tableSchema.getEnvelopeSchema()
                .create(afterValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(tableSchema, Operation.CREATE, afterKey, createEnvelope, getOffset());
    }

    /**
     * Emits one DELETE record built from the old column values, unless empty messages are
     * skipped and there are no old values.
     */
    @Override
    protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException {
        final Object[] before = getOldColumnValues();
        final Object beforeKey = tableSchema.keyFromColumnData(before);
        final Struct beforeValue = tableSchema.valueFromColumnData(before);

        if (skipEmptyMessages() && hasNoValues(before)) {
            logger.warn("no old values found for table '{}' from delete message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
            return;
        }

        final Struct deleteEnvelope = tableSchema.getEnvelopeSchema()
                .delete(beforeValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(tableSchema, Operation.DELETE, beforeKey, deleteEnvelope, getOffset());
    }

    /** Tells whether the given column value array is {@code null} or has no elements. */
    private static boolean hasNoValues(Object[] columnValues) {
        return columnValues == null || columnValues.length == 0;
    }

    /**
     * Returns the operation done by the represented change.
     */
    protected abstract Operation getOperation();

    /**
     * Returns the old row state; requested when emitting UPDATE and DELETE records.
     */
    protected abstract Object[] getOldColumnValues();

    /**
     * Returns the new row state; requested when emitting CREATE, READ and UPDATE records.
     */
    protected abstract Object[] getNewColumnValues();

    /**
     * Whether empty data messages should be ignored. This base implementation never skips.
     *
     * @return true if empty data messages coming from data source should be ignored.<br>
     *         Typical use case are PostgreSQL changes without FULL replica identity.
     */
    protected boolean skipEmptyMessages() {
        return false;
    }
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/AbstractChangeRecordEmitter.java
/**
 * Skeleton {@link ChangeRecordEmitter} that holds the offset context and clock and routes each
 * change to an operation-specific emit method implemented by subclasses.
 *
 * @param <T> the schema type handed to the operation-specific emit methods
 */
public abstract class AbstractChangeRecordEmitter<T extends DataCollectionSchema> implements ChangeRecordEmitter {

    private final OffsetContext offsetContext;
    private final Clock clock;

    /**
     * @param offsetContext the offset returned by {@link #getOffset()}
     * @param clock the clock returned by {@link #getClock()}
     */
    public AbstractChangeRecordEmitter(OffsetContext offsetContext, Clock clock) {
        this.offsetContext = offsetContext;
        this.clock = clock;
    }

    /**
     * Dispatches to the emit method matching {@link #getOperation()}.
     *
     * @param schema the schema of the changed data collection; treated as a {@code T}
     * @param receiver the handler the emitted record(s) are passed to
     * @throws IllegalArgumentException if the operation is none of CREATE, READ, UPDATE, DELETE
     */
    @Override
    public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException {
        final Operation op = getOperation();

        // Single unchecked narrowing to T, shared by every branch below.
        @SuppressWarnings("unchecked")
        final T typedSchema = (T) schema;

        switch (op) {
            case READ:
                emitReadRecord(receiver, typedSchema);
                break;
            case CREATE:
                emitCreateRecord(receiver, typedSchema);
                break;
            case UPDATE:
                emitUpdateRecord(receiver, typedSchema);
                break;
            case DELETE:
                emitDeleteRecord(receiver, typedSchema);
                break;
            default:
                throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }

    /**
     * Returns the offset context supplied at construction time.
     */
    @Override
    public OffsetContext getOffset() {
        return offsetContext;
    }

    /**
     * Returns the clock of the change record(s) emitted.
     */
    public Clock getClock() {
        return clock;
    }

    /**
     * Returns the operation associated with the change.
     */
    protected abstract Operation getOperation();

    /**
     * Emits change record(s) associated with a snapshot.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitReadRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with an insert operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitCreateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with an update operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitUpdateRecord(Receiver receiver, T schema) throws InterruptedException;

    /**
     * Emits change record(s) associated with a delete operation.
     *
     * @param receiver the handler for which the emitted record should be dispatched
     * @param schema the schema
     */
    protected abstract void emitDeleteRecord(Receiver receiver, T schema) throws InterruptedException;
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/pipeline/spi/ChangeRecordEmitter.java
/**
 * Emits the change record(s) for one data change and exposes the offset they belong to.
 */
public interface ChangeRecordEmitter {

    /**
     * Emits the change record(s) corresponding to data change represented by this emitter.
     *
     * @param schema the schema of the changed data collection
     * @param receiver the callback each emitted record is handed to
     */
    void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) throws InterruptedException;

    /**
     * Returns the offset of the change record(s) emitted.
     */
    OffsetContext getOffset();

    /**
     * Callback that receives each emitted change record.
     */
    interface Receiver {

        /**
         * Accepts one change record.
         *
         * @param schema the schema of the changed data collection
         * @param operation the operation the record represents
         * @param key the record key
         * @param value the record value
         * @param offset the offset associated with the record
         */
        void changeRecord(DataCollectionSchema schema, Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException;
    }
}
SnapshotChangeRecordEmitter 继承了 RelationalChangeRecordEmitter，其构造器接收 row，其 getOperation 方法返回 Operation.READ，其 getOldColumnValues 方法抛出 UnsupportedOperationException，其 getNewColumnValues 方法返回 row。