如果Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将导致数据丢失(Kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。...其次,在Flink应用程序失败的情况下,读者将阻止此应用程序编写的主题,直到应用程序重新启动或配置的事务超时时间过去为止。此注释仅适用于有多个代理/应用程序写入同一Kafka主题的情况。...如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自Kafka的记录。 因此,绘制检查点的间隔定义了程序在发生故障时最多可以返回多少。...检查点常用参数 enableCheckpointing 启用流式传输作业的检查点。 将定期快照流式数据流的分布式状态。 如果发生故障,流数据流将从最新完成的检查点重新启动。...该作业在给定的时间间隔内定期绘制检查点。 状态将存储在配置的状态后端。 此刻未正确支持检查点迭代流数据流。 如果“force”参数设置为true,则系统仍将执行作业。
在实际生产中相信已经有很多小伙伴尝试过了,我在这里将一些个人遇到的、搜索到的、官方博客中总结的以及在Flink的邮件组中的看到过的一些常见问题进行了总结。供大家参考。...MySQL CDC源等待超时 在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。...超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。因此,如果数据库表很大,则建议添加以下Flink配置,以避免由于超时检查点而导致故障转移: ?...如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。 解决办法:创建一个新的MySQL用户并授予其必要的权限。...作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图: ?
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator(检查点协调器),CheckpointCoordinator全权负责本应用的快照制作...,仅用于在故障中恢复作业,可以启用外部持久化检查点,同时指定保留策略: ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作业取消时保留检查点...,注意,在这种情况下,您必须在取消后手动清理检查点状态 ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业在被cancel时,删除检查点,...检查点仅在作业失败时可用 //设置checkpoint超时时间 env.getCheckpointConfig.setCheckpointTimeout(60000) //Checkpointing的超时时间...如果checkpoint未启动,就会采用 no restart 策略,如果启动了checkpoint机制,但是未指定重启策略的话,就会采用 fixed-delay 策略,重试 Integer.MAX_VALUE
,如何使用检查点 只需要指定检查点路径重启任务即可 bin/flink run -s :checkpointMetaDataPath [:runArgs] checkpointMetaDataPath...: 这个是检查点元数据路径,并不简单是所配置的检查点的路径 参考:https://blog.csdn.net/lt793843439/article/details/89641904 三、总结下flink...作业异常中断的操作流程 1、找出作业对应的jobID 2、进入hdfs对应目录,找到目录下面最新的检查点目录 3、通过指定检查点目录的方式重新启动作业 4、观察作业运行情况,如果出现内存溢出异常断开...,加大内存重新启动。...待作业运行稳定,查看作业最初异常中断的原因,记录下来并总结思考如何解决和避免。 四、怎么屏蔽flink checkpoint 打印的info 日志?
该异常在 Flink AM 向 YARN NM 申请启动 token 已超时的 Container 时抛出,通常原因是 Flink AM 从 YARN RM 收到这个 Container 很久之后(超过了...MySQL CDC源等待超时 在扫描表期间,由于没有可恢复的位置,因此无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。...超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。...作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图: 原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响...作业频繁重启的成因非常多,例如异常数据造成的作业崩溃,可以在 TaskManager 的日志中找到报错。数据源或者数据目的等上下游系统超时也会造成作业无法启动而一直在重启。
Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复。因此,它保证了仅一次的语义。...如果TaskManager失败,则再次启动时,它可能具有不同的server id。但这不应该经常发生(作业异常不会重新启动TaskManager),也不会对MySQL服务器造成太大影响。...5、扫描数据库表期间无法执行检查点 在扫描表期间,由于没有可恢复的位置,因此我们无法执行checkpoints。为了不执行检查点,MySQL CDC源将保持检查点等待超时。...超时检查点将被识别为失败的检查点,默认情况下,这将触发Flink作业的故障转移。...为大型数据库创建初始一致的快照时,在读取表时,您建立的连接可能会超时。
Flink支持不同的重启策略,重启策略控制在作业失败后如何重启。可以使用默认的重启策略启动集群,这个默认策略在作业没有特别指定重启策略时使用。...如果未启用检查点,那么将使用不重启策略。如果启用检查点且重启策略尚未配置,则固定延迟重启策略与Integer.MAX_VALUE一起使用进行尝试重启。请参阅下面可用的重启策略列表以了解支持哪些值。...以下示例显示了如何为作业设置固定延迟重启策略。如果发生故障,系统将尝试每10s重新启动一次作业,最多重启3次。...2.1 固定延迟重启策略 固定延迟重启策略尝试一定次数来重新启动作业。如果超过最大尝试次数,那么作业最终将失败。在两次连续的尝试重启之间,重启策略会等待一段固定的时间(译者注:连续重启时间间隔)。...在声明作业失败之前,Flink重试执行的次数 1或者如果启用检查点,则为Integer.MAX_VALUE restart-strategy.fixed-delay.delay 延迟重试意味着在执行失败后
自发布以来,该工具为开发人员和平台团队带来了显着的生产力提升。 Flink 作业故障排除有哪些挑战? 大量分散的日志和指标,其中只有少数很重要。...对于故障排除,工程师通常: 从 YARN UI 滚动 查看一系列JM/TM 日志 检查数十个作业/服务器指标仪表板 搜索和验证作业配置 单击 Flink Web UI 作业 DAG 以查找检查点对齐、数据倾斜和背压等详细信息...例如,检查点超时可能意味着不正确的超时配置,但也可能是背压、s3 上传缓慢、GC 错误或数据倾斜的结果; 丢失 TaskManager 日志可能意味着坏节点,但通常是堆或 RocksDB 状态后端 OOM...一分钟内没有背压显示为绿色方块,否则为红色方块。 每个算子60个方格,代表过去1小时的背压情况。 这样可以轻松确定背压发生的频率以及哪个operator最早启动。...本文为从大数据到人工智能博主「xiaozhch5」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
Flink实现了每个核每秒150万个元素的连续吞吐量。这样集群的总吞吐量达到每秒1.82亿个元素。测试得到的Flink延迟为零,因为作业不涉及网络,也不涉及微批处理。...我们在30台机器的集群中运行此作业,其系统配置与以前相同。Flink实现了每核每秒大约720,000个事件的吞吐量,启动检查点后降至690,000。...通过指定缓冲区超时时间,例如10毫秒,我们可以告诉Flink在缓冲区满了时或者到达10毫秒时发送缓冲区数据。较低的缓冲区超时时间通常意味着较低的延迟,可能以吞吐量为代价。...在上面的实验中,缓冲区超时时间设置为50毫秒,这解释了为什么99%的记录延迟在50毫秒以下。 下面说明了延迟如何影响Flink的吞吐量。...下图显示了不同缓冲区超时时间下的Flink吞吐量。该实验再次使用流记录分组作业。 ? 如果指定缓冲区超时时间为零,流经算子的记录不会缓冲而是立即转发到下一个算子。
Flink支持不同的重启策略,可以控制在发生故障时如何重启新启动作业。 默认重启策略是通过Flink的配置文件设置的flink-conf.yaml。...如果未启用检查点,则使用“无重启”策略。如果激活了检查点并且尚未配置重启策略,则固定延迟策略将用于 Integer.MAX_VALUE重启尝试。...1.固定延迟重启策略 固定延迟重启策略是尝试给定次数重新启动作业。如果超过最大尝试次数,则作业失败。在两次连续重启尝试之间,会有一个固定的延迟等待时间。...在flink-conf.yaml文件配置 # 设置重启策略为failure-rate restart-strategy: failure-rate # 失败作业之前的给定时间间隔内的最大重启次数,默认...),Time.of(10, TimeUnit.SECONDS))); 3.无重启策略 作业直接失败,不尝试重启。
示例Flink流式处理作业和硬件 ? 示例Flink Streaming作业拓扑 对于此示例,我将部署一个典型的Flink流式作业,该作业使用Flink的Kafka使用者从Kafka主题读取数据。...由于总是有新数据,我将窗口配置为一个滑动窗口,滑动时间为1分钟。 这意味着我将获得每分钟更新过去5分钟的聚合。 流式传输作业为每个userId创建一个聚合。...假设硬件设置 运行该作业的机器有五台,每台机器都运行Flink TaskManager(Flink的工作节点)。...您还可以启用容错检查点。 如果计算机或其他任何其他设备出现故障,您需要恢复窗口内容并继续处理。 检查点设置为每分钟一个检查点的间隔,每个检查点将作业的整个状态复制到网络附加文件系统中。...与窗口运算符类似,检查点具有突发模式,每分钟一次,它会尝试将其数据全速发送到外部存储。 检查点导致对RocksDB的额外状态访问(在此示例中位于网络连接磁盘上)。
检查点允许 Flink 在流中恢复状态和位置,为应用程序提供与无故障执行相同的语义。 关于 Flink 流式容错机制背后的技术请参阅流式容错的详细文档。 1....(2) 检查点超时:如果在规定时间之前没有完成检查点,正在进行的检查点就会被终止。 (3) 检查点之间的最小时间:为了确保流式应用程序在检查点之间有一定的进展,可以定义检查点之间的时间间隔。...例如,如果此值设置为5000,不论检查点持续时间和检查点间隔是多少,下一个检查点将在上一个检查点完成之后的5秒内启动。...外部检查点将其元数据写入持久性存储,作业失败时也不会自动清理。这样,如果你的作业失败,你将会有一个检查点用来恢复。有关外部检查点的部署说明中有更多详细信息。...有关可用状态终端以及作业范围和群集范围内配置选项的的详细信息,请参阅状态终端。 5. 迭代作业中的状态检查点 目前 Flink 只为无迭代作业提供处理保证。在迭代作业上启用检查点会导致异常。
实际上 Oceanus 平台已经内置了 Flink 相关的 JAR 包,用户在打包时不用将这些 JAR 打进去,只需要在 POM 里面 将scope设置为provided 即可,例如: 尝试增加作业的算子并行度(CU)数和优化内存占用,避免内存泄露。...总结 本文首先对出现的最基础的、用户可以自己解决的常见报错做了一些总结,这些错误常常出现在作业启动之前,所以在作业正式启动之前,用户需要自己检查好这些类型的错误,保证作业能够顺利的启动。...之后介绍了下作业启动之后的一些基础运维手段,包括实时监控和告警通知,方便用户及时了解作业的具体运行情况。最后介绍了在作业失败之后通过日志关键字定位问题的方法,具体可以查看 日志诊断指南[6]。...功能(Flink >= 1.13) JAR 包过大 POM 里面将 scope 设置为 provided 找不到主类 1、检查 JAR 包主类名是否填写错误。
Cloudera流分析的主要功能 SQL流生成器 SQL Stream Builder是一个作业管理界面,用于在流上编写和执行Streaming SQL,以及为结果创建持久的数据API。...Flink的核心功能 架构 任务执行过程的两个主要组件是作业管理器和任务管理器。主节点上的作业管理器启动工作节点。在工作节点上,任务管理器负责运行。任务管理器还可以同时运行多个任务。...任务的资源管理由Flink中的作业管理器完成。在Flink群集中,Flink作业作为YARN应用程序执行。HDFS用于存储恢复和日志数据,而ZooKeeper用于对作业进行高可用性协调。 ?...检查点和保存点 可以创建检查点和保存点,以使Flink应用程序在整个管道中容错。Flink包含一个容错机制,该机制可以连续创建数据流的快照。快照不仅包括数据流,还包括附加的状态。...如果发生故障,则选择最新快照,然后系统从该检查点恢复。这保证了可以始终保持计算结果的一致性。当检查点由Flink创建和管理时,保存点由用户控制。保存点可以描述为已执行过程的备份。 ?
Kafka消息大小默认配置太小,导致数据未处理 业务背景: 正常的Flink任务消费Topic数据,但是Topic中的数据为XML以及JSON,单条数据较大。...,这种原因一般情况下failover后作业能正常恢复,如果出现的不频繁可以不用关注;2、failover的节点对应TM的内存设置太小,GC严重导致心跳超时,建议调大对应节点的内存值。...Skipping current checkpoint 在当前检查点还未做完时,收到了更新的检查点的barrier,表示当前检查点不再需要而被取消掉,一般不需要特殊处理。...(2) Checkpoint expired before completing 首先应检查CheckpointConfig.setCheckpointTimeout()方法设定的检查点超时...(5)时钟不同步导致无法启动 启动Flink任务的时候报错 Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster。
当检查点最终经常花费比基本间隔更长的时间(例如,因为状态增长大于计划,或者存储检查点的存储暂时很慢),系统会不断地获取检查点(一旦完成,新的检查点就会立即启动) ....压缩 Flink 为所有检查点和保存点提供可选的压缩(默认:关闭)。 目前,压缩始终使用 snappy 压缩算法(版本 1.1.4),但我们计划在未来支持自定义压缩算法。...例如,可以保留 3 个最新检查点的历史记录作为主副本,并且只保留最新检查点的任务本地状态。 对于恢复,如果有匹配的辅助副本可用,Flink 将始终首先尝试从任务本地状态恢复。...如果在从副本恢复过程中出现任何问题,Flink 会透明地重试从主副本恢复任务。仅当主副本和(可选)辅助副本失败时,恢复才会失败。在这种情况下,根据配置,Flink 仍可能回退到旧的检查点。...在这种情况下,Flink 会首先尝试在本地恢复本地部分,非本地状态从主副本恢复。 主状态必须始终是完整的,并且是任务本地状态的超集。 任务本地状态可以具有与主状态不同的格式,它们不需要字节相同。
领取专属 10元无门槛券
手把手带您无忧上云