Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊debezium的Heartbeat

聊聊debezium的Heartbeat

原创
作者头像
code4it
修改于 2020-05-15 06:29:46
修改于 2020-05-15 06:29:46
1.2K00
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下debezium的Heartbeat

Heartbeat

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface Heartbeat {public static final String HEARTBEAT_INTERVAL_PROPERTY_NAME = "heartbeat.interval.ms";
​
    @FunctionalInterface
    public static interface OffsetProducer {
        Map<String, ?> offset();
    }void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;
​
    boolean isEnabled();//......}
  • Heartbeat定义了OffsetProducer接口,该接口定义了offset方法;它还定义了heartbeat、forcedBeat、isEnabled方法

HeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class HeartbeatImpl implements Heartbeat {private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
    private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);/**
     * Default length of interval in which connector generates periodically
     * heartbeat messages. A size of 0 disables heartbeat.
     */
    static final int DEFAULT_HEARTBEAT_INTERVAL = 0;/**
     * Default prefix for names of heartbeat topics
     */
    static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";private static final String SERVER_NAME_KEY = "serverName";private static Schema KEY_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
            .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
            .build();
    private static Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
            .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
            .build();private final String topicName;
    private final Duration heartbeatInterval;
    private final String key;private volatile Timer heartbeatTimeout;HeartbeatImpl(Configuration configuration, String topicName, String key) {
        this.topicName = topicName;
        this.key = key;
​
        heartbeatInterval = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
        heartbeatTimeout = resetHeartbeat();
    }
​
    @Override
    public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (heartbeatTimeout.expired()) {
            forcedBeat(partition, offset, consumer);
            heartbeatTimeout = resetHeartbeat();
        }
    }
​
    @Override
    public void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (heartbeatTimeout.expired()) {
            forcedBeat(partition, offsetProducer.offset(), consumer);
            heartbeatTimeout = resetHeartbeat();
        }
    }
​
    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        LOGGER.debug("Generating heartbeat event");
        if (offset == null || offset.isEmpty()) {
            // Do not send heartbeat message if no offset is available yet
            return;
        }
        consumer.accept(heartbeatRecord(partition, offset));
    }
​
    @Override
    public boolean isEnabled() {
        return true;
    }private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        final Integer partition = 0;return new SourceRecord(sourcePartition, sourceOffset,
                topicName, partition, KEY_SCHEMA, serverNameKey(key), VALUE_SCHEMA, messageValue());
    }private Timer resetHeartbeat() {
        return Threads.timer(Clock.SYSTEM, heartbeatInterval);
    }//......}
  • HeartbeatImpl实现了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()时执行forcedBeat,然后执行resetHeartbeat;其forcedBeat方法执行consumer.accept(heartbeatRecord(partition, offset));其isEnabled返回true;heartbeatRecord方法创建SourceRecord并返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval)

DatabaseHeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/DatabaseHeartbeatImpl.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class DatabaseHeartbeatImpl extends HeartbeatImpl {
    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHeartbeatImpl.class);public static final String HEARTBEAT_ACTION_QUERY_PROPERTY_NAME = "heartbeat.action.query";public static final Field HEARTBEAT_ACTION_QUERY = Field.create(HEARTBEAT_ACTION_QUERY_PROPERTY_NAME)
            .withDisplayName("The query to execute with every heartbeat")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("The query executed with every heartbeat. Defaults to an empty string.");private final String heartBeatActionQuery;
    private final JdbcConnection jdbcConnection;DatabaseHeartbeatImpl(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery) {
        super(configuration, topicName, key);this.heartBeatActionQuery = heartBeatActionQuery;
        this.jdbcConnection = jdbcConnection;
    }
​
    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        try {
            jdbcConnection.execute(heartBeatActionQuery);
        }
        catch (Exception e) {
            LOGGER.error("Could not execute heartbeat action", e);
        }
        LOGGER.debug("Executed heartbeat action query");super.forcedBeat(partition, offset, consumer);
    }
}
  • DatabaseHeartbeatImpl继承了HeartbeatImpl,其forcedBeat方法执行jdbcConnection.execute(heartBeatActionQuery),然后再执行super.forcedBeat(partition, offset, consumer)

Heartbeat.create

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface Heartbeat {//......public static Heartbeat create(Configuration configuration, String topicName, String key) {
        return configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero() ? NULL : new HeartbeatImpl(configuration, topicName, key);
    }public static Heartbeat create(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection) {
        if (configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero()) {
            return NULL;
        }
​
        String heartBeatActionQuery = configuration.getString(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY);if (heartBeatActionQuery != null) {
            return new DatabaseHeartbeatImpl(configuration, topicName, key, jdbcConnection, heartBeatActionQuery);
        }return new HeartbeatImpl(configuration, topicName, key);
    }//......}
  • Heartbeat提供了两个create静态方法,一个用于创建HeartbeatImpl,另外一个在heartBeatActionQuery不为hull时创建DatabaseHeartbeatImpl

