Flink 应用要想大规模可靠运行,必须满足两个条件:
第一部分讨论如何大规模执行checkpoint。 最后一部分解释了一些关于规划要使用多少资源的最佳实践。
监控检查点行为的最简单方法是通过 UI 的检查点部分。 检查点监控的文档显示了如何访问可用的检查点指标。
扩大检查点时特别感兴趣的两个问题(都通过任务级别指标和 Web 界面公开)是:
理想情况下,这两个值都应该很低 – 较高的数量意味着由于一些背压检查点屏障缓慢地通过作业图,(没有足够的资源来处理传入的记录)。 这也可以通过增加处理记录的端到端延迟来观察。 请注意,在存在瞬时背压、数据倾斜或网络问题的情况下,这些数字有时会很高。
非对齐的检查点可用于加快检查点障碍的传播时间。 但是请注意,这并不能解决导致背压的根本问题(并且端到端记录延迟将保持很高)。
应用程序可以配置定期触发检查点。 当检查点的完成时间超过检查点间隔时,在进行中的检查点完成之前不会触发下一个检查点。 默认情况下,一旦正在进行的检查点完成,将立即触发下一个检查点。
应用程序可以配置定期触发检查点。 当检查点的完成时间超过检查点间隔时,在进行中的检查点完成之前不会触发下一个检查点。 默认情况下,一旦正在进行的检查点完成,将立即触发下一个检查点。
当检查点最终经常花费比基本间隔更长的时间(例如,因为状态增长大于计划,或者存储检查点的存储暂时很慢),系统会不断地获取检查点(一旦完成,新的检查点就会立即启动) . 这可能意味着过多的资源一直被检查点所占用,而Operator的处理太少。 此行为对使用异步检查点状态的流式应用程序的影响较小,但仍可能对整体应用程序性能产生影响。
为了防止这种情况,应用程序可以定义检查点之间的最小持续时间:
StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
此持续时间是最近一个检查点结束和下一个检查点开始之间必须经过的最小时间间隔。 下图说明了这如何影响检查点。
注意:可以(通过 CheckpointConfig)配置应用程序以允许同时进行多个检查点。 对于 Flink 中状态较大的应用程序,这通常会将过多的资源绑定到检查点中。 当手动触发保存点时,它可能与正在进行的检查点同时进行。
许多大型 Flink 流应用程序的状态存储主力是 RocksDB 状态后端。 后端的扩展性远远超出了主内存,并且可靠地存储了大的keyed状态。
RocksDB 的性能可能因配置而异,本节概述了使用 RocksDB 状态后端调整作业的一些最佳实践。
在减少检查点花费的时间方面,激活增量检查点应该是首要考虑因素之一。 与完整检查点相比,增量检查点可以显着减少检查点时间,因为增量检查点仅记录与先前完成的检查点相比的更改,而不是生成状态后端的完整、自包含备份。
计时器默认存储在 RocksDB 中,这是更健壮和可扩展的选择。
当性能调整作业只有几个计时器(没有窗口,不使用 ProcessFunction 中的计时器)时,将这些计时器放在堆上可以提高性能。 请谨慎使用此功能,因为基于堆的计时器可能会增加检查点时间,并且自然无法扩展到内存之外。
RocksDB 状态后端的性能很大程度上取决于它可用的内存量。 为了提高性能,增加内存会很有帮助,或者调整内存的功能。
默认情况下,RocksDB 状态后端使用 Flink 为 RocksDB 缓冲区和缓存管理的内存预算(state.backend.rocksdb.memory.managed: true)。 有关该机制如何工作的背景信息,请参阅 RocksDB 内存管理。
要调整与内存相关的性能问题,以下步骤可能会有所帮助:
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {
@Override
public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// increase the max background flush threads when we have many states in one operator,
// which means we would have many column families in one DB instance.
return currentOptions.setMaxBackgroundFlushes(4);
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
// decrease the arena block size from default 8MB to 1MB.
return currentOptions.setArenaBlockSize(1024 * 1024);
}
@Override
public OptionsFactory configure(ReadableConfig configuration) {
return this;
}
}
本节讨论如何确定 Flink 作业应该使用多少资源才能可靠运行。 容量规划的基本经验法则是:
重要:为了允许以后添加资源,请确保将数据流程序的最大并行度设置为合理的数字。 最大并行度定义了在重新缩放程序时(通过保存点)可以设置程序并行度的高度。
Flink 的内部簿记以 max-parallelism-many 键组的粒度跟踪并行状态。 Flink 的设计力求使最大并行度具有非常高的值变得高效,即使以低并行度执行程序也是如此。
Flink 为所有检查点和保存点提供可选的压缩(默认:关闭)。 目前,压缩始终使用 snappy 压缩算法(版本 1.1.4),但我们计划在未来支持自定义压缩算法。 压缩作用于keyed状态下键组的粒度,即每个键组可以单独解压缩,这对于重新缩放很重要。
可以通过 ExecutionConfig 激活压缩:
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setUseSnapshotCompression(true);
注意压缩选项对增量快照没有影响,因为它们使用的是 RocksDB 的内部格式,该格式始终使用开箱即用的 snappy 压缩。
在 Flink 的检查点中,每个任务都会生成其状态的快照,然后将其写入分布式存储。 每个任务通过发送描述状态在分布式存储中的位置的句柄来向Jobmanager确认状态的成功写入。 Jobmanager反过来收集所有任务的句柄并将它们捆绑到一个检查点对象中。
在恢复的情况下,Jobmanager打开最新的检查点对象并将句柄发送回相应的任务,然后可以从分布式存储中恢复它们的状态。 使用分布式存储来存储状态有两个重要的优势。 首先,存储是容错的,其次,分布式存储中的所有状态都可以被所有节点访问,并且可以很容易地重新分配(例如,用于重新缩放)。
但是,使用远程分布式存储也有一个很大的缺点:所有任务都必须通过网络从远程位置读取它们的状态。 在很多情况下,recovery 可以将失败的任务重新调度到与上次运行时相同的Taskmanager(当然也有机器故障等例外),但我们仍然需要读取远程状态。 这可能导致大型状态的恢复时间很长,即使单台机器上只有一个小故障。
任务本地状态恢复正是针对这个恢复时间长的问题,其主要思想是:对于每个检查点,每个任务不仅将任务状态写入分布式存储,而且在一个备份中保存一份状态快照的副本。 任务本地的存储(例如在本地磁盘或内存中)。 请注意,快照的主存储仍然必须是分布式存储,因为本地存储在节点故障下无法确保持久性,并且也不提供其他节点重新分配状态的访问权限,因此此功能仍然需要主副本。
但是,对于每个可以重新调度到先前位置进行恢复的任务,我们可以从辅助的本地副本恢复状态,并避免远程读取状态的成本。 鉴于许多故障不是节点故障,并且节点故障通常一次只影响一个或很少的节点,因此在恢复过程中,大多数任务很可能可以返回到它们之前的位置并发现它们的本地状态完好无损。 这就是使本地恢复有效地减少恢复时间的原因。
请注意,根据所选的状态后端和检查点策略,创建和存储辅助本地状态副本的每个检查点可能会产生一些额外费用。 例如,在大多数情况下,实现将简单地将分布式存储的写入复制到本地文件。
任务本地状态始终被视为辅助副本,检查点状态的基本事实是分布式存储中的主副本。 这对检查点和恢复期间的本地状态问题有影响:
任务本地恢复默认是停用的,可以通过 Flink 的配置使用 CheckpointingOptions.LOCAL_RECOVERY 中指定的 key state.backend.local-recovery 来激活。 此设置的值可以是 true 以启用或 false(默认)以禁用本地恢复。
请注意,未对齐的检查点当前不支持任务本地恢复。
限制:目前,任务本地恢复仅涵盖keyed状态后端。 keyed状态通常是该状态的最大部分。 在不久的将来,我们还将介绍操作员状态和计时器。
以下状态后端可以支持任务本地恢复。
任务本地恢复假设在故障下保留分配的任务调度,其工作原理如下。 每个任务都会记住其先前的分配并请求完全相同的插槽以重新启动恢复。 如果此槽不可用,任务将向资源管理器请求一个新的新槽。 这样,如果任务管理器不再可用,则无法返回其先前位置的任务将不会将其他正在恢复的任务赶出其先前的插槽。 我们的理由是,只有当任务管理器不再可用时,前一个插槽才会消失,在这种情况下,某些任务无论如何都必须请求新的插槽。 使用我们的调度策略,我们让最大数量的任务有机会从它们的本地状态中恢复,并避免任务从彼此之间窃取之前的插槽的级联效应。
本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。