前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >知根知底:Flink-KafkaConsumer 详解

知根知底:Flink-KafkaConsumer 详解

作者头像
Flink实战剖析
发布于 2022-06-10 09:53:53
发布于 2022-06-10 09:53:53
9570
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

Flink-Kafka Connector 是连接kafka 的连接器,负责对接kafka 的读写, 本篇主要介绍kafka consumer 的执行流程与核心设计。

逻辑执行流程

  1. 分配当前task消费的partition与起始的offset : 根据从状态中恢复的数据与客户端指定的消费模式, 采取的方式是状态中offset优先, 即从状态中能够找到对应的offset 就使用该offset , 否则就根据客户端指定的方式
  2. 从kafka 中不断拉取数据, 发送到下游,并且保存当前的offset
  3. 为了保证整个任务的全局一致性,需要将offset 提交到状态中
  4. 如果开启了分区发现模式,那么需要将检测到新的分区添加到消费线程中。

两个重要接口

Flink 保证全局数据一致性是通过全局状态快照checkpoint 完成的, 也就是周期性的执行checkpoint 将当前的任务状态保存起来, Flink 在整个checkpoint 的执行过程中提供了两个接口,方便用户去做一些自定义的操作, 例如操作状态、两阶段提交实现等等。

CheckpointedFunction接口

提供了initializeState方法与snapshotState方法,initializeState方法是在任务初始化时候执行,常见的就是获取的checkpoint 中的状态数据;snapshotState方法是在每次checkpoint触发都会执行,常见的就是将数据存放在状态对象中,以便能够被持久化。

CheckpointListener接口

提供了notifyCheckpointComplete方法与notifyCheckpointAborted方法,这两个方法都是在一次checkpoint 完成之后执行,那么有可能是通知成功回调(notifyCheckpointComplete)也有可能失败回调(notifyCheckpointAborted)。

具体实现

对于Flink 来说source端的标准对接接口是SourceFunction ,主要实现其run方法,在run 中执行数据的pull操作;另外为了保证整个状态的一致性,在checkpoint时需要记录checkpoint时的offset, 并且保证其失败重启时也能够从checkpoint 记录的offset开始消费, 因此同时实现了CheckpointedFunction接口与CheckpointListener接口,这两个接口提供了可操作状态的一些方法。

FlinkKafkaConsumerBase 实现SourceFunction、CheckpointedFunction、CheckpointListener接口的抽象类,包含了整个流程的核心方法,如下:

initializeState

从checkpoint 中 恢复最近一次或者是指定批次checkpoint 中offset, 并将其存放在TreeMap<KafkaTopicPartition,Long> 结的 restoredState 对象中

open

主要作用就是分配当前task消费的partitioin 的offset 位置

1. partition 分配策略:姑且认为是当前task的下标与 partition%numTask 相等就分配给当前task

2. offset 分配策略:有状态数据就使用状态数据的offset ; 没有就根据客户端指定的StartupMode作为消费起点

run

开始消费kafka 中数据, 通过 KafkaFetcher 完成 :

1. 启动了一个消费线程 KafkaConsumerThread 从kakfa 中拉取数据,将其存储到 Handover 的next 对象中

2. 循环从Handover 的next 中获取数据

3. 记录下当前的offset, 更新到subscribedPartitionStates 中去

createAndStartDiscoveryLoop

在run 方法中被调用, 开启了异步分区发现的线程discoveryLoopThread,会按照指定的时间间隔检查是否有新的分区(默认情况下不开启), 当发现有新的分区时会将其添加到unassignedPartitionsQueue中, 以便被KafkaConsumerThread 线程检测到

snapshotState

将记录的subscribedPartitionStates 中消费进度数据写入到 unionOffsetStates 状态中与临时对象pendingOffsetsToCommit中

notifyCheckpointComplete

提交offset 至kafka中:将pendingOffsetsToCommit 中记录当前批次checkpoint 的offset 数据提交到kafka 中

核心流程

  1. KakfaConsumerThread 线程不断从Kafka 中消费数据
  2. 消费的数据存储handover 中
  3. kafkaFetch 不断从handover 获取数据进行处理

其他流程

  1. initializeState、snapshotState 这两个方法是实现了CheckpointedFunction接口里面的对应方法,CheckpointedFunction 接口是Flink 提供的两个hook, 任务初始化执行initializeState,用于从状态中恢复数据, 优于open先执行, 用于其恢复offset数据;snapshotState 每次触发checkpoint 时执行,提供用户操作hook, 用于将offset 数据保存在状态中。
  2. notifyCheckpointComplete 是实现了CheckpointListener 接口中的方法, checkpoint 完成之后的回调方法, 提交状态中的offset数据至kafka中。

offset 提交