小结

HeartbeatImpl实现了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()时执行forcedBeat,然后执行resetHeartbeat;其forcedBeat方法执行consumer.accept(heartbeatRecord(partition, offset));其isEnabled返回true;heartbeatRecord方法创建SourceRecord并返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval);DatabaseHeartbeatImpl继承了HeartbeatImpl,其forcedBeat方法执行jdbcConnection.execute(heartBeatActionQuery),然后再执行super.forcedBeat(partition, offset, consumer)

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊debezium的RecordMakers
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
code4it
2020/05/20
5470
聊聊debezium的RecordMakers
聊聊debezium的BinlogReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java
code4it
2020/05/18
8040
聊聊debezium的BinlogReader
聊聊debezium的SimpleSourceConnector
debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/connector/simple/SimpleSourceConnector.java
code4it
2020/05/17
5120
聊聊debezium的SimpleSourceConnector
聊聊debezium的BlockingReader
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java
code4it
2020/05/15
4190
聊聊debezium的BlockingReader
聊聊debezium的OffsetCommitPolicy
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java
code4it
2020/05/21
1.2K0
聊聊debezium的OffsetCommitPolicy
聊聊DebeziumEngine
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java
code4it
2020/05/16
7140
聊聊DebeziumEngine
聊聊debezium的ChangeEventQueue
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java
code4it
2020/05/12
8310
聊聊debezium的ChangeEventQueue
聊聊debezium的SnapshotChangeRecordEmitter
本文主要研究一下debezium的SnapshotChangeRecordEmitter
code4it
2020/05/13
5880
聊聊debezium的SnapshotChangeRecordEmitter
聊聊debezium的SnapshotChangeRecordEmitter
本文主要研究一下debezium的SnapshotChangeRecordEmitter
code4it
2020/05/16
3980
【万字长文】Flink cdc源码精讲(推荐收藏)
上面内容主要是以构建source所需要的参数为主,具体我们进入到DebeziumSourceFunction中看看具体实现
857技术社区
2022/05/17
5.5K0
【万字长文】Flink cdc源码精讲(推荐收藏)
Debezium的基本使用(以MySQL为例)
简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。
GreatSQL社区
2023/02/23
3.5K0
聊聊debezium的eventHandlers
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
code4it
2020/05/19
5410
聊聊debezium的eventHandlers
Flink1.9.0源码调试介绍&增加调试超时时间
在Flink1.9.0源码研究过程中,调试源码是一个非常重要的手段,通过查看真实的运行数据和变量,来了解源码内部运行逻辑
大数据真好玩
2019/12/05
2.9K0
【kafka】使用Kafka Connect API创建Apache Kafka连接器的4个步骤
Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便。
瑞新
2020/12/07
1.3K0
【kafka】使用Kafka Connect API创建Apache Kafka连接器的4个步骤
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
操作步骤 Maven依赖 核心依赖 kafka-clients <dependency> <groupId>org.apache.kafkagroupId>
小小工匠
2021/08/17
5770
Kafka 客户端开发
前两篇文章讲述了 Kafka 的 工作机制 和 服务器集群部署。至此,Kafka 服务器已就绪,本文分别以官方API、Spring、SpringBoot三种构建方式,讲述了 Kafka 消费生产者和消费者的开发。
IT技术小咖
2019/06/26
1.3K0
聊聊spring cloud consul的TtlScheduler
spring-cloud-consul-discovery-2.1.2.RELEASE-sources.jar!/org/springframework/cloud/consul/discovery/TtlScheduler.java
code4it
2019/07/23
1.1K0
聊聊spring cloud consul的TtlScheduler
Flink CDC 2.0 数据处理流程全面解析
8月份 FlinkCDC 发布2.0.0版本,相较于1.0版本,在全量读取阶段支持分布式读取、支持checkpoint,且在全量 + 增量读取的过程在不锁表的情况下保障数据一致性。
王知无-import_bigdata
2021/11/10
3.2K1
Flink CDC 2.0 数据处理流程全面解析
【Kafka】核心API
虚拟化软件推荐 VM https://www.cnblogs.com/PrayzzZ/p/11330937.html VirtualBOX
瑞新
2020/12/07
1.3K0
【Kafka】核心API
聊聊debezium的ElapsedTimeStrategy
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.java
code4it
2020/05/22
5690
聊聊debezium的ElapsedTimeStrategy
相关推荐
聊聊debezium的RecordMakers
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验