前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊debezium的ChangeEventQueue

聊聊debezium的ChangeEventQueue

原创
作者头像
code4it
修改2020-05-13 14:41:08
8090
修改2020-05-13 14:41:08
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下debezium的ChangeEventQueue

ChangeEventQueueMetrics

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java

代码语言:javascript
复制
public interface ChangeEventQueueMetrics {
​
    int totalCapacity();
​
    int remainingCapacity();
}
  • ChangeEventQueueMetrics接口定义了totalCapacity、remainingCapacity方法

ChangeEventQueue

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java

代码语言:javascript
复制
public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);
​
    private final Duration pollInterval;
    private final int maxBatchSize;
    private final int maxQueueSize;
    private final BlockingQueue<T> queue;
    private final Metronome metronome;
    private final Supplier<PreviousContext> loggingContextSupplier;
​
    private volatile RuntimeException producerException;
​
    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
        this.loggingContextSupplier = loggingContextSupplier;
    }
​
    public static class Builder<T> {
​
        private Duration pollInterval;
        private int maxQueueSize;
        private int maxBatchSize;
        private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;
​
        public Builder<T> pollInterval(Duration pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }
​
        public Builder<T> maxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }
​
        public Builder<T> maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }
​
        public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
            this.loggingContextSupplier = loggingContextSupplier;
            return this;
        }
​
        public ChangeEventQueue<T> build() {
            return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
        }
    }
​
    /**
     * Enqueues a record so that it can be obtained via {@link #poll()}. This method
     * will block if the queue is full.
     *
     * @param record
     *            the record to be enqueued
     * @throws InterruptedException
     *             if this thread has been interrupted
     */
    public void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }
​
        // The calling thread has been interrupted, let's abort
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
​
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }
​
        // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
        queue.put(record);
    }
​
    /**
     * Returns the next batch of elements from this queue. May be empty in case no
     * elements have arrived in the maximum waiting time.
     *
     * @throws InterruptedException
     *             if this thread has been interrupted while waiting for more
     *             elements to arrive
     */
    public List<T> poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();
​
        try {
            LOGGER.debug("polling records...");
            List<T> records = new ArrayList<>();
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
                throwProducerExceptionIfPresent();
​
                LOGGER.debug("no records available yet, sleeping a bit...");
                // no records yet, so wait a bit
                metronome.pause();
                LOGGER.debug("checking for more records...");
            }
            return records;
        }
        finally {
            previousContext.restore();
        }
    }
​
    public void producerException(final RuntimeException producerException) {
        this.producerException = producerException;
    }
​
    private void throwProducerExceptionIfPresent() {
        if (producerException != null) {
            throw producerException;
        }
    }
​
    @Override
    public int totalCapacity() {
        return maxQueueSize;
    }
​
    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
  • ChangeEventQueue实现了ChangeEventQueueMetrics接口,其构造器创建了BlockingQueue、Metronome,并接收了loggingContextSupplier;其enqueue方法执行queue.put(record);其poll方法先通过loggingContextSupplier.get()获取previousContext,之后创建timeout,并while循环执行queue.drainTo(records, maxBatchSize)及metronome.pause(),直到timeout.expired()或者queue.drainTo(records, maxBatchSize) == 0为false,最后执行previousContext.restore();其totalCapacity返回maxQueueSize;其remainingCapacity返回queue.remainingCapacity()

Threads

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.java

代码语言:javascript
复制
public class Threads {
​
    //......
​
    public static interface TimeSince {
        /**
         * Reset the elapsed time to 0.
         */
        void reset();
​
        /**
         * Get the time that has elapsed since the last call to {@link #reset() reset}.
         *
         * @return the number of milliseconds
         */
        long elapsedTime();
    }
​
    public static interface Timer {
​
        /**
         * @return true if current time is greater than start time plus requested time period
         */
        boolean expired();
​
        Duration remaining();
    }
​
    public static Timer timer(Clock clock, Duration time) {
        final TimeSince start = timeSince(clock);
        start.reset();
​
        return new Timer() {
​
            @Override
            public boolean expired() {
                return start.elapsedTime() > time.toMillis();
            }
​
            @Override
            public Duration remaining() {
                return time.minus(start.elapsedTime(), ChronoUnit.MILLIS);
            }
        };
    }
​
    public static TimeSince timeSince(Clock clock) {
        return new TimeSince() {
            private long lastTimeInMillis;
​
            @Override
            public void reset() {
                lastTimeInMillis = clock.currentTimeInMillis();
            }
​
            @Override
            public long elapsedTime() {
                long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
                return elapsed <= 0L ? 0L : elapsed;
            }
        };
    }
​
    //......
​
}
  • Threads定义了Timer接口,该接口定义了expired、remaining方法;timer方法先通过timeSince创建TimeSince,然后创建一个匿名Timer

LoggingContext

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.java

代码语言:javascript
复制
public class LoggingContext {
​
    /**
     * The key for the connector type MDC property.
     */
    public static final String CONNECTOR_TYPE = "dbz.connectorType";
    /**
     * The key for the connector logical name MDC property.
     */
    public static final String CONNECTOR_NAME = "dbz.connectorName";
    /**
     * The key for the connector context name MDC property.
     */
    public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";
​
    private LoggingContext() {
    }
​
    /**
     * A snapshot of an MDC context that can be {@link #restore()}.
     */
    public static final class PreviousContext {
        private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap();
        private final Map<String, String> context;
​
        protected PreviousContext() {
            Map<String, String> context = MDC.getCopyOfContextMap();
            this.context = context != null ? context : EMPTY_CONTEXT;
        }
​
        /**
         * Restore this logging context.
         */
        public void restore() {
            MDC.setContextMap(context);
        }
    }
​
    //......
​
}
  • LoggingContext定义了PreviousContext,其构造器使用MDC.getCopyOfContextMap()拷贝的当前的MDC,其restore方法把之前拷贝的MDC数据再次设置到MDC中

Metronome

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.java

代码语言:javascript
复制
@FunctionalInterface
public interface Metronome {
​
    public void pause() throws InterruptedException;
​
    public static Metronome sleeper(Duration period, Clock timeSystem) {
        long periodInMillis = period.toMillis();
        return new Metronome() {
            private long next = timeSystem.currentTimeInMillis() + periodInMillis;
​
            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    final long now = timeSystem.currentTimeInMillis();
                    if (next <= now) {
                        break;
                    }
                    Thread.sleep(next - now);
                }
                next = next + periodInMillis;
            }
​
            @Override
            public String toString() {
                return "Metronome (sleep for " + periodInMillis + " ms)";
            }
        };
    }
​
    //......
​
}
  • Metronome接口定义了pause方法;它提供了sleeper静态方法用于创建匿名的Metronome实现类,该实现类的pause方法通过Thread.sleep来实现pause

小结

ChangeEventQueue实现了ChangeEventQueueMetrics接口,其构造器创建了BlockingQueue、Metronome,并接收了loggingContextSupplier;其enqueue方法执行queue.put(record);其poll方法先通过loggingContextSupplier.get()获取previousContext,之后创建timeout,并while循环执行queue.drainTo(records, maxBatchSize)及metronome.pause(),直到timeout.expired()或者queue.drainTo(records, maxBatchSize) == 0为false,最后执行previousContext.restore();其totalCapacity返回maxQueueSize;其remainingCapacity返回queue.remainingCapacity()

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ChangeEventQueueMetrics
  • ChangeEventQueue
  • Threads
  • LoggingContext
  • Metronome
  • 小结
  • doc
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档