首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

从保存点恢复时,避免在Apache Flink作业中运行初始化代码

在Apache Flink作业中,当从保存点恢复时,可以通过以下方法避免运行初始化代码:

  1. 使用状态后端:Apache Flink提供了不同的状态后端,如内存、文件系统、RocksDB等。选择合适的状态后端可以确保在从保存点恢复时,状态数据得到正确的恢复,而无需重新运行初始化代码。
  2. 使用恢复策略:Apache Flink提供了不同的恢复策略,如精确一次、至少一次等。通过选择合适的恢复策略,可以确保在从保存点恢复时,作业能够正确地继续执行,而无需重新运行初始化代码。
  3. 合理设计作业逻辑:在编写Apache Flink作业时,可以考虑将初始化代码与实际业务逻辑分离。将初始化代码放在作业的开头部分,并使用条件判断或状态来控制是否执行初始化代码。这样,在从保存点恢复时,可以根据状态来判断是否需要重新运行初始化代码。
  4. 使用有状态的函数:Apache Flink提供了有状态的函数,如RichMapFunction、RichFlatMapFunction等。通过使用这些函数,可以将初始化代码放在open()方法中,并在restore()方法中恢复状态,从而避免在从保存点恢复时重新运行初始化代码。

总结起来,避免在Apache Flink作业中运行初始化代码的关键是选择合适的状态后端和恢复策略,并合理设计作业逻辑,将初始化代码与实际业务逻辑分离。此外,使用有状态的函数也是一种有效的方式。对于Apache Flink作业的保存点恢复,可以参考腾讯云的Apache Flink产品介绍:Apache Flink产品介绍

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Apache Flink小米的发展和应用

By 大数据技术与架构 场景描述:本文由小米的王加胜同学分享,文章介绍了 Apache Flink 小米的发展, Spark Streaming 迁移到 Flink调度计算与调度数据、Mini...,才能合理地初始化资源,避免资源的重复创建;这就提高了使用门槛,容易埋下隐患;通过业务支持发现,实际生产过程,经常会遇到大并发的 Spark Streaming 作业给 Kafka 或 HBase...总之,通过对比可以看出,Flink 的 streaming 模式对于低延迟处理数据比较友好,Spark 的 mini batch 模式则于异常恢复比较友好;如果在大部分情况下作业运行稳定的话,Flink...,之前的数据无法反序列化出来的情况(这也是 Spark Streaming Checkpoint 的一个痛,在业务使用中经常出现修改了代码之后,无法 Checkpoint 恢复的问题);当然Java...,可能在恢复状态出现不兼容问题(目前 Flink仅支持 POJO 和 Avro 的格式兼容升级)。

