这些用户反馈在大量 的状态下,创建检查点通常很慢并且耗资源,这也是为什么Flink在 1.3版本开始引入“增量式的检查点”。...在引入“增量式的检查点”之前,每一个Flink的检查点都保存了程序完整的状态。...增量式的检查点可以为拥有大量状态的程序带来很大的提升。在早期的测试中,一个拥有TB级别“状态”程序将生成检查点的耗时从3分钟以上降低 到了30秒左右。...因为增量式的检查点不需要每次把完整的状态发送到存储中。 现在只能通过RocksDB state back-end来获取增量式检查点的功能,Flink使用RocksDB内置的备份机制来合并检查点数据。...Flink 增量式的检查点以“RocksDB”为基础,RocksDB是一个基于 LSM树的KV存储,新的数据保存在内存中,称为memtable。
RocksDB 的性能可能因配置而异,本节概述了使用 RocksDB 状态后端调整作业的一些最佳实践。 增量Checkpoints 在减少检查点花费的时间方面,激活增量检查点应该是首要考虑因素之一。...请谨慎使用此功能,因为基于堆的计时器可能会增加检查点时间,并且自然无法扩展到内存之外。 RocksDB内存调优 RocksDB 状态后端的性能很大程度上取决于它可用的内存量。...默认情况下,RocksDB 状态后端使用 Flink 为 RocksDB 缓冲区和缓存管理的内存预算(state.backend.rocksdb.memory.managed: true)。...这通常会大大改善这种情况,而不会增加调整底层RocksDB 选项的复杂性。 尤其是对于大型容器/进程大小,大部分总内存通常可以流向 RocksDB,除非应用程序逻辑本身需要大量 JVM 堆。...您可以通过设置 state.backend.rocksdb.memory.managed: false 来尝试比较使用托管内存的 RocksDB 与使用每列族内存的 RocksDB 的性能。
除了完整的独立状态快照之外,RocksDBStateBackend还支持增量检查点[5]作为性能调整选项。增量检查点仅存储自最近完成的检查点以来发生的更改。与执行完整快照相比,这大大减少了检查点时间。...RocksDBStateBackend是当前唯一支持增量检查点的状态后端。...在以下情况下,RocksDB是一个不错的选择: •您的工作状态大于本地内存所能容纳的状态(例如,长窗口,大keyed state[6]);•您正在研究增量检查点,以减少检查点时间。...有关更多详细信息,请查看此博客文章[30],了解如何在Flink中管理RocksDB内存大小以及RocksDB内存使用情况[31]Wiki页面。...如果Flink作业的状态太大而无法容纳在JVM堆上,则您对增量检查点感兴趣,或者希望具有可预测的延迟,则应使用RocksDBStateBackend。
执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中。MemoryStateBackend 可以使用异步的方式进行快照,也可以使用同步的方式。...适用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于 TaskManager 的内存大小,在执行检查点时,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中...3)RocksDBStateBackend 是目前唯一支持增量检查点的后端。增量检查点非常适用于超大状态的场景。 注意点: 1)总 State 大小仅限于磁盘大小,不受内存限制。...因为 sstable 是不可变的,Flink 对比前一个检查点创建和删除的 RocksDB sstable 文件就可以计算出状态有哪些改变。...通过合并,历史的 sstable 会合并成一个新的 sstable,并删除这些历史的 sstable,可以减少检查点的历史文件,避免大量小文件产生。
FsStateBackend 将正在使用的数据保存在 TaskManager 的内存中。在进行检查点操作时,将状态快照写入配置的文件系统文件和目录中。...较小的元数据存储在 JobManager 的内存中(或者在高可用性模式下,存储在元数据检查点中)。 FsStateBackend 默认使用异步快照,以避免在写入状态检查点时阻塞处理管道。...进行检查点操作时,整个 RocksDB 数据库进行检查点操作存储到配置的文件系统和目录中。较小的元数据存储在 JobManager 的内存中(或者在高可用性模式下,存储在元数据检查点中)。...重要的是在 RocksDB 中使用合并操作的状态(例如ListState)可以累积超过2^31字节,然后在下一次检索时会失败。目前这是 RocksDB JNI 的限制。...RocksDBStateBackend 是目前唯一个提供增量检查点的终端(见这里)。 3. 配置状态终端 如果你不指定,默认的状态终端是 jobmanager。
在有状态的流处理中,当开发人员启用了 Flink 中的检查点功能时,状态会持久化存储以防止数据的丢失并确保发生故障时能够完全恢复。为应用程序选择何种状态后端,取决于状态持久化的方式和位置。.../checkpoints s3://flink/checkpoints RocksDBStateBackend 将正在处理的数据使用 RocksDB 存储在本地磁盘上。...在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量差异数据存储到配置的文件系统中。...该状态后端同时也会在 JobManager 或者 Zookeeper(在高可用场景下)的内存中存储极少的元数据。。RocksDB 默认也是配置成异步快照。...RocksDBStateBackend 是目前唯一支持有状态流处理应用程序增量检查点的状态后端。 在使用 RocksDB 时,状态大小只受限于磁盘可用空间的大小。
RocksDB 具有 append-only 特性,Flink 利用这一特性将两次 checkpoint 之间 SST 文件列表的差异作为状态增量上传到分布式文件系统上,并通过 JobMaster 中的...多线程上传/下载增量文件,JobMaster 引用计数统计,以及大量与分布式文件系统的交互等过程,相对其他的StateBackend 要更为复杂,在 100+GB 甚至 TB 级别状态下,作业比较容易出现性能和稳定性瓶颈的问题...开启增量CheckPoint和本地恢复 开启增量CheckPoint RocksDB是目前唯一可用于支持有状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量CheckPoint: state.backend.incremental...(有条件使用ssd的可以使用这个选项)。...我们一般使用第三个SPINNING_DISK_OPTIMIZED_HIGH_MEM,设置为机械硬盘+内存模式。
前言 为了解决Flink作业使用RocksDB状态后端时的内存超用问题,Flink早在1.10版本就实现了RocksDB的托管内存(managed memory)机制。...关于RocksDB使用托管内存,Flink官方文档给出了一段简短的解释: Flink does not directly manage RocksDB’s native memory allocations...本文先简单介绍一下RocksDB(版本5.17.2)内部的Cache和Write Buffer Manager这两个组件,然后看一眼Flink是如何借助它们来实现RocksDB内存托管的。...总大小限制在阈值内; 将WBM传给Cache,可以使两者共同控制RocksDB总内存占用量的上限。...Flink也正是利用了上述特性来实现RocksDB托管内存的。那么WBM与Cache如何协同工作?如下图所示。
一般而言,在生产中,我们会在 FsStateBackend 和 RocksDBStateBackend 间选择: FsStateBackend:性能更好;日常存储是在堆内存中,面临着 OOM 的风险,不支持增量...■ 容器内运行的 RocksDB 的内存超用问题 在 Flink-1.10 之前,由于一个 state 独占若干 write buffer 和一块 block cache,所以我们会建议用户不要在一个...Flink-1.10 之后,由于引入了 RocksDB 的内存托管机制,在绝大部分情况下, RocksDB 的这一部分 native 内存是可控的,不过受限于 RocksDB 的相关 cache 实现限制...taskmanager.memory.task.off-heap.size 中,使得 Flink 有更多的空间给 native 内存使用。...另一方面,由于检查点的语义,所以实际上 Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的 checkpoint,可能会影响整体的性能。
想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。...检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默认情况下,检查点存储在 JobManager 的堆(heap)内存中。...要使用Savepoints,需要按照以下步骤进行:配置状态后端: 在Flink中,状态可以保存在不同的后端存储中,例如内存、文件系统或分布式存储系统(如HDFS)。...RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。...所以运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。增量聚合的优点:高效,输出更加实时。
: file:///data/flink/checkpoints FsStateBackend将流计算数据状态存储在TaskManager的内存中,在数据流遇到检查点屏障时,再将数据快照存储在配置好的文件系统中...在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。...同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。...不过RocksDB支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的...使用 FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。
结论:是否使用 RocksDB 只会影响 Flink 任务中 keyed-state 存储的方式和地方,Flink 任务中的 operator-state 不会受到影响。...无论用户配置哪种状态后端(无论是 memory,filesystem,rocksdb),都是使用 DefaultOperatorStateBackend 来管理的,状态数据都存储在内存中,做 Checkpoint...在执行 Checkpoint 的时候,会将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中。...适用场景: a.最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。 b.RocksDBStateBackend 是目前唯一支持增量检查点的后端。 c.增量检查点非常适用于超大状态的场景。...⭐ Rocksdb 使用磁盘存储 State,所以会涉及到访问 State 磁盘序列化、反序列化,性能会收到影响,而 Filesystem 直接访问内存,单纯从访问状态的性能来说 Filesystem
未来的文章将涵盖在Apache Flink中使用RocksDB进行额外调整,以便了解有关此主题的更多信息。...Apache Flink中的RocksDB状态后端 在深入了解配置参数之前,让我们首先重新讨论在flink中如何使用RocksDB来进行状态管理。...这意味着每次READ或WRITE操作都不得不对数据进行序列化/反序列化, 使用RocksDB作为状态后端有许多优点:它不受垃圾回收的影响,与堆中的对象相比,它通常会有较低的内存开销,并且它是目前唯一支持增量检查点的选项...此外,使用RocksDB,您的状态大小仅受限于可用本地磁盘空间大小,最适合依赖大型状态操作的Flink应用程序。 如果你不熟悉RocksDB,下图说明了其基本的READ和WRITE操作。...3种配置来管理您的RocksDB内存消耗 现在我们已经使用Apache Flink建立了基于RocksDB的一些功能,让我们来看看可以帮助您更有效地管理RocksDB内存大小的配置选项。
2mb Rocksdb写入时消耗的最大内存 state.backend.rocksdb.predefined-options DEFAULT DEFAULT:所有的RocksDb配置都是默认值。...创建KeyedStateBackend 加载RocksDB JNI library相关Jar包。 申请RocksDB所需要的内存。...RocksDBIncrementalRestoreOperation 主要实现从增量快照中恢复RocksDB数据。核心函数为restore()。...主要区分为: restoreWithRescaling:从多个增量的状态后端恢复,需要进行扩缩容。在这个过程中会创建一个临时的RocksDB实例用于关key-groups。...临时RocksDB当中的数据在都会复制到实际使用的RocksDB的实例当中。 restoreWithoutRescaling:从单个远程的增量状态后端恢复,无需进行扩缩容。
RocksDBStateBackend 使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB...RocksDBStateBackend 是目前唯一支持增量检查点的后端。增量检查点非常适用于超 大状态的场景。...Flink 增量式的检查点以 RocksDB为基础, RocksDB是一个基于LSM-Tree的KV存储。新的数据保存在内存中, 称为memtable。...可以减少检查点的历史文件,避免大量小文件的产生。 15、Flink 状态过期后如何清理?...38、为什么Flink使用自主内存而不用JVM内存管理? 因为在内存中存储大量的数据 (包括缓存和高效处理)时,JVM会面临很多问题,包括如下: 1)Java 对象存储密度低。
● 远端DFS为主,本地作为Cache ● 远端DFS为主,本地作为Cache 实现方式 ● 大量侵入修改 RocksDB,支持本地存储淘汰至 DFS ● 基于RocksDB 实现新的 JNI env...Gemini 方案, 还是 Apache Flink 社区的 Forst 方案,都在 RocksDB 的基础上进行了大量改动来支持远端读写,相较于Gemini 和 Forst,Oceanus Disaggregate...同时,Flink Checkpoint 检查点文件与 RocksDB 工作文件存在相同的的文件介质和文件夹内,Flink 快照可以直接引用 RocksDB 工作文件,从而在制作检查点时无需额外的读取-写入步骤过程...该客户端组件将远端 DOP 存储空间以目录形式挂载至本地文件系统命名空间中,通过标准的 POSIX 语义提供给上层 RocksDB 使用,为 RocksDB 提供与本地存储完全兼容的访问方式。...在存算分离架构下,状态文件数据在作业运行过程中会增量异步持久化远端存储中,作业 Checkpoint 快照时只需将 Memtbale (内存表)中的少量数据 Flush 到远端存储以及上传保存对应的 Metadata
我们还为我们使用状态保存器作为我们使用的检查点和点写入谷歌云存储(GCS)。 例如确保Flink应用程序的高性能和弹性是我们的维护任务之一。这也是我们最大的。保持大型有应用程序的弹性很困难。...了解 RocksDB 内存使用情况 我们还观察到另一个与内存相关的问题,问题该非常调试,只要我们: 启动了一个有很多状态的 Flink 应用程序 等了至少一个小时 手动终止任务管理器容器之一...OOM 错误的 Flink 容纳的内存使用情况 我们确认问题发生在大量使用且已运行一个小时的应用程序中。...“不足”错误确认之前的一系列配置转储,并与 RocksDB 尝试配置比使用更多的内存: 在这个特定示例中,Flink Managed Memory 配置为使用 5.90 GB,但配置文件明确地正在使用...现在,即使在任何杀戮任务管理器到内存之后,我们也没有观察到: 没有 OOM 错误的 Flink 容纳的内存使用情况 禁用 RocksDB 块缓存不会影响性能。实际上,我们只是在缓存中没有什么区别。
在阅读本文之前,你应该阅读过的系列: 《Flink重点难点:时间、窗口和流Join》 《Flink重点难点:网络流控和反压》 《Flink重点难点:维表关联理论和Join实战》 《Flink重点难点:内存模型与内存结构...Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储,例如 Flink 已经实现的对 RocksDB 支持。...Flink 状态分类和使用 我们在之前的课时中提到过 KeyedStream 的概念,并且介绍过 KeyBy 这个算子的使用。...状态后端种类和配置 我们在上面的内容中讲到了 Flink 的状态数据可以存在 JVM 的堆内存或者堆外内存中,当然也可以借助第三方存储。...checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
4.下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,(栅栏对齐) 这里还展示了 RocksDB incremental Checkpoint (增量Checkpoint...)的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。...RocksDBStateBackend 还有一种存储为 RocksDBStateBackend , RocksDB 是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中...,如果内存快满时,则写入到磁盘中, 但需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。...不过 RocksDB 支持增量的 Checkpoint,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。