本文主要研究一下 Debezium 的 OffsetCommitPolicy。
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java
/**
 * Policy that decides whether offsets should be committed at a given moment.
 * <p>
 * Declared as a functional interface, so a policy can be supplied as a lambda.
 * Two ready-made policies are available through {@link #always()} and
 * {@link #periodic(Properties)}.
 */
@Incubating
@FunctionalInterface
public interface OffsetCommitPolicy {
/**
 * Decides whether a commit should be performed now.
 *
 * @param numberOfMessagesSinceLastCommit count of messages since the last commit, as tracked by the caller
 * @param timeSinceLastCommit time elapsed since the last commit, as measured by the caller
 * @return {@code true} if the caller should commit, {@code false} to skip this opportunity
 */
boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);
/**
 * Creates a new {@code AlwaysCommitOffsetPolicy}.
 *
 * @return a freshly created policy instance
 */
static OffsetCommitPolicy always() {
return new AlwaysCommitOffsetPolicy();
}
/**
 * Creates a new {@code PeriodicCommitOffsetPolicy} configured from the given properties.
 *
 * @param config properties handed unchanged to the {@code PeriodicCommitOffsetPolicy} constructor
 * @return a freshly created policy instance
 */
static OffsetCommitPolicy periodic(Properties config) {
return new PeriodicCommitOffsetPolicy(config);
}
}
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java
/**
 * Commit policy that approves every commit request, regardless of how many
 * messages or how much time has passed since the last commit.
 */
public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {
/**
 * Approves the commit; both arguments are ignored.
 *
 * @param numberOfMessagesSinceLastCommit not used
 * @param timeSinceLastCommit not used
 * @return {@code true} unconditionally
 */
@Override
public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
return true;
}
}
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java
/**
 * Commit policy that approves a commit only once a minimum amount of time has
 * elapsed since the last one.
 * <p>
 * The minimum is read, in milliseconds, from the property named by
 * {@code DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP}.
 */
public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {

    /** Minimum time that must have passed since the last commit before another is approved. */
    private final Duration minimumTime;

    /**
     * Builds the policy from the engine configuration.
     *
     * @param config properties containing the flush interval in milliseconds
     * @throws NumberFormatException if the flush-interval property is absent or is not a valid {@code long};
     *             the message names the offending property
     */
    public PeriodicCommitOffsetPolicy(Properties config) {
        minimumTime = Duration.ofMillis(readFlushIntervalMillis(config));
    }

    /**
     * Approves the commit when at least the configured minimum time has elapsed.
     *
     * @param numberOfMessagesSinceLastCommit not used by this policy
     * @param timeSinceLastCommit time elapsed since the last commit
     * @return {@code true} if {@code timeSinceLastCommit} is greater than or equal to the configured minimum
     */
    @Override
    public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
        return timeSinceLastCommit.compareTo(minimumTime) >= 0;
    }

    /** Reads and parses the flush-interval property, failing with a message that names the property. */
    private static long readFlushIntervalMillis(Properties config) {
        final String key = DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP;
        final String rawInterval = config.getProperty(key);
        if (rawInterval == null) {
            // Same exception type the unguarded parse would raise, so existing catch clauses keep working.
            throw new NumberFormatException("Missing required property '" + key + "' (offset flush interval in milliseconds)");
        }
        try {
            // parseLong yields the primitive directly; valueOf would box only to be unboxed again.
            return Long.parseLong(rawInterval);
        }
        catch (NumberFormatException e) {
            final NumberFormatException described = new NumberFormatException(
                    "Property '" + key + "' must be a whole number of milliseconds but was \"" + rawInterval + "\"");
            described.initCause(e);
            throw described;
        }
    }
}
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java
/**
 * Callback through which the code processing records reports its progress:
 * once per record, and once when a whole batch is done.
 *
 * @param <R> the type of record being marked as processed
 */
public static interface RecordCommitter<R> {
/**
 * Marks a single record as processed. Must be called for each record.
 *
 * @param record the record to commit
 * @throws InterruptedException declared by the contract; the exact conditions depend on the
 *             implementation (presumably interruption of the calling thread; confirm against implementations)
 */
void markProcessed(R record) throws InterruptedException;
/**
 * Marks a batch as finished; this may result in committing offsets or flushing data.
 * <p>
 * Should be called when a batch of records has finished being processed.
 */
void markBatchFinished();
}
debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java
@ThreadSafe
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
//......
/**
 * Creates the committer that record consumers use to report progress back to this engine.
 * <p>
 * Both callbacks are {@code synchronized} on the returned committer instance.
 *
 * @param offsetWriter writer that receives the source partition and offset of each processed record
 * @param task source task notified of every processed record
 * @param commitTimeout timeout passed through to {@code maybeFlush} when a batch finishes
 * @return a new committer bound to the given writer, task and timeout
 */
protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) {
    final RecordCommitter committer = new RecordCommitter() {

        @Override
        public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
            // The task is told first; if it throws, neither the counter nor the offset is updated.
            task.commitRecord(record);
            recordsSinceLastCommit++;
            offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
        }

        @Override
        public synchronized void markBatchFinished() {
            // End of batch: let the engine's commit policy decide whether to flush now.
            maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task);
        }
    };
    return committer;
}
/**
 * Commits offsets if, and only if, the given policy approves a commit right now.
 *
 * @param offsetWriter writer whose offsets would be flushed
 * @param policy policy asked whether to commit, given the record count and elapsed time since the last commit
 * @param commitTimeout timeout passed through to {@code commitOffsets}
 * @param task source task passed through to {@code commitOffsets}
 */
protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout,
                          SourceTask task) {
    final Duration sinceLastCommit = Duration.ofMillis(clock.currentTimeInMillis() - timeOfLastCommitMillis);
    if (!policy.performCommit(recordsSinceLastCommit, sinceLastCommit)) {
        // Policy declined; leave the counters untouched so they keep accumulating.
        return;
    }
    commitOffsets(offsetWriter, commitTimeout, task);
}
/**
 * Flushes the pending offsets and waits, up to {@code commitTimeout}, for the flush to complete.
 * <p>
 * On success the task's {@code commit()} is invoked, the record counter is reset and the time of
 * the last commit is updated. If the wait is interrupted, fails or times out, the problem is
 * logged, the flush is cancelled and the counters stay as they were.
 *
 * @param offsetWriter writer whose offsets are flushed
 * @param commitTimeout maximum time to wait, measured from entry into this method
 * @param task source task notified once the offsets have been committed
 */
protected void commitOffsets(OffsetStorageWriter offsetWriter, Duration commitTimeout, SourceTask task) {
    // The deadline is fixed before the flush starts, so time spent starting it counts against the timeout.
    final long deadlineMillis = clock.currentTimeInMillis() + commitTimeout.toMillis();

    if (!offsetWriter.beginFlush()) {
        return;
    }

    final Future<Void> pendingFlush = offsetWriter.doFlush(this::completedFlush);
    if (pendingFlush == null) {
        // no offsets to commit ...
        return;
    }

    try {
        // Never pass a negative wait, even if the deadline has already passed.
        final long remainingMillis = Math.max(deadlineMillis - clock.currentTimeInMillis(), 0);
        pendingFlush.get(remainingMillis, TimeUnit.MILLISECONDS);

        // Reaching this point means the offsets were committed, so notify the task and reset the counters.
        task.commit();
        recordsSinceLastCommit = 0;
        timeOfLastCommitMillis = clock.currentTimeInMillis();
    }
    catch (InterruptedException e) {
        // NOTE(review): the thread's interrupt status is not restored here; confirm callers do not rely on it.
        logger.warn("Flush of {} offsets interrupted, cancelling", this);
        offsetWriter.cancelFlush();
    }
    catch (ExecutionException e) {
        logger.error("Flush of {} offsets threw an unexpected exception: ", this, e);
        offsetWriter.cancelFlush();
    }
    catch (TimeoutException e) {
        logger.error("Timed out waiting to flush {} offsets to storage", this);
        offsetWriter.cancelFlush();
    }
}
//......
}
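OffsetCommitPolicy 定义了 performCommit 方法,并提供了 always 静态方法用于创建 AlwaysCommitOffsetPolicy(其 performCommit 始终返回 true),以及 periodic 静态方法用于创建 PeriodicCommitOffsetPolicy(当 timeSinceLastCommit 大于等于由 DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP 配置的 minimumTime 时返回 true)。EmbeddedEngine 的 RecordCommitter 在 markBatchFinished 时调用 maybeFlush,由 OffsetCommitPolicy 的 performCommit 决定是否执行 commitOffsets。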
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。