对于整个offset的提交至kafka中, 类似于两阶段的提交过程:

  • 第一阶段:执行checkpoint 时即调用snapshotState方法, offset 保存到状态中
  • 第二阶段:checkpoint 执行完成时回调notifyCheckpointComplete方法,offset 提交到kafka中

对于第一阶段失败任务直接重启,从最近一次checkpoint记录的位点开始消费,对于第二阶段提交offset至kafka如果失败,并不会导致任务重启,只是做了日志记录,因为提交offset到kafka成功与否并不会影响任务的执行。

启动时offset指定

  • 如果是从checkpoint 恢复,那么就会忽略客户端所指定的startMode , 也就是checkpoint 状态数据优先

总结

本篇主要介绍了FlinkKafkaConsumer的核心设计流程与实现,同时介绍了与checkpoint流程结合完成offset的管理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
flink-connector-kafka consumer checkpoint源码分析
转发请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7700600.html
sanmutongzi
2020/03/05
1.1K0
flink exectly-once系列之两阶段提交概述
二、TwoPhaseCommitSinkFunction与FlinkKafkaProducer源码分析
Flink实战剖析
2022/04/18
2K0
flink exectly-once系列之两阶段提交概述
flink exectly-once系列之两阶段提交实现分析
在【两阶段提交概述】中介绍了两阶段提交的基本思路以及如何根据checkpoint机制来实现两阶段提交思路,flink给出来两阶段提交抽象实现TwoPhaseCommitSinkFunction与具体实现FlinkKafkaProducer011。
Flink实战剖析
2022/04/18
1.1K0
flink exectly-once系列之两阶段提交实现分析
flink exactly-once系列之事务性输出实现
flink exactly-once系列目录: 一、两阶段提交概述 二、两阶段提交实现分析 三、StreamingFileSink分析 四、事务性输出实现 五、最终一致性实现 前几篇分析到Flink 是可以通过状态与checkpoint机制实现内部Exactly-Once 的语义,对于端到端的Exactly-Once语义,Flink 通过两阶段提交方式提供了对Kafka/HDFS输出支持,两阶段提交实现是结合checkpoint流程提供的hook来实现的,实现CheckpointedFunction与CheckpointListener接口: 1. initializeState 方法里面做事务状态的恢复与重新提交 2. snapshotState 方法里面开启事务与将需要输出的数据写到状态中容错 3. notifyCheckpointComplete方法提交事务 使用flink自带的实现要求继承TwoPhaseCommitSinkFunction类,并且实现beginTransaction、preCommit、commit、abort这几个方法,虽然说使用起来很方便,但是其有一个限制那就是所提供的事务hook(比喻Connection)能够被序列化,并且反序列化之后能够继续提交之前的事务,这个对于很多事务性的数据库是无法做到的,所以需要实现一套特有的事务提交。 之前分析到两阶段提交的主要问题是在第二阶段,commit有可能会存在部分成功与部分失败,所以才有了事务容错恢复,提交失败的重启继续提交,提交成功的重启再次提交是幂等的不会影响数据的结果,现在没有了这样一个可序列化的事务hook,另外需要提交的数据也做了状态容错。但是Flink 在checkpoint机制中提供了一个唯一的标识checkpointId, 它是用户可访问的、单调递增的、容错的,任务失败之后会从最近一次成功点继续递增,那么就可以使用checkpointId 来作为事务提交的句柄,首先看一下逻辑流程:
Flink实战剖析
2022/04/18
6340
flink exactly-once系列之事务性输出实现
Flink中: 你的Function是如何被执行的
在Flink编程中,不管你是使用DataStream api还是 Table/SQL ,接触最多的就是UserFunction , 比喻说MapFunction、ScalarFunction, 在这些Function 里面可以自定义用户的业务处理逻辑,但是这些Function是如何被调用的呢?本文主要介绍Function 被调用的流程以及对应的方法如何被调用的。
Flink实战剖析
2022/11/21
1K0
Flink中: 你的Function是如何被执行的
flink源码分析之kafka consumer的执行流程
线上flink任务稳定运行了两个多月了,突然之间收到了消息堆积较多的报警,kafka上看到的现象是消息堆积较多。问过业务人员得知,对应的流表在前一天重新刷了一遍数据,在我们的这个任务中有两次维表关联,而且内层有一个split操作会造成外层维表关联的数据量膨胀(最大可能为80倍,即split之后产生了80条新记录)。开始了问题分析之路。
山行AI
2021/04/29
3.3K0
flink源码分析之kafka consumer的执行流程
flink-connector-kafka consumer的topic分区分配源码
转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/7200599.html
sanmutongzi
2020/03/04
1K0
FileSystem/JDBC/Kafka - Flink三大Connector实现原理及案例
本文分别讲述了Flink三大Connector:FileSystem Connector、JDBC Connector和Kafka Connector的源码实现和案例代码。
王知无-import_bigdata
2021/03/26
2.4K0
Flink源码走读(二):Flink+Kafka实现端到端Exactly Once语义
Flink通过Checkpoint机制实现了消息对状态影响的Exactly Once语义,即每条消息只会影响Flink内部状态有且只有一次。但无法保证输出到Sink中的数据不重复。以图一所示为例,Flink APP收到Source中的A消息,将其转化为B消息输出到Sink,APP在处理完A1后做了一次Checkpoint,假设APP在处理到A4时发生错误重启,APP将会重新从A2开始消费并处理数据,就会导致B2和B3重复输出到Sink中两次。
2011aad
2020/02/14
5.3K0
Flink源码走读(二):Flink+Kafka实现端到端Exactly Once语义
Flink1.4 如何使用状态
Flink有两种基本的状态:Keyed State和Operator State。
smartsi
2019/08/07
1.1K0
Flink Source/Sink探究与实践:RocketMQ数据写入HBase
最近我们正在尝试把原有的一些Spark Streaming任务改造成Flink Streaming任务,自定义Source和Sink是遇到的第一个主要问题,稍微记录一下。
王知无-import_bigdata
2019/12/05
2.2K0
Flink Source/Sink探究与实践:RocketMQ数据写入HBase
【Flink】第五篇:checkpoint【2】
在上一篇文章「checkpoint【1】」中,我们讨论过在2PC过程的每个阶段出现故障时Flink的处理方式:
章鱼carl
2022/03/31
7080
【Flink】第五篇:checkpoint【2】
专家带你吃透 Flink 架构:一个 新版 Connector 的实现
Flink 可以说已经是流计算领域的事实标准,其开源社区发展迅速,提出了很多改进计划(Flink Improvement Proposals,简称 FLIP)并不断迭代,几乎每个新的版本在功能、性能和使用便捷性上都有所提高。Flink 提供了丰富的数据连接器(connecotr)来连接各种数据源,内置了 kafka、jdbc、hive、hbase、elasticsearch、file system 等常见的 connector,此外 Flink 还提供了灵活的机制方便开发者开发新的 connector。对于 source connector 的开发,有基于传统的 SourceFunction 的方式和基于 Flink 改进计划 FLIP-27 的 Source 新架构的方式。本文首先介绍基于 SourceFunction 方式的不足,接着介绍 Source 新架构以及其设计上的深层思考,然后基于 Flink 1.13 ,以从零开发一个简单的 FileSource connector 为例,介绍开发 source connector 的基本要素,尽量做到理论与实践相结合,加深大家的理解。
Raigor
2021/12/06
1.1K0
flink exectly-once系列之StreamingFileSink分析
一、两阶段提交概述 二、两阶段提交实现分析 三、StreamingFileSink分析 四、事务性输出实现 五、最终一致性实现
Flink实战剖析
2022/04/18
3770
flink exectly-once系列之StreamingFileSink分析
【万字长文】Flink cdc源码精讲(推荐收藏)
上面内容主要是以构建source所需要的参数为主,具体我们进入到DebeziumSourceFunction中看看具体实现
857技术社区
2022/05/17
5.5K0
【万字长文】Flink cdc源码精讲(推荐收藏)
知根知底: Flink Kafka-Producer详解
在实时数仓分层中,Kafka是一种比较常见的中间存储层,而在分布式计算中由于硬件、软件等异常导致的任务重启是一种正常的现象,通过之前的Kafka-Consumer分析得知,offset 是跟随着checkpoint周期性的保存, 那么消息是有可能被重复消费的,而Kafka 作为输出端并不属于整个Flink任务状态的一部分,重复被消费的消息会重复的输出,因此为了保证输出到Kafka数据的一致性,Flink 在Kafka Sink端的事务语义。本篇主要介绍Kafka-Sink 的执行流程与核心设计。
Flink实战剖析
2022/06/10
8490
知根知底: Flink Kafka-Producer详解
StreamOperator源码简析
StreamOperator是任务执行过程中实际处理类,上层由StreamTask调用,下层调用UserFunction,列举一些常见的StreamOperator
Flink实战剖析
2022/04/18
3730
StreamOperator源码简析
Flink如何保存Offset
版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/88956080
shengjk1
2019/04/09
2.6K0
2021年最新最全Flink系列教程__Flink高级API(三)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-znYxlAeB-1624261970363)(assets/image-20210507151242102.png)]
Maynor
2021/12/07
5260
2021年最新最全Flink系列教程__Flink高级API(三)
Flink startupMode是如何起作用的
版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/89067716
shengjk1
2019/04/18
1.4K0
相关推荐
flink-connector-kafka consumer checkpoint源码分析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档