99030
  • Flink如何实现新的流处理应用第二部分:版本化状态

    Flink 线上用户有必要担心作业升级(应用程序代码Flink本身),出现故障以及应用程序和集群维护的过程作业的表现情况。 2....保存:版本化状态 Flink ,我们引入了保存功能,可以解决上述问题以及未来更多问题。保存可以正在运行Flink 作业上获取,实质上是一个时间上定义可以外部访问的作业的快照。...要从保存恢复作业,只需运行如下即可: flink run -s pathToSavePoint jobJar 使用保存,不必从头开始重新读取事件流以重新填充 Flink 作业的状态,因为你可以随时获取一致性快照并从该检查点恢复...举个例子,利用 t1 时间保存,我们使用修改了的应用程序代码恢复作业,创建 v1 作业时间 t3 和 t4,分别从版本 v0 和 v1 获取更多的保存。...通过获取作业保存,使用新的代码保存重新启动,下游应用程序看不到任何差异。

    71620

    从一个诡异的 Bug 来看 Flink 快照和状态读取的流程

    AS rownum FROM table_name) WHERE rownum <= N [AND conditions] 作业提交后,程序运行时一切正常;但是一旦把作业暂停(做快照),然后恢复...但是,调试我们可以看到,类名、生成的代码其实有微小差别(后缀数字不一样)。...image.png 这里的后缀其实没有特别含义,是 Flink 在生成 Java 代码,为了避免类、变量冲突而维护的一个自增变量,只与生成顺序有关,与代码逻辑无关。...代码生成过程,类名和变量名都是自增生成的,这也是为什么之前我们截图里两个 comparator 的逻辑一样,类名和代码的变量名类似但不一致的原因。 4....首次访问,由于这里用了延迟初始化(Lazy Initialization)机制,会检查这个 ValueStateDescriptor 里面封装的 comparator(新作业代码生成的)与快照恢复里面记录的

    3.3K43

    吾日三省吾身-深入理解Flink Checkpoint和Savepoint

    前言 为了保证程序的容错恢复以及程序启动其状态恢复,几乎所有的 Flink 实时任务都会开启 Checkpoint 或者触发 Savepoint 进行状态保存。...Flink Savepoint 你可以把它当做在某个时间程序状态全局镜像,以后程序进行升级,或者修改并发度等情况,还能从保存的状态位继续启动恢复。...Flink Checkpoint 2.1 Flink Checkpoint 原理 Flink Checkpoint 机制保证 Flink 任务运行突然失败,能够最近 Checkpoint 进行状态恢复启动...当程序突然遇到异常,进行容错恢复,那么就会最新的 Checkpoint 进行状态恢复重启,上一部分还会进入 Flink 系统处理: 上图中表示,进行 chk-5 Checkpoint ,突然遇到程序异常...使用 DataStream 程序开发,最好为每个算子分配 uid,这样即使作业拓扑图变了,相关算子还是能够之前的状态进行恢复,默认情况下,Flink 会为每个算子分配 uid,这种情况下,当你改变了程序的某些逻辑

    89931

    专家带你吃透 Flink 架构:一个新版 Connector 的实现

    SplitEnumerator 和 SourceReader 通过 Flink 的分布式快照机制持久化状态,发生异常状态恢复。...该类保存了数据分片 id、文件路径、数据分片起始位置的文件偏移(我们这里整个文件作为一个数据分片,不再细分,因此偏移始终为 0)、文件长度、文件读取进度(恢复该位置继续数据读取)。...5.浏览器输入  http://localhost:8081 进入 Flink UI 界面,点击 'Running Jobs',可看到我们正在运行作业,点击作业进入作业详情,选择 Checkpoints.../bin/taskmanager.sh start 7.稍等一会儿,我们可在 Flink UI 里观察到作业 checkpoint 恢复的信息。...8.同时我们 taskmanager 日志里可观察到作业恢复的数据分片信息包含 checkpoint 保存的文件读取 offset 信息。

    1.5K52

    flink分析之Task的生命周期

    close-operators dispose-operators task-specific-cleanup common-cleanup 如上所示,恢复任务配置并初始化一些重要的运行时参数之后...这是setInitialState()完成的,两种情况下特别重要: 1.当任务正在从失败恢复并从最后一个成功的检查点重新启动;2.当保存(savepoint[5])恢复。...每个有状态Operator都应该覆盖这个方法,并且应该包含状态初始化逻辑,无论是第一次执行作业,还是任务失败恢复或使用保存。...简而言之,称为CheckpointBarriers的特殊元素由作业的源任务定期注入到输入数据流,并随实际数据源迁移到sink。...执行检查点,任务仍然可以接收输入数据,但数据将被缓存,并仅在检查点成功完成后才被下游处理和发送。 中断执行 在前面的部分,我们描述了一直运行到完成的任务的生命周期。

    1.6K40

    Flink1.7到1.12版本升级汇总

    Flink 1.7.0 版本社区添加了状态变化,允许我们灵活地调整长时间运行的应用程序的用户状态模式,同时保持与先前保存的兼容。通过状态变化,我们可以状态模式添加或删除列。...本地恢复 Apache Flink 1.7.0 通过扩展 Flink 的调度来完成本地恢复功能,以便在恢复考虑之前的部署位置。...恢复保存对模式迁移的新支持 使用Flink 1.7.0,我们使用AvroSerializer添加了对更改状态模式的支持。...保存兼容性 TraversableSerializer 此序列化程序(FLINK-11539)的更新,包含Scala的Flink 1.2保存点将不再与Flink 1.8兼容。...考虑这个例子:如果你有一个正在消耗topic的Kafka Consumer A,你做了一个保存,然后改变你的Kafka消费者而不是topic消费B,然后保存重新启动你的工作。

    2.6K20

    数栈技术分享:一文带你了解Flink jm、tm启动过程和资源分配

    5)初始化相关运行环境,打印软件版本、运行环境、命令行参数、classpath 等信息 6)加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、BlobServer...、命令行参数项、classpath等信息 2)注册处理各种SIGNAL的handler:记录到日志 3)注册JVM关闭保障的shutdown hook:避免JVM退出被其他shutdown hook...阻塞 4)打印YARN运行环境信息:用户名 5)运行目录中加载flink conf 3、AM启动过程 1)创建并启动各类内部服务(包括RpcService、HAService、BlobServer...退出被其他shutdown hook阻塞 加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、BlobServer、HeartbeatServices...这里以flinkX代码为例: 会被invoke()的open-operators()执行并调用到DtInputFormatSourceFunction的open方法恢复状态做一些初始化工作。

    92710

    深入理解 Flink 容错机制

    关键词:Flink 容错机制 作为分布式系统,尤其是对延迟敏感的实时计算引擎,Apache Flink 需要有强大的容错机制,以确保在出现机器故障或网络分区等不可预知的问题可以快速自动恢复并依旧能产生准确的计算结果...合适的重启策略可以减少作业不可用时间和避免人工介入处理故障的运维成本,因此对于 Flink 作业稳定性来说有着举足轻重的作用。下文就将详细解读 Flink 的错误恢复机制。...等外部服务;而故障恢复则通常要求将状态持久化到外部存储,然后故障出现时用于初始化新的进程。...TaskManager 上运行的所有 Tasks 标记为失败,从而触发 Flink 作业执行的容错机制以恢复作业。...JobMaster 保存了很多对作业执行至关重要的状态,其中 JobGraph 和用户代码会重新 HDFS 等持久化存储获取,checkpoint 信息会 zookeeper 获得,Task 的执行信息可以不恢复因为整个作业会重新调度

    2.2K31

    2021年大数据Flink(二十七):Flink 容错机制 Checkpoint

    ---- Flink 容错机制 Checkpoint State Vs Checkpoint State: 维护/存储的是某一个Operator的运行的状态/历史值,是维护在内存!...里面存放就是maxBy这个Operator的最大值) State数据默认保存在Java的堆内存/TaskManage节点的内存 State可以被记录,失败的情况下数据还可以恢复 Checkpoint...是把State数据定时持久化存储了 比如KafkaConsumer算子维护的Offset状态,当任务重新恢复的时候可以Checkpoint获取 注意: Flink的Checkpoint底层使用了...分布式情况下,不推荐使用本地文件。因为如果某个算子节点A上失败,节点B上恢复,使用本地文件B上无法读取节点 A上的数据,导致状态恢复失败。.../checkpoints ​​​​​​​代码配置 //1.MemoryStateBackend--开发不用     env.setStateBackend(new MemoryStateBackend

    98430

    数栈技术分享:一文带你了解Flink jm、tm启动过程和资源分配

    5)初始化相关运行环境,打印软件版本、运行环境、命令行参数、classpath 等信息 ​ 6)加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、...、命令行参数项、classpath等信息 2)注册处理各种SIGNAL的handler:记录到日志 3)注册JVM关闭保障的shutdown hook:避免JVM退出被其他shutdown hook阻塞...4)打印YARN运行环境信息:用户名 5)运行目录中加载flink conf ​ 3、AM启动过程 1)创建并启动各类内部服务(包括RpcService、HAService、BlobServer、...退出被其他shutdown hook阻塞 加载flink配置文件、初始化文件系统、启动各种内部服务(RpcService、HAService、BlobServer、HeartbeatServices...这里以flinkX代码为例: 会被invoke()的open-operators()执行并调用到DtInputFormatSourceFunction的open方法恢复状态做一些初始化工作。 ​

    1.6K20

    Too old resource version 引起 Flink JobManager 崩溃的问题定位

    具体现象如下: JobManager 正常运行,没有任何预兆地,突然报too old resource version错误,紧接着容器就自动退出了: 2020-10-17 14:51:36.289...该问题会触发 ResourceManager 对 JobManager 的重新初始化过程,作业也会最近的一次 Checkpoint 恢复。...但是如果没有配置 HA(High Availability,高可用)Flink 就无法正常恢复作业,造成运行关键状态的丢失,这对线上业务是无法接受的。...毕竟,这种可恢复的异常,可能会在一个长期运行作业运行周期内多次出现,平台方需要考虑到细粒度的容错,令客户的作业能够长期平稳运行。...后来我们偶然间发现,重启 API Server 服务可以复现该问题,因为新启动的 API Server 会 etcd 获取当前最新 resourceVersion,如果客户端后续用保存的旧值请求的话

    2.9K70

    Cloudera的流分析概览

    Flink的核心功能 架构 任务执行过程的两个主要组件是作业管理器和任务管理器。主节点上的作业管理器启动工作节点。工作节点上,任务管理器负责运行。任务管理器还可以同时运行多个任务。...任务的资源管理由Flink作业管理器完成。Flink群集中,Flink作业作为YARN应用程序执行。HDFS用于存储恢复和日志数据,而ZooKeeper用于对作业进行高可用性协调。 ?...您可以使用Flink将应用程序的状态本地存储状态后端,以确保访问已处理数据降低延迟。您还可以创建检查点和保存,以持久性存储上对流式应用程序进行容错备份。 ?...检查点和保存 可以创建检查点和保存,以使Flink应用程序整个管道容错。Flink包含一个容错机制,该机制可以连续创建数据流的快照。快照不仅包括数据流,还包括附加的状态。...如果发生故障,则选择最新快照,然后系统该检查点恢复。这保证了可以始终保持计算结果的一致性。当检查点由Flink创建和管理保存由用户控制。保存可以描述为已执行过程的备份。 ?

    1.2K20

    专家带你吃透 Flink 架构:一个 新版 Connector 的实现

    SplitEnumerator 和 SourceReader 通过 Flink 的分布式快照机制持久化状态,发生异常状态恢复。...该类保存了数据分片 id、文件路径、数据分片起始位置的文件偏移(我们这里整个文件作为一个数据分片,不再细分,因此偏移始终为 0)、文件长度、文件读取进度(恢复该位置继续数据读取)。...浏览器输入 http://localhost:8081 进入 Flink UI 界面,点击 'Running Jobs',可看到我们正在运行作业,点击作业进入作业详情,选择 Checkpoints...稍等一会儿,我们可在 Flink UI 里观察到作业 checkpoint 恢复的信息。 [e82f5t7yd1.png] 8....同时我们 taskmanager 日志里可观察到作业恢复的数据分片信息包含 checkpoint 保存的文件读取 offset 信息。

    1.6K50

    Flink1.3 指南四 命令行接口 转

    命令行接口启动脚本是 $FLINK_HOME/bin目录下的flink脚本, 默认情况下会连接运行Flink master(JobManager),JobManager的启动脚本与CLI同一安装目录下...如果operator调用取消操作后没有停止,Flink将定期开启中断线程来取消作业直到作业停止。 调用停止Job是一种停止正在运行的流作业的更加优雅的方法。...如果保存触发成功,该作业将被取消 2.3 恢复保存 ./bin/flink run -s ......这个run命令提交Job带有一个保存标记,这使得程序可以保存点中恢复状态。保存路径是通过保存触发命令得到的。 默认情况下,我们尝试将所有保存状态与正在提交的作业相匹配。...如果当保存触发你程序删除了作为程序一部分的operator,但是仍然要使用保存,则需要允许这一(You need to allow this if you removed an operator

    1.5K20

    Flink 内部原理之分布式运行环境

    任务链与算子链 分布式运行Flink将算子(operator) SubTask 连接成 Task。每个 Task 都只由一个线程执行。...客户端作为触发执行的Java/Scala程序的一部分运行,或者命令行运行./bin/flink命令来运行…. 3....除了定义保存状态的数据结构之外,后端状态还实现了获取键/值状态的时间快照逻辑并将该快照存储为检查点的一部分。 ? 5. 保存 用Data Stream API编写的程序可以保存恢复执行。...保存允许更新你的程序和你的Flink集群,而不会丢失任何状态。 保存是手动触发的检查点,它会捕获程序的快照并将其写入后端状态。他们依赖于常规检查点机制。...保存与这些定期检查点类似,只不过它们是由用户触发的,不会在新检查点完成自动失效。

    1.6K40

    《一文读懂腾讯云Flink CDC 原理、实践和优化》

    ,以避免作业崩溃恢复后状态不一致;同样地,如果正在向下游算子发送数据并更新 offset 信息,也不允许快照的进行。...这里也解释了作业刚启动,如果数据库较大(同步时间较久),Flink 刚开始的 Checkpoint 永远失败(超时)的原因:只有当 Flink 完整同步了全量数据后,才可以进行增量数据的处理,以及...周期,避免作业因 Checkpoint 失败而一直重启。...异常数据造成作业持续重启 默认情况下,如果遇到异常的数据(例如消费的 Kafka topic 无意间混入了其他数据),Flink 会立刻崩溃重启,然后从上个快照(Checkpoint)重新消费。...可以从中看到,Flink 1.13 主要着力于支持更多的类型(FLINK-18758[https://issues.apache.org/jira/browse/FLINK-18758]),以及允许

    2.8K31

    专家带你吃透 Flink 架构:一个 新版 Connector 的实现

    SplitEnumerator 和 SourceReader 通过 Flink 的分布式快照机制持久化状态,发生异常状态恢复。...该类保存了数据分片 id、文件路径、数据分片起始位置的文件偏移(我们这里整个文件作为一个数据分片,不再细分,因此偏移始终为 0)、文件长度、文件读取进度(恢复该位置继续数据读取)。...浏览器输入 http://localhost:8081 进入 Flink UI 界面,点击 'Running Jobs',可看到我们正在运行作业,点击作业进入作业详情,选择 Checkpoints...稍等一会儿,我们可在 Flink UI 里观察到作业 checkpoint 恢复的信息。 [e82f5t7yd1.png] 8....同时我们 taskmanager 日志里可观察到作业恢复的数据分片信息包含 checkpoint 保存的文件读取 offset 信息。

    99650

    State Processor API:如何读取,写入和修改 Flink 应用程序的状态

    过去无论是在生产中使用,还是调研 Apache Flink,总会遇到一个问题:如何访问和更新 Flink 保存(savepoint)中保存的 state?...Apache Flink 1.9 引入了状态处理器(State Processor)API,它是基于 DataSet API 的强大扩展,允许读取,写入和修改 Flink保存和检查点(checkpoint...每一个版本Flink 社区都添加了越来越多与状态相关的特性,以提高检查点执行和恢复的速度、改进应用程序的维护和管理。 然而,Flink 用户经常会提出能够“外部”访问应用程序的状态的需求。...Flink 的可查询状态(queryable state)功能只支持基于键的查找(查询),且不保证返回值的一致性(应用程序发生故障恢复前后,返回值可能不同),并且可查询状态只支持读取并不支持修改和写入...MyApp 的保存或检查点均由所有状态的数据组成,这些数据的组织方式可以恢复每个任务的状态。使用批处理作业处理保存(或检查点)的数据,我们脑海中需要将每个任务状态的数据映射到数据集或表

    1.9K20
    领券