背景
本文参考Flink1.10官方多篇文章相关知识收集、翻译、整合和内化而写成的关于Flink内存模型详解的文章,其中Job Manager、Task Manager和Client 分别是什么,各自之间的运行关系怎样,任务运行过程中所使用任务槽和资源情况的内存模型构成详解,内存设置需要配置哪些参数,参数功能描述等。暂时不熟悉Flink相关概念的童鞋自觉查阅笔者以往分享关于Flink术语基本概念的文章链接:Flink优化器与源码解析系列--Flink相关基本概念。
内存模型
先从一个简单Flink程序执行流程讲起,对在作业执行过程中涉及到Job Managers, Task Managers和Clients功能说明以及使用资源情况以及Flink内存模型介绍。
Clients客户端不属于运行时环境和程序执行(the runtime and program execution)的一部分,而是被用来准备和发送的数据流到JobManager。之后,客户端可以断开连接或保持连接状态以接收进度报告。客户端既可以作为触发执行的Java / Scala程序的一部分运行,也可以在命令行进程中运行./bin/flink run ...。
每个worker(TaskManager)是一个JVM进程,并且可以在单独的线程中执行一个或多个子任务subtasks。一个worker用任务槽task slots(至少一个)来管理接受任务的。每个任务槽代表TaskManager的资源的固定子集。例如,具有三个插槽的TaskManager,那么每个插槽slot享有1/3的托管内存Managed Memory。分配资源意味着子任务不会与其他作业的子任务subtasks竞争托管内存,而是具有一定数量的保留托管内存。请注意,此处没有发生CPU隔离。当前插槽slot仅将任务的托管内存分开。
通过调整任务槽task slots的数量,用户可以定义子任务如何相互隔离。每个TaskManager具有一个插槽slot,这意味着每个任务组都在单独的JVM中运行(例如,可以在单独的容器中启动)。具有多个插槽意味着更多子任务共享同一JVM。同一JVM中的任务共享TCP连接(通过多路复用)和心跳消息。他们还可以共享数据集和数据结构,从而减少每个任务的开销。
默认情况下,Flink允许子任务共享插槽slot,即使它们是不同任务的子任务也是如此,只要它们来自同一任务即可。结果是一个插槽可以容纳整个job流。允许此插槽共享有两个主要好处:
子任务在TaskManager之间公平分配。
具有共享任务插槽的TaskManager API还包括一种资源组机制,可用于防止不良的时隙共享。根据经验,默认的任务插槽数量应该是CPU内核的数量。使用超线程时,每个插槽将占用2个或更多物理线程上下文。
上述讲述了Job Manager、Task Manager的分布式运行情况,这里对TaskManager所使用内存模型进行介绍。Flink尝试使用户免受配置JVM进行数据密集型处理的复杂性的影响。在大多数情况下,用户只需要设置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取决于设置的方式),并可能通过调整JVM堆与管理内存的比率taskmanager.memory.managed.fraction等选项可用于执行性能调整和修复与内存相关的错误。
TaskManager进程总内存 = Flink总内存 + JVM元空间 + JVM开销。其由taskmanager.memory.process.size参数设置其大小。
Flink总内存 = 框架堆内存 + 任务堆内存 + 任务堆外内存 + 管理内存 + 网络内存 。包括TaskExecutor占用的所有内存。JVM Metaspace和JVM Overhead除外。其由taskmanager.memory.flink.siz参数设置大小
各类内存或开销相关参数说明:
组建 | 配置参数 | 描述 |
---|---|---|
框架堆内存 | taskmanager.memory.framework.heap.size | 专用于Flink框架的JVM堆内存(高级选项) |
任务堆内存 | taskmanager.memory.task.heap.size | 专用于Flink应用程序的JVM堆内存可运行操作员和用户代码 |
托管内存 | taskmanager.memory.managed.sizetaskmanager.memory.managed.fraction | 由Flink管理的本机内存,保留用于排序,哈希表,中间结果的缓存和RocksDB状态后端 |
框架堆外内存 | taskmanager.memory.framework.off-heap.size | 专用于Flink框架的堆外直接(或本机)内存(高级选项) |
任务堆外内存 | taskmanager.memory.task.off-heap.size | 专用于Flink应用程序以运行操作员的堆外直接(或本机)内存 |
网络内存 | taskmanager.memory.network.mintaskmanager.memory.network.maxtaskmanager.memory.network.fraction | 直接存储器保留用于任务之间的数据记录交换(例如缓冲用于传输通过网络),它是一种封端的分馏成分的的总弗林克存储器 |
JVM元空间 | taskmanager.memory.jvm-metaspace.size | Flink JVM进程的元空间大小 |
JVM开销 | taskmanager.memory.jvm-overhead.mintaskmanager.memory.jvm-overhead.maxtaskmanager.memory.jvm-overhead.fraction | 架空其他JVM保留本机内存:如线程堆栈,代码缓存,垃圾收集等的空间,这是一个上限分级成分的的总进程内存 |
各参数功能描述:
这些配置值设置决定了TaskManager使用内存大小。
其他内存或资源使用说明
还有内存组件的大小可以通过相应的选项简单地设置。其他组件可以使用多个选项进行调整。
这些组件的大小必须始终在其最大值和最小值之间,否则Flink启动将失败。最大值和最小值具有默认值,或者可以通过相应的配置选项明确设置。例如,如果仅设置以下内存选项:Flink总内存= 1000Mb,网络最小= 64Mb,网络最大= 128Mb,网络比例= 0.1那么网络内存将为1000Mb x 0.1 = 100Mb,在64-128Mb范围内。(请注意,如果您配置相同的最大值和最小值,则实际上意味着它的大小固定为该值。如果未显式配置组件内存,则Flink将使用百分比基于总内存来计算内存大小。计算值由其相应的最小/最大选项限制)如果定义了总内存及其其他组件的大小,也可能会忽略该百分比。在这种情况下,网络内存是总内存的其余部分。派生值仍必须在其最小/最大范围内,否则配置将失败。Flink总内存中的所有其他组件都具有默认值,包括默认的托管内存部分。那么,网络内存不是百分比(1000Mb x 0.1 = 100Mb),而是总Flink内存的其余部分,该部分将在64-256Mb范围内,否则将失败。
Flink在启动任务执行程序进程时,根据配置的或派生的内存组件大小,显式添加以下与内存相关的JVM参数:
JVM Arguments | Value |
---|---|
-Xmx and -Xms | Framework + Task Heap Memory |
-XX:MaxDirectMemorySize | Framework + Task Off-Heap + Network Memory |
-XX:MaxMetaspaceSize | JVM Metaspace |
如果您在计算机上作为单个Java程序在本地启动Flink而不创建集群(例如从IDE中创建),则将忽略所有组件,但以下各项除外:
Memory component | Relevant options | Default value |
---|---|---|
Task heap | taskmanager.memory.task.heap.size | infinite |
Task off-heap | taskmanager.memory.task.off-heap.size | infinite |
Managed memory | taskmanager.memory.managed.size | 128Mb |
Network memory | taskmanager.memory.network.mintaskmanager.memory.network.max | 64Mb |
上面列出的所有组件都可以但不必为本地执行显式配置。如果未配置它们,则将它们设置为其默认值。任务堆内存和 任务堆外内存被认为是无限的(Long.MAX_VALUE字节),并且托管内存 的默认值仅对于本地执行模式为128Mb。
注意在这种情况下,任务堆大小与实际堆大小没有任何关系。它可能与后续版本的未来优化相关。启动的本地进程的实际JVM堆大小不受Flink的控制,取决于您如何启动该进程。如果要控制JVM堆大小,则必须显式传递相应的JVM参数,例如-Xmx,-Xms。
总结
本篇是对Flink内存模型及其相关知识点进行详细说明讲解,掌握这些知识后,就更快排查和解决如IllegalConfigurationException、OutOfMemoryError: Java heap space、OutOfMemoryError: Direct buffer memory、OutOfMemoryError: Metaspace、IOException: Insufficient number of network buffers和Container Memory Exceeded等常见异常。