前文我们梳理了 Flink 状态管理相关的源码,我们知道,状态是要与 Checkpoint 配合使用的。因此,本文我们就一起来看一下 Checkpoint 相关的源码。
在Flink学习笔记:如何做容错一文中,我们介绍了 Flink 的 Checkpoint 机制。Checkpoint 分为 EXACTLY_ONCE 和 AT_LEAST_ONCE 两种模式。
我们一起回顾一下一次完整的 Checkpoint 具体流程:Checkpoint 是由 CheckpointCoordinator 触发,Source 节点收到触发请求后,会将 State 进行持久化,同时向下游发送 Barrier 消息,下游节点收到 Barrier 消息后,也同样对 State 进行持久化和发送 Barrier 消息。当所有节点都完成持久化过程后 CheckpointCoordinator 会将一些元数据进行持久化。
带着这些背景知识,我们再来梳理一下 Checkpoint 相关的代码。
JobManager 在调用 DefaultExecutionGraphBuilder.buildGraph 生成 ExecutionGraph 之后,会调用 executionGraph.enableCheckpointing 方法来设置 Checkpoint 相关的配置,这个方法中创建了 CheckpointCoordinator 并注册了 CheckpointCoordinatorDeActivator 这个监听,它负责启动和停止 Checkpoint 的调度。
当作业变成 RUNNING 状态时,CheckpointCoordinator 会部署一个定时任务 ScheduledTrigger,这个定时任务就是用来周期性的触发 Checkpoint。
触发 Checkpoint 的核心逻辑在 CheckpointCoordinator.startTriggeringCheckpoint 这个方法中。这个方法中使用了多个 CompletableFuture 来完成整个流程的编排。具体流程见下图(图中不同颜色代表着使用不同线程池执行)。

checkpoint

triggerTask
至此,JobManager 端的触发流程就完成了,接下来就到了 TaskManager 端了。
进入 TaskExecutor 后,具体调用过程如下图。

TaskManagerCheckpoint
TaskManager 的核心逻辑在 SubtaskCheckpointCoordinatorImpl.checkpointState 方法中。这个方法中的注释也很详细,整体上分为6个步骤:
下面我们来关注几个重点的步骤。
在步骤2中,首先是创建 Barrier,Barrier 消息包括三个部分
// checkpointId
private final long id;
// 时间戳
private final long timestamp;
// checkpoint 相关参数,包括对齐类型、checkpoint 类型、目前地址
private final CheckpointOptions checkpointOptions;
生成 Barrier 之后,会调用 operatorChain.broadcastEvent 进行广播消息。这里广播消息就是向下游所有的节点的所有 ResultSubpartition 发送。
SubtaskCheckpointCoordinatorImpl.takeSnapshotSync 方法用来构建 OperatorSnapshotFutures 中的四个 Future,每个 Future 的任务是为不同类型的 State 提供写入逻辑。
@Nonnull private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
@Nonnull private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
@Nonnull private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
@Nonnull private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
在底层逻辑中,会为每个 Operator 设置对应的 State 的 Future。具体调用流程如下

snapshotState
设置好这些 Future 之后,会在 finishAndReportAsync 方法中创建 AsyncCheckpointRunnable 线程调用 get 来获取执行结果,拿到执行结果后会将 Checkpoint 信息上报给 CheckpointCoordinator。

TaskManagerReport
TaskManager 通过调用 checkpointCoordinatorGateway.acknowledgeCheckpoint 上报 Checkpoint 信息后,流程就又回到 JobManager 了。
JobManager 的确认流程主要做了两件事:
至此,JobManager 和 Source 端算子的一次 Checkpoint 就完成了。接下来我们再看一下非 Source 节点是如何做 Checkpoint 的。
非 Source 节点处理 Barrier 的入口和处理业务数据的入口相同,都是 StreamTask.processInput 方法。我们还是先来看具体的调用流程。

processBarrier
跟着调用链路,我们一路找到了 processBarrier 方法,这里区分了两个 barrierHandler。SingleCheckpointBarrierHandler 负责处理 EXACTLY_ONCE 语义,CheckpointBarrierTracker 负责处理 AT_LEAST_ONCE 语义。
EXACTLY_ONCE 在处理 Barrier 的逻辑如下:
a) 如果收到的是第一个 channel,标记开始进行 barrier 对齐,并阻塞 channel。
b) 如果不是第一个 channel,也不是最后一个 channel,只对 channel 进行阻塞。
c) 如果收到最后一个 channel,就会触发 Checkpoint,并取消所有 channel 阻塞状态。
这里触发的逻辑与 Source 节点相同,通过调用链路可以一直找到 performCheckpoint。
AT_LEAST_ONCE 处理 Barrier 的逻辑如下:
a) 如果收到的是第一个 channel,则更新当前 checkpointID,标记开始 barrier 对齐。
b) 如果收到的不是第一个 channel,也不是最后一个 channel,就只做计数。
c) 如果收到的是最后一个 channel,就会开始触发 Checkpoint。
这里触发逻辑也是调用 performCheckpoint,与 Source 节点逻辑相同。
本文我们梳理了 Checkpoint 的源码逻辑。最开始由 JobManager 中的 CheckpointCoordinator 进行调度,并向 TaskManager 发送触发请求。Source 节点收到请求后会向下游发送 Barrier 消息然后写入状态数据和上报 Checkpoint 信息。CheckpointCoordinator 收集完确认消息后,会持久化元数据并通知所有 Task 完成 commit。最后还分别介绍了 EXACTLY_ONCE 和 AT_LEAST_ONCE 模式下非 Source 节点的处理逻辑。
这里埋一个 Hook,状态数据写入逻辑的细节我们没有深入了解,会在下篇进行深入分析。