本文主要研究一下debezium的ChangeEventQueue
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java
public interface ChangeEventQueueMetrics {
int totalCapacity();
int remainingCapacity();
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java
public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);
private final Duration pollInterval;
private final int maxBatchSize;
private final int maxQueueSize;
private final BlockingQueue<T> queue;
private final Metronome metronome;
private final Supplier<PreviousContext> loggingContextSupplier;
private volatile RuntimeException producerException;
private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
this.pollInterval = pollInterval;
this.maxBatchSize = maxBatchSize;
this.maxQueueSize = maxQueueSize;
this.queue = new LinkedBlockingDeque<>(maxQueueSize);
this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
this.loggingContextSupplier = loggingContextSupplier;
}
public static class Builder<T> {
private Duration pollInterval;
private int maxQueueSize;
private int maxBatchSize;
private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;
public Builder<T> pollInterval(Duration pollInterval) {
this.pollInterval = pollInterval;
return this;
}
public Builder<T> maxQueueSize(int maxQueueSize) {
this.maxQueueSize = maxQueueSize;
return this;
}
public Builder<T> maxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}
public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
this.loggingContextSupplier = loggingContextSupplier;
return this;
}
public ChangeEventQueue<T> build() {
return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
}
}
/**
* Enqueues a record so that it can be obtained via {@link #poll()}. This method
* will block if the queue is full.
*
* @param record
* the record to be enqueued
* @throws InterruptedException
* if this thread has been interrupted
*/
public void enqueue(T record) throws InterruptedException {
if (record == null) {
return;
}
// The calling thread has been interrupted, let's abort
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Enqueuing source record '{}'", record);
}
// this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
queue.put(record);
}
/**
* Returns the next batch of elements from this queue. May be empty in case no
* elements have arrived in the maximum waiting time.
*
* @throws InterruptedException
* if this thread has been interrupted while waiting for more
* elements to arrive
*/
public List<T> poll() throws InterruptedException {
LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();
try {
LOGGER.debug("polling records...");
List<T> records = new ArrayList<>();
final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
throwProducerExceptionIfPresent();
LOGGER.debug("no records available yet, sleeping a bit...");
// no records yet, so wait a bit
metronome.pause();
LOGGER.debug("checking for more records...");
}
return records;
}
finally {
previousContext.restore();
}
}
public void producerException(final RuntimeException producerException) {
this.producerException = producerException;
}
private void throwProducerExceptionIfPresent() {
if (producerException != null) {
throw producerException;
}
}
@Override
public int totalCapacity() {
return maxQueueSize;
}
@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}
}
queue.drainTo(records, maxBatchSize) == 0
为false,最后执行previousContext.restore();其totalCapacity返回maxQueueSize;其remainingCapacity返回queue.remainingCapacity()debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.java
public class Threads {
//......
public static interface TimeSince {
/**
* Reset the elapsed time to 0.
*/
void reset();
/**
* Get the time that has elapsed since the last call to {@link #reset() reset}.
*
* @return the number of milliseconds
*/
long elapsedTime();
}
public static interface Timer {
/**
* @return true if current time is greater than start time plus requested time period
*/
boolean expired();
Duration remaining();
}
public static Timer timer(Clock clock, Duration time) {
final TimeSince start = timeSince(clock);
start.reset();
return new Timer() {
@Override
public boolean expired() {
return start.elapsedTime() > time.toMillis();
}
@Override
public Duration remaining() {
return time.minus(start.elapsedTime(), ChronoUnit.MILLIS);
}
};
}
public static TimeSince timeSince(Clock clock) {
return new TimeSince() {
private long lastTimeInMillis;
@Override
public void reset() {
lastTimeInMillis = clock.currentTimeInMillis();
}
@Override
public long elapsedTime() {
long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
return elapsed <= 0L ? 0L : elapsed;
}
};
}
//......
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.java
public class LoggingContext {
/**
* The key for the connector type MDC property.
*/
public static final String CONNECTOR_TYPE = "dbz.connectorType";
/**
* The key for the connector logical name MDC property.
*/
public static final String CONNECTOR_NAME = "dbz.connectorName";
/**
* The key for the connector context name MDC property.
*/
public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";
private LoggingContext() {
}
/**
* A snapshot of an MDC context that can be {@link #restore()}.
*/
public static final class PreviousContext {
private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap();
private final Map<String, String> context;
protected PreviousContext() {
Map<String, String> context = MDC.getCopyOfContextMap();
this.context = context != null ? context : EMPTY_CONTEXT;
}
/**
* Restore this logging context.
*/
public void restore() {
MDC.setContextMap(context);
}
}
//......
}
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.java
@FunctionalInterface
public interface Metronome {
public void pause() throws InterruptedException;
public static Metronome sleeper(Duration period, Clock timeSystem) {
long periodInMillis = period.toMillis();
return new Metronome() {
private long next = timeSystem.currentTimeInMillis() + periodInMillis;
@Override
public void pause() throws InterruptedException {
for (;;) {
final long now = timeSystem.currentTimeInMillis();
if (next <= now) {
break;
}
Thread.sleep(next - now);
}
next = next + periodInMillis;
}
@Override
public String toString() {
return "Metronome (sleep for " + periodInMillis + " ms)";
}
};
}
//......
}
ChangeEventQueue实现了ChangeEventQueueMetrics接口,其构造器创建了BlockingQueue、Metronome,并接收了loggingContextSupplier;其enqueue方法执行queue.put(record);其poll方法先通过loggingContextSupplier.get()获取previousContext,之后创建timeout,并while循环执行queue.drainTo(records, maxBatchSize)及metronome.pause(),直到timeout.expired()或者queue.drainTo(records, maxBatchSize) == 0
为false,最后执行previousContext.restore();其totalCapacity返回maxQueueSize;其remainingCapacity返回queue.remainingCapacity()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。