
Managing Large State in Apache Flink®: An Intro to Incremental Checkpointing

mikezou
Published 2018-08-10 09:53:24

Apache Flink was purpose-built for stateful stream processing. Let’s quickly review: what is state in a stream processing application? I defined state and stateful stream processing in a previous blog post, and in case you need a refresher, state is defined as memory in an application’s operators that stores information about previously-seen events that you can use to influence the processing of future events.

State is a fundamental, enabling concept in stream processing required for a majority of interesting use cases. Some examples highlighted in the Flink documentation (see the sketch after this list):

  • When an application searches for certain event patterns, the state stores the sequence of events encountered so far.
  • When aggregating events per minute, the state holds the pending aggregates.
  • When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.
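
To make the idea concrete, here is a minimal sketch of keyed state in Flink’s Java API: a RichFlatMapFunction that keeps a running count per key in a ValueState. The class and field names are illustrative, not from the original post; the count survives between events because Flink stores it in the configured state backend and includes it in every checkpoint.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key; the running count is the operator's "state".
public class PerKeyCounter extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("per-key-count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> event, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();               // null the first time a key is seen
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);                      // persisted by the state backend
        out.collect(Tuple2.of(event.f0, updated));
    }
}

Applied after a keyBy, Flink scopes the ValueState to each key automatically, so the function never has to manage a per-key map itself.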

However, stateful stream processing is only useful in production environments if the state is fault tolerant. “Fault tolerance” means that even if there’s a software or machine failure, the computed end-result is accurate, with no data loss or double-counting of events.

Flink’s fault tolerance has always been a powerful and popular attribute of the framework, minimizing the impact of software or machine failure on your business and making it possible to guarantee exactly-once results from a Flink application.

Core to this is checkpointing, which is the mechanism Flink uses to make application state fault tolerant. A checkpoint in Flink is a global, asynchronous snapshot of application state and position in the input stream that’s taken on a regular interval and sent to durable storage (usually a distributed file system). In the event of a failure, Flink restarts an application using the most recently completed checkpoint as a starting point.
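
As a reference point, enabling periodic checkpointing looks like this in the Java API (the 60-second interval is an arbitrary example value):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 60 seconds with exactly-once guarantees.
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);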

Some Apache Flink users run applications with gigabytes or even terabytes of application state. These users have reported that with such large state, creating a checkpoint was often a slow and resource-intensive operation, which is why in Flink 1.3 we introduced a new feature called ‘incremental checkpointing.’

Before incremental checkpointing, every single Flink checkpoint consisted of the full state of an application. We created the incremental checkpointing feature after we observed that writing the full state for every checkpoint was often unnecessary, as the state changes from one checkpoint to the next were rarely that large. Incremental checkpointing instead maintains the differences (or ‘delta’) between each checkpoint and stores only the differences between the last completed checkpoint and the current application state.

Incremental checkpoints can provide a significant performance improvement for jobs with very large state. A production user with terabytes of state reported a drop in checkpoint time from more than 3 minutes per checkpoint down to 30 seconds per checkpoint after implementing incremental checkpoints. This improvement is a result of not needing to transfer the full state to durable storage on each checkpoint.

How to Start

Currently, you can only use incremental checkpointing with a RocksDB state backend, and Flink uses RocksDB’s internal backup mechanism to consolidate checkpoint data over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and Flink eventually consumes and prunes old checkpoints automatically.

To enable incremental checkpointing in your application, I recommend you read the Apache Flink documentation on checkpointing for full details, but in summary, you enable checkpointing as normal and also enable incremental checkpointing in the state backend’s constructor by setting the second parameter to true.

Java Example

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 'true' enables incremental checkpointing; 'filebackend' is the checkpoint
// directory URI (e.g., an HDFS or S3 path) passed to the RocksDB state backend.
env.setStateBackend(new RocksDBStateBackend(filebackend, true));

Scala Example

val env = StreamExecutionEnvironment.getExecutionEnvironment()
// As in the Java example, the second argument enables incremental checkpointing.
env.setStateBackend(new RocksDBStateBackend(filebackend, true))

By default, Flink retains one completed checkpoint, so if you need a higher number, you can configure it with the flag state.checkpoints.num-retained.
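
For example, to keep the three most recent completed checkpoints, you would set the following in flink-conf.yaml (the value 3 is just an illustration):

state.checkpoints.num-retained: 3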

How it Works

Flink’s incremental checkpointing uses RocksDB checkpoints as a foundation. RocksDB is a key-value store based on ‘log-structured merge’ (LSM) trees that collects all changes in a mutable (changeable) in-memory buffer called a ‘memtable’. Any updates to the same key in the memtable replace previous values, and once the memtable is full, RocksDB writes it to disk with all entries sorted by their key and with light compression applied. Once RocksDB writes the memtable to disk, it is immutable (unchangeable) and is now called a ‘sorted-string table’ (sstable).

A background ‘compaction’ task merges sstables to consolidate potential duplicates for each key; over time, RocksDB deletes the original sstables, while the merged sstable contains all the information from the sstables it replaced. A toy sketch of this write path follows.
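
Here is a minimal, self-contained Java sketch of the memtable/sstable/compaction behavior just described. It is a deliberately simplified illustration, not RocksDB’s actual implementation; all names are invented for the example.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy LSM store: writes go to a mutable, key-sorted in-memory "memtable";
// when it fills up, it is frozen into an immutable "sstable".
public class ToyLsmStore {
    private static final int MEMTABLE_LIMIT = 4;

    private TreeMap<String, String> memtable = new TreeMap<>();            // mutable, sorted by key
    private final List<Map<String, String>> sstables = new ArrayList<>();  // immutable, newest last

    public void put(String key, String value) {
        memtable.put(key, value);          // an update to the same key replaces the old value
        if (memtable.size() >= MEMTABLE_LIMIT) {
            flush();
        }
    }

    private void flush() {
        // "Write to disk": freeze the memtable as an immutable, key-sorted sstable.
        sstables.add(Collections.unmodifiableMap(memtable));
        memtable = new TreeMap<>();
    }

    public String get(String key) {
        if (memtable.containsKey(key)) {
            return memtable.get(key);
        }
        // Search sstables newest-first, since newer tables shadow older values.
        for (int i = sstables.size() - 1; i >= 0; i--) {
            if (sstables.get(i).containsKey(key)) {
                return sstables.get(i).get(key);
            }
        }
        return null;
    }

    public void compact() {
        // Merge all sstables into one, keeping only the newest value per key,
        // then drop the originals -- mirroring RocksDB's background compaction.
        TreeMap<String, String> merged = new TreeMap<>();
        for (Map<String, String> table : sstables) {   // oldest first, so newer values win
            merged.putAll(table);
        }
        sstables.clear();
        sstables.add(Collections.unmodifiableMap(merged));
    }
}

Reads check the memtable first and then the sstables from newest to oldest, which is why compacting duplicates away keeps lookups cheap.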

On top of this, Flink tracks which sstable files RocksDB has created and deleted since the previous checkpoint; because sstables are immutable, Flink can use this to figure out the state changes. To do this, Flink triggers a flush in RocksDB, forcing all memtables to be written to disk as sstables, which are then hard-linked into a local temporary directory. This step is synchronous to the processing pipeline; Flink performs all further steps asynchronously and does not block processing.

Then Flink copies all new sstables to stable storage (e.g., HDFS, S3) to reference in the new checkpoint. Flink doesn’t copy sstables that already existed in the previous checkpoint to stable storage again, but re-references them. Deleted files are no longer referenced by newer checkpoints, because deleted sstables in RocksDB are always the result of compaction, which eventually replaces old tables with a merged sstable. This is how Flink’s incremental checkpoints can prune the checkpoint history.

For tracking changes between checkpoints, the uploading of consolidated tables is redundant work. But Flink performs the process incrementally, and it typically adds only a small overhead, so we consider it worthwhile because it allows Flink to keep a shorter history of checkpoints to consider during recovery.

(Figure: an example of incremental checkpointing across four checkpoints, ‘CP 1’ through ‘CP 4’.)

Take an example with one subtask of an operator that has keyed state, and the number of retained checkpoints set to 2. The columns in the figure above show the state of the local RocksDB instance for each checkpoint, the files it references, and the counts in the shared state registry after the checkpoint completes.

For checkpoint ‘CP 1’, the local RocksDB directory contains two sstable files, and Flink considers these new and uploads them to stable storage using directory names that match the checkpoint name. When the checkpoint completes, Flink creates the two entries in the shared state registry and sets their counts to ‘1’. The key in the shared state registry is a composite of an operator, a subtask, and the original sstable file name. The registry also keeps a mapping from each key to the file path in stable storage.

For checkpoint ‘CP 2’, RocksDB has created two new sstable files, and the two older ones still exist. Flink adds the two new files to stable storage and re-references the previous two. When the checkpoint completes, Flink increases the counts for all referenced files by 1.

For checkpoint ‘CP 3’, RocksDB’s compaction has merged sstable-(1), sstable-(2), and sstable-(3) into sstable-(1,2,3) and deleted the original files. This merged file contains the same information as the source files, with all duplicate entries eliminated. In addition to this merged file, sstable-(4) still exists and there is now a new sstable-(5) file. Flink adds the new sstable-(1,2,3) and sstable-(5) files to stable storage, re-references sstable-(4) from checkpoint ‘CP 2’, and increases the counts for all referenced files by 1. The older ‘CP 1’ checkpoint is now deleted because the number of retained checkpoints (2) has been reached. As part of this deletion, Flink decreases the counts for all files referenced by ‘CP 1’ (sstable-(1) and sstable-(2)) by 1.

For checkpoint ‘CP 4’, RocksDB has merged sstable-(4), sstable-(5), and a new sstable-(6) into sstable-(4,5,6). Flink adds this new table to stable storage, references it together with sstable-(1,2,3), increases the counts for sstable-(1,2,3) and sstable-(4,5,6) by 1, and then deletes ‘CP 2’ as the number of retained checkpoints has been reached. Because the counts for sstable-(1), sstable-(2), and sstable-(3) have now dropped to 0, Flink deletes them from stable storage.
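
The reference counting in this walkthrough can be captured in a few lines. The following Java sketch is an assumed simplification for illustration, not Flink’s actual SharedStateRegistry class: checkpoints register the files they reference on completion and release them when pruned, and a file is safe to delete once its count reaches 0.

import java.util.HashMap;
import java.util.Map;

// Minimal reference-counting registry for shared checkpoint files.
public class SharedStateRegistrySketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Called when a checkpoint completes, once per sstable file it references. */
    public void register(String fileKey) {
        refCounts.merge(fileKey, 1, Integer::sum);
    }

    /** Called when a checkpoint is pruned; returns true if the file can be deleted. */
    public boolean unregister(String fileKey) {
        int remaining = refCounts.merge(fileKey, -1, Integer::sum);
        if (remaining <= 0) {
            refCounts.remove(fileKey);
            return true;    // no retained checkpoint references this file any more
        }
        return false;
    }
}

Replaying the walkthrough: pruning ‘CP 1’ after ‘CP 3’ leaves sstable-(1) and sstable-(2) in stable storage because ‘CP 2’ still references them; pruning ‘CP 2’ after ‘CP 4’ drops their counts to 0, at which point they are deleted.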

Race Conditions and Concurrent Checkpoints

As Flink can execute multiple checkpoints in parallel, new checkpoints sometimes start before earlier checkpoints are confirmed as completed. Because of this, Flink must consider which of the previous checkpoints to use as the basis for a new incremental checkpoint. Flink only references state from a checkpoint confirmed by the checkpoint coordinator, so that it doesn’t unintentionally reference a deleted shared file.

Restoring Checkpoints and Performance Considerations

If you enable incremental checkpointing, there are no further configuration steps needed to recover your state in case of failure. If a failure occurs, Flink’s JobManager tells all tasks to restore from the last completed checkpoint, be it a full or an incremental checkpoint. Each TaskManager then downloads its share of the state from the checkpoint on the distributed file system.

Though the feature can lead to a substantial improvement in checkpoint time for users with a large state, there are trade-offs to consider with incremental checkpointing. Overall, the process reduces checkpointing time during normal operations but can lead to a longer recovery time, depending on the size of your state. If the cluster failure is particularly severe and the Flink TaskManagers have to read from multiple checkpoints, recovery can be slower than with non-incremental checkpointing. You’ll also need to plan for larger distributed storage to maintain the checkpoints, and for the network overhead to read from it.

There are some strategies for improving the convenience/performance trade-off, and I recommend you read the Flink documentation for more details.

Interested in learning more about checkpoints in Flink? Check out Stefan Richter’s Flink Forward Berlin 2017 talk, “A Look at Flink’s Internal Data Structures and Algorithms for Efficient Checkpointing”.

And you might also enjoy our CTO Stephan Ewen’s Flink Forward San Francisco 2017 talk, “Experiences Running Flink at Very Large Scale”.

This article is a repost. In case of infringement, please contact cloudcommunity@tencent.com for removal.
