本文主要介绍 Flink on Yarn/K8s 的原理及应用实践,文章将从 Flink 架构、Flink on Yarn 原理及实践、Flink on Kubernetes 原理剖析三部分内容进行分享并对 Flink on Yarn/Kubernetes 中存在的部分问题进行解答。如果你对于Apache Flink了解不多,可以先阅读Apache Flink 零基础入门系列文章。
用户通过 DataStream API、DataSet API、SQL 和 Table API 编写 Flink 任务,它会生成一个JobGraph。JobGraph 是由 source、map()、keyBy()/window()/apply() 和 Sink 等算子组成的。当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。
JobManager的功能主要有:
TaskManager 是负责具体任务的执行过程,在 JobManager 申请到资源之后开始启动。TaskManager 里面的主要组件有:
TaskManager 被分成很多个 TaskSlot,每个任务都要运行在一个 TaskSlot 里面,TaskSlot 是调度资源里的最小单位。
在介绍 Yarn 之前先简单的介绍一下 Flink Standalone 模式,这样有助于更好地了解 Yarn 和 Kubernetes 架构。
以上就是一个 Standalone 任务的运行过程。
接下来总结一下 Flink 的基本架构和它在运行时的一些组件,具体如下:
Yarn 模式在国内使用比较广泛,基本上大多数公司在生产环境中都使用过 Yarn 模式。首先介绍一下 Yarn 的架构原理,因为只有足够了解 Yarn 的架构原理,才能更好的知道 Flink 是如何在 Yarn 上运行的。
Yarn 的架构原理如上图所示,最重要的角色是 ResourceManager,主要用来负责整个资源的管理,Client 端是负责向 ResourceManager 提交任务。
用户在 Client 端提交任务后会先给到 Resource Manager。Resource Manager 会启动 Container,接着进一步启动 Application Master,即对 Master 节点的启动。当 Master 节点启动之后,会向 Resource Manager 再重新申请资源,当 Resource Manager 将资源分配给 Application Master 之后,Application Master 再将具体的 Task 调度起来去执行。
Yarn 集群中的组件包括:
以在 Yarn 上运行 MapReduce 任务为例来讲解下 Yarn 架构的交互原理:
Flink on Yarn 中的 Per Job 模式是指每次提交一个任务,然后任务运行完成之后资源就会被释放。在了解了 Yarn 的原理之后,Per Job 的流程也就比较容易理解了,具体如下:
在 Per Job 模式中,执行完任务后整个资源就会释放,包括 JobManager、TaskManager 都全部退出。而 Session 模式则不一样,它的 Dispatcher 和 ResourceManager 是可以复用的。Session 模式下,当 Dispatcher 在收到请求之后,会启动 JobManager(A),让 JobManager(A) 来完成启动 TaskManager,接着会启动 JobManager(B) 和对应的 TaskManager 的运行。当 A、B 任务运行完成后,资源并不会释放。Session 模式也称为多线程模式,其特点是资源会一直存在不会释放,多个 JobManager 共享一个 Dispatcher,而且还共享 Flink-YARN ResourceManager。
Session 模式和 Per Job 模式的应用场景不一样。Per Job 模式比较适合那种对启动时间不敏感,运行时间较长的任务。Seesion 模式适合短时间运行的任务,一般是批处理任务。若用 Per Job 模式去运行短时间的任务,那就需要频繁的申请资源,运行结束后,还需要资源释放,下次还需再重新申请资源才能运行。显然,这种任务会频繁启停的情况不适用于 Per Job 模式,更适合用 Session 模式。
Yarn 模式的优点有:
Yarn 模式虽然有不少优点,但是也有诸多缺点,例如运维部署成本较高,灵活性不够。
关于 Flink on Yarn 的实践在 https://ververica.cn/developers/flink-training-course1/ 上面有很多课程,例如:《Flink 安装部署、环境配置及运行应用程序》 和 《客户端操作》都是基于 Yarn 进行讲解的,这里就不再赘述。
Kubernetes 是 Google 开源的容器集群管理系统,其提供应用部署、维护、扩展机制等功能,利用 Kubernetes 能方便地管理跨机器运行容器化的应用。Kubernetes 和 Yarn 相比,相当于下一代的资源管理系统,但是它的能力远远不止这些。
Kubernetes(k8s)中的 Master 节点,负责管理整个集群,含有一个集群的资源数据访问入口,还包含一个 Etcd 高可用键值存储服务。Master 中运行着 API Server,Controller Manager 及 Scheduler 服务。
Node 为集群的一个操作单元,是 Pod 运行的宿主机。Node 节点里包含一个 agent 进程,能够维护和管理该 Node 上的所有容器的创建、启停等。Node 还含有一个服务端 kube-proxy,用于服务发现、反向代理和负载均衡。Node 底层含有 docker engine,docker 引擎主要负责本机容器的创建和管理工作。
Pod 运行于 Node 节点上,是若干相关容器的组合。在 K8s 里面 Pod 是创建、调度和管理的最小单位。
Kubernetes 的架构如图所示,从这个图里面能看出 Kubernetes 的整个运行过程。
Kubernetes 中比较重要的概念有:
Flink on Kubernetes 的架构如图所示,Flink 任务在 Kubernetes 上运行的步骤有:
JobManager 的执行过程分为两步:
TaskManager 也是通过 Deployment 来进行描述,保证 n 个副本的 Container 运行 TaskManager,同时也需要定义一个标签,例如 flink-taskmanager。
对于 JobManager 和 TaskManager 运行过程中需要的一些配置文件,如:flink-conf.yaml、hdfs-site.xml、core-site.xml,可以通过将它们定义为 ConfigMap 来实现配置的传递和读取。
整个交互的流程比较简单,用户往 Kubernetes 集群提交定义好的资源描述文件即可,例如 deployment、configmap、service 等描述。后续的事情就交给 Kubernetes 集群自动完成。Kubernetes 集群会按照定义好的描述来启动 pod,运行用户程序。各个组件的具体工作如下:
接下来就讲一下 Flink on Kubernetes 的实践篇,即 K8s 上是怎么运行任务的。
•Session Cluster
•启动
•kubectl create -f jobmanager-service.yaml
•kubectl create -f jobmanager-deployment.yaml
•kubectl create -f taskmanager-deployment.yaml
•Submit job
•kubectl port-forward service/flink-jobmanager 8081:8081
•bin/flink run -d -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar
•停止
•kubectl delete -f jobmanager-deployment.yaml
•kubectl delete -f taskmanager-deployment.yaml
•kubectl delete -f jobmanager-service.yaml
首先启动 Session Cluster,执行上述三条启动命令就可以将 Flink 的 JobManager-service、jobmanager-deployment、taskmanager-deployment 启动起来。启动完成之后用户可以通过接口进行访问,然后通过端口进行提交任务。若想销毁集群,直接用 kubectl delete 即可,整个资源就可以销毁。
Flink 官方提供的例子如图所示,图中左侧为 jobmanager-deployment.yaml 配置,右侧为 taskmanager-deployment.yaml 配置。
在 jobmanager-deployment.yaml 配置中,代码的第一行为 apiVersion,apiVersion 是API 的一个版本号,版本号用的是 extensions/vlbetal 版本。资源类型为 Deployment,元数据 metadata 的名为 flink-jobmanager,spec 中含有副本数为 1 的 replicas,labels 标签用于 pod 的选取。containers 的镜像名为 jobmanager,containers 包含从公共 docker 仓库下载的 image,当然也可以使用公司内部的私有仓库。args 启动参数用于决定启动的是 jobmanager 还是 taskmanager;ports 是服务端口,常见的服务端口为 8081 端口;env 是定义的环境变量,会传递给具体的启动脚本。
右图为 taskmanager-deployment.yaml 配置,taskmanager-deployment.yaml 配置与 jobmanager-deployment.yaml 相似,但 taskmanager-deployment.yaml 的副本数是 2 个。
接下来是 jobmanager-service.yaml 的配置,jobmanager-service.yaml 的资源类型为 Service,在 Service 中的配置相对少一些,spec 中配置需要暴露的服务端口的 port,在 selector 中,通过标签选取 jobmanager 的 pod。
除了 Session 模式,还有一种 Per Job 模式。在 Per Job 模式下,需要将用户代码都打到镜像里面,这样如果业务逻辑的变动涉及到 Jar 包的修改,都需要重新生成镜像,整个过程比较繁琐,因此在生产环境中使用的比较少。
以使用公用 docker 仓库为例,Job Cluster 的运行步骤如下:
sh build.sh --from-release --flink-version 1.7.0 --hadoop-version 2.8 --scala-version 2.11 --job-jar ~/flink/flink-1.7.1/examples/streaming/TopSpeedWindowing.jar --image-name topspeed
docker tag topspeed zkb555/topspeedwindowing
docker push zkb555/topspeedwindowing
kubectl create -f job-cluster-service.yaml
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB=org.apache.flink.streaming.examples.windowing.TopSpeedWindowing FLINK_JOB_PARALLELISM=3 envsubst < job-cluster-job.yaml.template | kubectl create -f –
FLINK_IMAGE_NAME=zkb555/topspeedwindowing:latest FLINK_JOB_PARALLELISM=4 envsubst < task-manager-deployment.yaml.template | kubectl create -f -
Q: Flink 在 K8s 上可以通过 Operator 方式提交任务吗?
目前 Flink 官方还没有提供 Operator 的方式,Lyft 公司开源了自己的Operator 实现。
Q: 在 K8s 集群上如果不使用 Zookeeper 有没有其他高可用(HA)的方案?
Etcd 是一个类似于 Zookeeper 的高可用键值服务,目前 Flink 社区正在考虑基于 Etcd 实现高可用的方案以及直接依赖 K8s API 的方案。
Q: Flink on K8s 在任务启动时需要指定 TaskManager 的个数,有和 Yarn 一样的动态资源申请方式吗?
Flink on K8s 目前的实现在任务启动前就需要确定好 TaskManager 的个数,这样容易造成 TM 指定太少,任务无法启动,或者指定的太多,造成资源浪费。社区正在考虑实现和 Yarn 一样的任务启动时动态资源申请的方式。这是一种和 K8s 结合的更为 Nativey 的方式,称为 Active 模式。Active 意味着 ResourceManager 可以直接向 K8s 集群申请资源。具体设计方案和进展请关注:https://issues.apache.org/jira/browse/FLINK-9953。
相关文章: Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践
领取专属 10元无门槛券
私享最新 技术干货