前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink源码分析之Flink on YARN - Per Job

Flink源码分析之Flink on YARN - Per Job

原创
作者头像
楞头青
修改2022-07-09 20:32:30
2.2K0
修改2022-07-09 20:32:30
举报
文章被收录于专栏:流计算

问题导读

  1. 用户程序什么时候、在哪、如何被调用执行的?
  2. JobManager进程什么时候、在哪、如何被拉起来执行?启动后做了什么?
  3. TaskManager进程什么时候、在哪、如何被拉起来执行?
  4. 用户程序的Task什么时候、如何被分发到各个TaskManager进程中执行?

阅读说明: 源码版本:Flink release-1.14.4 先回顾一下YARN架构与YARN job 8步工作流程 Flink on yarn其实就是按照YARN job 8步工作流程走 以上述4个问题为导向,看Flink具体是如何实现的,8步中1、3、4、5、7、8在Flink代码哪里找到(2和6是YARN执行)

YARN架构

Flink-Yarn-PerJob-YARN.jpg
Flink-Yarn-PerJob-YARN.jpg

YARN集群介绍

YARN集群用来做资源的管理与用户应用程序的调度。

用户的应用程序是一个分布式程序,需要按照YARN的规范来写才能提交到YARN集群被调度运行起来。

ResourceManager(RM)

  • YARN集群中的Master。
  • 资源调度:根据容量、队列等限制条件将系统中的资源打包为Container对象分配给应用程序(AM)。
  • 应用程序管理:通知NM启动AM;监控AM运行状态并在失败时重启它。

NodeManager(NM)

  • YARN集群中的Slave。
  • 是应用程序运行的节点。
  • 接收来在ResourceManager的请求,划定一个Container描述的资源限制来启动用户应用程序的ApplicationMaster进程。
  • 接收来在用户应用程序ApplicationMaster的Container启动、停止请求。
  • 定时向ResourceManager汇报本节点上的资源使用情况和各个Container运行状态。

ApplicationMaster(AM)

当用户向YARN提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的Master,它负责向ResourceManager申请Container资源,并通知NodeManager启动Container来执行具体的任务,监控任务的状态以便在失败时重启Container或者任务完成时回收Container,这个就是ApplicationMaster。

Container

  • 是资源的抽象。
  • Container对象包含资源的限制信息:Vcores、Memory,也包含资源的位置信息:需要运行在哪个NodeManager节点上。

由NodeManager进程负责启动。

YARN job工作流程

  1. Client向ResourceManager提交应用程序(包含启动ApplicationMaster的命令)。
  2. ResourceManager为应用分配第一个Container并与对应的NodeManager通信要求它启动ApplicationMaster。
  3. ApplicationMaster向ResourceManager注册并与ResourceManager保持心跳。
  4. ApplicationMaster为任务的运行向ResourceManager申请若干Container资源。
  5. ApplicationMaster领取ResourceManager分配的Container并初始化相关运行信息,便与对应的NodeManager通信要求它启动Container。
  6. NodeManager为Container设置好运行环境(下载运行资源、设置环境变量、资源限制等),将启动命令写到脚本文件中,运行脚本启动Container。
  7. Container运行期间向ApplicationMaster汇报自己的状态和任务进度。
  8. 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销自己,释放相关Container资源。

用户程序什么时候、在哪、谁调用执行的?

入口示例程序

是一个Stream job

代码语言:shell
复制
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

流程图

Flink-Yarn-PerJob-1.jpg
Flink-Yarn-PerJob-1.jpg

步骤说明

1. shell命令交由入口类CliFrontend执行。

2. CliFrontend加载配置和命令行参数,生成Configuration和PackagedProgram对象。

1) 找配置文件目录,优先级:system env(FLINK_CONF_DIR) > ../conf > conf。

2) 从配置文件目录中加载配置文件并解析命令行参数args融合生成Configuration,由Configuration构建出PackagedProgram(包含URL jarFile、Class mainClass、String[] args、List<URL> classpaths、URLClassLoader userCodeClassLoader、savepointSettings等信息)。

3) 通过ClientUtils设置用户程序的执行环境ContextEnvironment和StreamContextEnvironment,为两个Environment设置了PipelineExecutorServiceLoader(用于找到PipelineExecutorFactory)、Configuration和ClassLoader(用户程序PackagedProgram指定的URLClassLoader)等信息。

4) 设置当前线程ClassLoader为上述3中指定的用户程序ClassLoader,调用PackagedProgram执行用户程序,执行完用户程序后重置当前线程ClassLoader。

3. PackagedProgram通过反射的方式调用用户程序入口类的main方法执行用户程序。

4. 用户程序执行,完成StreamGraph的构建。

使用纯SQL API,转换过程SQL -> calcite(SqlNode -> RelNode) -> Operation -> Transformation -> Pipeline

使用Table API,转换过程Operation -> Transformation -> Pipeline

使用DataStream API,转换过程Transformation -> Pipeline

注:流模式Pipeline的实现类是StreamGraph。

5. 找到匹配的PipelineExecutor去执行Pipeline。

PipelineExecutor的实现有多种:

LocalExecutor:本地模式 RemoteExecutor:Standalone模式 YarnJobClusterExecutor:YARN per job模式 YarnSessionClusterExecutor:YARN session job模式 KubernetesSessionClusterExecutor:K8S session job模式 EmbeddedExecutor:Application模式用

这里采用的是YarnJobClusterExecutor,如何找?

  1. StreamExecutionEnvironment通过PipelineExecutorServiceLoader找到PipelineExecutorFactory。PipelineExecutorServiceLoader先以SPI的方式加载PipelineExecutorFactory,再过滤出 与Configuration配置兼容的Factory。
  2. PipelineExecutorFactory负责创建对应的PipelineExecutor,由PipelineExecutor去执行Pipeline。

6. 执行Pipeline:先构建JobGraph,再找到匹配的ClusterDescriptor来部署flink集群以执行JobGraph。

  1. StreamGraph -> JobGraph
  2. 由ClientFactory工厂类会创建对应的ClusterDescriptor,从Configuration中整理出ClusterSpecification(集群描述信息,包含JM和TM的内存大小以及slots个数)。
  3. 通过ClusterDescriptor部署集群:clusterDescriptor.deployJobCluster(ClusterSpecification, JobGraph, detached)。

注: StreamGraph到JobGraph主要变化 node: ( List<StreamEdge> -> StreamNode -> List<StreamEdge> ) --> node --> node ... 到 node: ( List<JobEdge> -> JobVertex -> List<IntermediateDataSet> ) --> node --> node ... 的转换,另外完成Chaining,即多个StreamNode会合并为一个JobVertex

7. 通过ClusterDescriptor部署flink集群来执行JobGraph。

  1. 检查配置;
  2. 上传资源到HDFS;
  3. 提交yarn job让YARN启动AM;
  4. 循环等待提交结果ApplicationReport;
  5. 回设rpc、rest、ha信息;
  6. 返回集群Client对象RestClusterClient。

注: HDFS目录由yarn.staging-directory参数指定 flink-dist, lib/, plugins/ 这些多个flink应用都用到的,预先上传到yarn.provided.lib.dirs参数指定的HDFS目录即可,NM会缓存避免频繁上传下载。

调用链

代码语言:java
复制
1. bin/flink run
  
2. CliFrontend.java
 	 	main(String[] args)
  		CliFrontend cli = new CliFrontend(configuration, customCommandLines)
  		retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args))
  			run(params)
  				executeProgram(Configuration, PackagedProgram)
  					ClientUtils.executeProgram
   ClientUtils.java
   	executeProgram
  		program.invokeInteractiveModeForExecution
3.  			callMainMethod(entryClass, args)
  					entryClass.getMethod("main", String[].class).invoke(null, (Object) args)
  
4. 用户程序入口类
   	main(args)
  
5. StreamExecutionEnvironment.java
   	executeAsync(StreamGraph)
  		executorFactory.getExecutor(configuration).execute(streamGraph, configuration, userClassloader)
  
6. AbstractJobClusterExecutor.java
   	execute
			{
				JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration)
        clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)
        clusterSpecification = clusterClientFactory.getClusterSpecification(configuration)
        clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, detached)
			}
    	
7. YarnClusterDescriptor.java
   	ClusterClientProvider<ApplicationId> deployJobCluster(ClusterSpecification, JobGraph, detached)
  		deployInternal
				{		
  				1) 检查用户权限(Kerberos证书)、vcores、hadoop环境变量、yarn queue等
            
  				2) ApplicationReport report = startAppMaster(...)
          	{
              1. 上传资源到HDFS(yarnShipFiles、yarn.ship-archives、pipeline.jars、配置文件等)
              2. 将JobGraph序列化到文件中,并上传到HDFS(per job模式才有)
              3. 设置集群HA信息
              4. 设置ApplicationSubmissionContext
                    setApplicationName
                    setApplicationType
                    setAMContainerSpec    // Application Master Container
                        setCommands       // 通过class参数指定了yarn AM container启动类为YarnJobClusterEntrypoint
                        setTokens
                        setLocalResources(job.graph、flink-conf.yaml、yarn-site.xml、krb5.conf、keytab、kerberos)
                        setEnvironment
                            flink-conf.yaml中用户配置的以containerized.master.env.为前缀的变量
                            _FLINK_CLASSPATH // Flink app class path
                            _FLINK_DIST_JAR  // local path of flink dist jar 
                            classpath from YARN configuration
                            ...
                    setResource(masterMemoryMB、yarn.appmaster.vcores)
                    setPriority
                    setQueue	// yarn queue
                    setApplicationNodeLabel
                    setApplicationTags
                5. 提交yarn application: yarnClient.submitApplication(appContext)
                   // RUNNING或FINISHED状态时,正常退出循环;FAILED或KILLED时抛异常退出;状态变化时打印日志,运行超60秒打印日志
                6. loop循环等待提交结果: ApplicationReport report = yarnClient.getApplicationReport(appId)
        			}
  					3) 从ApplicationReport中获取rpc和rest的地址和端口信息以及ApplicationId信息,设置回Configuration;设置ClusterId=ApplicationId信息到HA中。
            4) 返回RestClusterClient(Configuration, ApplicationId)
				}	

总结

  1. YARN per job模式下用户程序在Client端被执行,Client端即执行flink shell命令的执行节点。
  2. Client端主要工作就是将用户写的代码转换为JobGraph,向YARN提交应用以执行JobGraph。
  3. User Code(SQL API、Table API、DataStream API)-> StreamGraph
  4. PipelineExecutor(YarnJobClusterExecutor)将StreamGraph转换为JobGraph
  5. ClusterDescriptor(YarnClusterDescriptor)通过YARN部署flink集群以执行JobGraph

JobManager进程什么时候、在哪、如何被拉起来执行?启动后做了什么?

入口

JobManager进程就是YARN job中的ApplicationMaster(AM)。

YARN NodeManager接收到YARN RM发送的AM container启动请求后为其设置好运行环境(环境变量、JAR包、配置文件、Cgroup资源限制等),将启动命令写到脚本文件中,运行脚本启动Container(JobManager进程)。

后续操作如下图所示:

流程图

Flink-Yarn-PerJob-2.jpg
Flink-Yarn-PerJob-2.jpg

步骤说明

1. 启动RpcService,内部创建了ActorSystem。

Flink集群内RPC通信是封装了Akka Actor来实现。

ActorSystem会创建一个Supervisor Actor,用来创建并启动其他的Actor,比如ResourceManager、Dispatcher、JobMaster。

2. 创建并启动WebMonitorEndpoint

这是一个借助netty实现的Rest服务,用来响应web请求。

ApplicationMaster启动后向YARN RM注册,注册的appTrackingUrl就是这个Web服务的地址,这样就可从YARN资源管理页面跳转到Flink Web UI页面。

3. 创建并启动ResourceManager。

  • 这里指的是Flink的ResourceManager,要与上述中YARN ResourceManager区别开。Flink的RM通过YarnResourceManagerDriver对象内的AMRMClientAsync沟通YARN RM,通过NMClientAsync沟通YARN NM。
  • 内部创建SlotManager,用来管理Slot资源。
  • ResourceManager是一个RPC服务,可以接收RPC请求。内部是通过步骤1中创建的RpcService来启动ResourceManager RPC服务,实际上是由RpcService中的Supervisor Actor创建的一个ResourceManager Actor来处理RPC请求。

4. 创建并启动Dispatcher。

Dispatcher是一个RPC服务,可以接收RPC请求。RPC服务的创建与服务过程与ResourceManager一样,不再多述。

5. Dispatcher创建并启动JobMaster。

  1. Dispatcher的onStart方法被调用,方法内部会启动recovered jobs(JobGraph),per job模式下,recovered job不为空,是借助FileJobGraphRetriever类从job.graph文件中读取而来。
  2. JobMaster内部有SchedulerNG和SlotPoolService对象。
  3. JobMaster创建完成后,onStart方法被调用,会触发SchedulerNG的调度,SchedulerNG向Flink ResourceManager申请slot资源,Flink RM收到请求向YARN RM申请启动container运行TaskManager进程。TaskManager进程启动后向Flink RM注册slot资源,JobMaster中的SchedulerNG就能从Flink RM申请获取到slot资源,然后向TaskManager提交运行Task。

调用链

代码语言:java
复制
YarnJobClusterEntrypoint.java
  main(String[] args)
  	ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint)
  		clusterEntrypoint.startCluster()
  			runCluster(Configuration, PluginManager)
  				initializeServices
						{
1.            commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService
              JMXService
              IO线程池
              HA Service 
              BlobServer
              HeartbeatServices
              MetricRegistry
              MetricQuery	RpcService
              ProcessMetricGroup
              ExecutionGraphInfoStore
						}
  				clusterComponent = dispatcherResourceManagerComponentFactory.create
            
DefaultDispatcherResourceManagerComponentFactory.java
	create(...)
  	{
2.   	webMonitorEndpoint = restEndpointFactory.createRestEndpoint
      webMonitorEndpoint.start()
        
3.		resourceManagerService = ResourceManagerServiceImpl.create    
  
4.    dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner
				{
  					...
            Dispatcher dispatcher = dispatcherFactory.createDispatcher
            dispatcher.start() 
            	{
              	...
5.               startRecoveredJobs()
                	{
                  	for (JobGraph recoveredJob : recoveredJobs){
                      ...
                      JobMaster jobMaster = new JobMaster(...) 
                      jobMaster.start()
                    }
                	}
            	}
				}
      resourceManagerService.start()
      	{
        	...
           YarnResourceManagerDriver.java
            initializeInternal()
          		{
            		 ...
                 // 创建并启动AMRMClientAsync,联系YARN RM
              	 resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient
                 resourceManagerClient.init(yarnConfig)
                 resourceManagerClient.start()
                 
                 // AM启动后向YARN RM注册自己,这样可以通过YARN跳转到Flink web ui页面
                 resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl)               
                ...
                 // 创建并启动NMClientAsync,用于联系YARN NM
                 nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient
                 nodeManagerClient.init(yarnConfig)
                 nodeManagerClient.start() 
          		}
           	YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
          		{
							 	...
                 // 运行任务所需的Container都申请并领取完毕后,AM维护与YARN RM心跳
                 // resourceManagerClient = AMRMClientAsync
                 if (getNumRequestedNotAllocatedWorkers() <= 0) {
                 		resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
                 }
          		}
      	}  
  	}
  

总结

  1. Flink JobManager进程就是YARN job中的AM。
  2. Flink Client通过YarnClusterDescriptor中的YarnClient向YARN RM提交应用,YARN RM通过调度策略为其分配AM container资源,并通知container指定的YARN NM启动container。
  3. YARN NM接收到YARN RM发送的AM container启动请求后为其设置好运行环境(环境变量、JAR包、配置文件等),将启动命令写到脚本文件中,运行脚本启动Container(JobManager进程)。
  4. JobManager进程启动后运行WebMonitorEndpoint,并向YARN RM注册自己,注册的appTrackingUrl就是WebMonitorEndpoint中netty服务占用的地址和端口,即Flink web ui。
  5. 启动ResourceManager、Dispatcher等服务,并通过Dispatcher启动JobMaster来执行JobGraph。

TaskManager进程什么时候、在哪、如何被拉起来执行?

流程图

Flink-Yarn-PerJob-3.jpg
Flink-Yarn-PerJob-3.jpg

步骤说明

1. JobMaster通过SchedulerNG执行JobGraph的调度。

调度分两步走:

1) 获取资源(Task运行的容器,即Flink中的Slot,Slot需要从Container中划分)

2) 调度任务(Task)

注:这里仅说第1步,即TaskManager进程如何被调度起来为Task运行提供slots资源支持。第2步在下个问题描述。

2. SlotPoolService负责slots资源申请,先从缓存中(内存)检查是否有可用的slots资源,有的话直接分配,没的话会向Flink RM发送RPC请求。

AllocatedSlotPool中缓存有已经注册的slots资源:Map<AllocationID, AllocatedSlot> registeredSlots。

AllocatedSlot属性:AllocationID、TaskManagerLocation、TaskManagerGateway、ResourceProfile、physicalSlotNumber。

3. Flink RM接收到JobMaster发送的RPC资源请求,会将处理交SlotManager,SlotManager又通过ResourceManagerDriver来做具体的资源申请。

YarnResourceManagerDriver内部有YARN RM Client和YARN NM Client。

1) 通过YARN RM Client请求YARN RM分配containers。

2) 通过YARN NM Client向YARN NM发送请求,创建container运行TaskManager进程(指定了TaskManager进程入口类为YarnTaskExecutorRunner)。

调用链

代码语言:java
复制
JobMaster.java
	new JobMaster	
  	{
    	...
      // jobmanager.scheduler 默认值为Ng,因此创建的SchedulerNG为DefaultScheduler
      this.schedulerNG = createScheduler(...)  
      	{
        	// JobGraph -> ExecutionGraph
          this.executionGraph = createAndRestoreExecutionGraph(...)  
        }
    }
	jobMaster.start 
  ...
  onStart
  	startJobExecution
    	startJobMasterServices
      startScheduling
1.    	schedulerNG.startScheduling
    
DefaultScheduler.java
	// 方法体在父类SchedulerBase.java中  
  startScheduling		
  	startSchedulingInternal
    	schedulingStrategy.startScheduling
    
PipelinedRegionSchedulingStrategy.java
	startScheduling
  	maybeScheduleRegions(Set<SchedulingPipelinedRegion> regions) 
    	// 遍历regions,按region调度
      maybeScheduleRegion
      	schedulerOperations.allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption>)
    
DefaultScheduler.java  
	allocateSlotsAndDeploy
  	allocateSlots
    	executionSlotAllocator.allocateSlotsFor(List<ExecutionVertexID> executionVertexIds)

SlotSharingExecutionSlotAllocator.java
	allocateSlotsFor
  	getOrAllocateSharedSlot
    	slotProvider.allocatePhysicalSlot(PhysicalSlotRequest)

PhysicalSlotProviderImpl.java
	allocatePhysicalSlot
  	// 先看有可用的slot没,有的话直接分配
    tryAllocateFromAvailable
    // 没的话请求Flink RM获取
    orElseGet(requestNewSlot(...))
2.   	slotPool.requestNewAllocatedSlot(SlotRequestId, ResourceProfile, timeout)
    
DeclarativeSlotPoolBridge.java
	requestNewAllocatedSlot
  	internalRequestNewSlot
    	internalRequestNewAllocatedSlot
      	getDeclarativeSlotPool().increaseResourceRequirementsBy
    
DefaultDeclarativeSlotPool.java
	increaseResourceRequirementsBy
  	declareResourceRequirements
    	notifyNewResourceRequirements.accept(Collection<ResourceRequirement>)
    
DeclarativeSlotPoolService.java  
	declareResourceRequirements(Collection<ResourceRequirement>)
  	resourceRequirementServiceConnectionManager.declareResourceRequirements
    
DefaultDeclareResourceRequirementServiceConnectionManager.java
	declareResourceRequirements(ResourceRequirements)
  	triggerResourceRequirementsSubmission
    	sendResourceRequirements
      	service.declareResourceRequirements
    
DeclarativeSlotPoolService.java
	// 向Flink RM发送RPC请求,获取slots资源
  resourceManagerGateway.declareRequiredResources(JobMasterId, ResourceRequirements, Time timeout)
 

// Flink ResourceManager服务端处理资源请求
ResourceManager.java    
	declareRequiredResources   
3.	slotManager.processResourceRequirements

DeclarativeSlotManager.java 
	processResourceRequirements
  	resourceTracker.notifyResourceRequirements
    checkResourceRequirements()
    	tryFulfillRequirementsWithPendingSlots(JobID jobId, Collection<Map.Entry<ResourceProfile, Integer>> missingResources, ResourceCounter pendingSlots)
    		// 遍历missingResource
    		tryAllocateWorkerAndReserveSlot(ResourceProfile profile, ResourceCounter pendingSlots)
    			taskExecutorManager.allocateWorker(profile)
    
TaskExecutorManager.java
	allocateWorker
  	resourceActions.allocateResource(WorkerResourceSpec)

ResourceManager.java
	ResourceActionsImpl.allocateResource
  	startNewWorker

ActiveResourceManager.java
	startNewWorker
  	requestNewWorker        
    	resourceManagerDriver.requestResource(TaskExecutorProcessSpec)

YarnResourceManagerDriver.java
	requestResource
  		// 请求获取container资源
3.1 	resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority))

YarnResourceManagerDriver.java
	// container里有NM的地址信息
  YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
  	onContainersOfPriorityAllocated
    	// 遍历containers
      startTaskExecutorInContainerAsync
      	// 创建ContainerLaunchContext请求对象
        context = createTaskExecutorLaunchContext(ResourceID containerId, String host, TaskExecutorProcessSpec taskExecutorProcessSpec)
        // 通过YARN NM Client发送请求,启动container运行TaskManager进程
3.2     nodeManagerClient.startContainerAsync(container, context) 

总结

  1. YARN per job模式下,TaskManager进程不是根据配置事先就启动好的,而是需要有JobGraph的驱动。
  2. JobGraph被转为ExecutionGraph,后被进一步分解为一个个Task(可运行的Runnable对象),Task是需要在划定的slot资源里执行的,slot由TaskManager进程提供。
  3. JobMaster通过SlotPoolService向Flink RM申请获取资源,Flink RM通过SlotManager管理slot的申请与释放,SlotManager又通过ResourceManagerDriver来做具体的资源申请。YARN per job模式中是YarnResourceManagerDriver实现类,driver先向YARN RM申请分配container资源,然后driver联系container指定的YARN NM启动container,即运行TaskManager进程。

用户程序的Task什么时候、如何被分发到各个TaskManager进程中执行?

流程图

Flink-Yarn-PerJob-4.jpg
Flink-Yarn-PerJob-4.jpg

说明

  1. JobMaster中的SchedulerNG拿到slots资源后,开始进行Task的调度。
  2. Execution是可调度的最小单位,内有LogicalSlot,即这个Execution要被调度到哪个Slot中,通过LogicalSlot可获取其对应的TaskManager RPC客户端代理对象。
  3. 这样一个个Execution被转化为对应的TaskDeploymentDescriptor对象,通过RPC协议提交给对应的TaskManager执行。
  4. TaskManager接收到submitTask请求后将TaskDeploymentDescriptor转化为Task对象,将其放到对应的TaskSlot中,启动Thread执行Task。

提示:Task运行过程中,接收上游发过来的数据,处理完发往下游,由下游Task继续处理,这期间数据的存取由TaskSlot中的MemoryManager控制,相较于Java的堆来说能有效控制内存使用限额,缩减数据占用内存的大小,及时回收内存,这就是Flink的内存管理。

调用链

代码语言:java
复制
DefaultScheduler.java
  allocateSlotsAndDeploy
  	// 请求获取slots资源
  	allocateSlots
  	// 部署Task
  	waitForAllSlotsAndDeploy
  		deployAll(List<DeploymentHandle> deploymentHandles)
  			// 遍历deploymentHandles
  			deployOrHandleError(DeploymentHandle)
  				deployTaskSafe
  					executionVertexOperations.deploy(ExecutionVertex)
  
DefaultExecutionVertexOperations.java
  deploy(ExecutionVertex executionVertex)
  	executionVertex.deploy()
  
ExecutionVertex.java
  deploy()
  	currentExecution.deploy()

Execution.java
  deploy()
  	TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway()
  	taskManagerGateway.submitTask(TaskDeploymentDescriptor, rpcTimeout)
  
RpcTaskManagerGateway.java
  submitTask
  	taskExecutorGateway.submitTask(TaskDeploymentDescriptor, jobMasterId, timeout)
  
// RPC服务端响应  
TaskExecutor.java
  submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout)
		{
  		...
      // Task implements Runnable
  		Task task = new Task(...)
      taskSlotTable.addTask(task)
      task.startTaskThread()
        // executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask)
        executingThread.start()
		}

Task.java
	run()
    doRun()
			{
  			...
         TaskInvokable invokable = loadAndInstantiateInvokable(...)
         	{
						...
             invokableClass = Class.forName(className, true, classLoader).asSubclass(TaskInvokable.class)
             invokableClass.getConstructor(Environment.class).newInstance(environment)
        	}
  				restoreAndInvoke(invokable)
          ...
			}

附录

ResourceService创建与启动

ResourceManager创建与启动过程涉及Leader选举、代理类、Akka Actor,代码跳转比较绕,故这里把完整调用链描述一下,有兴趣可阅读。

整个过程总结下来就是

  1. 使用ResourceManagerService封装ResourceManager,ResourceManagerService启动后先做leader选举,成为leader后再创建并启动ResourceManager。
  2. 创建ResourceManager对象时,在其内部创建并启动了Akka Actor来做RPC服务。
  3. ResourceManager对象创建完毕调用start做初始化工作,启动相关服务。

start() -> onStart()

1) ResourceManager将start的处理交由代理对象RpcServer(AkkaInvocationHandler实例)的start方法处理

2) RpcServer invoke方法被调用,发现是非RPC消息,就调用自身start方法

3) start通过ResourceManager Actor的引用ActorRef向ResourceManager Actor发送start类型控制消息

4) Actor(AkkaRpcActor)收到消息,将处理交由RpcEndpoint,即ResourceManager处理

onStart() 方法内部启动相关服务。

调用链用如下

代码语言:java
复制
// 1) 启动leader选举服务,选举leader
// start(LeaderContender) -> leaderContender.grantLeadership
{
	// implements ResourceManagerService, LeaderContender
  ResourceManagerServiceImpl.java
	  start()
  		leaderElectionService.start(this)
    
	// HA leader选举  
  DefaultLeaderElectionService.java
  	start(LeaderContender)
    	leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver

  ZooKeeperLeaderElectionDriverFactory.java
  	createLeaderElectionDriver
    	new ZooKeeperLeaderElectionDriver
        	{
          	// latchPath节点用于leader选举
          	leaderLatch = new LeaderLatch(client, checkNotNull(latchPath))
            leaderLatch.addListener(this)
            leaderLatch.start()

            // leaderPath节点用于存储leader信息,监听该节点数据的变化
            cache = new NodeCache(client, leaderPath)
            cache.getListenable().addListener(this)
            cache.start()
          }

	ZooKeeperLeaderElectionDriver.java
  	// 成为leader时,latchPath上挂的监听会被回调,isLeader方法被执行
    isLeader()
    	leaderElectionEventHandler.onGrantLeadership

	DefaultLeaderElectionService.java
  	onGrantLeadership
    	leaderContender.grantLeadership   
}

// 2) 成为leader后,创建ResourceManager并启动RPC服务
{
  ResourceManagerServiceImpl.java
    grantLeadership
      startNewLeaderResourceManager
    		// resourceManagerFactory = ActiveResourceManagerFactory
				this.leaderResourceManager = resourceManagerFactory.createResourceManager 
  				{
            new ActiveResourceManager
              {
                ...
                // 启动RPC服务
                this.rpcServer = rpcService.startServer(this)
              }
  				}
    		startResourceManagerIfIsLeader
    			resourceManager.start()
}

// 3) start() -> onStart()
{
	// ResourceManager extends RpcEndpoint
  RpcEndpoint.java
  	start()
    	rpcServer.start()   

  // AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
  AkkaInvocationHandler.java(客户端)
  	// rpcServer是JDK Proxy生成的代理对象,实现了InvocationHandler接口的invoke方法,故start方法交由代理对象的invoke执行
    invoke(Object proxy, Method method, Object[] args) 
    	// if(非rpc方法) 调用对象自身相应的method处理
      result = method.invoke(this, args)
      	start()
        	// rpcEndpoint是Actor的引用:ActorRef,可以用来向Actor发消息,这里是向自身ResourceManager发送控制类消息
          rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender())

	// AkkaRpcActor extends AbstractActor,是一个Actor  
  AkkaRpcActor.java(服务端)  
  	handleControlMessage(ControlMessages)
    	// 初始状态为STOPPED,即state = AkkaRpcActor.StoppedState
      state = state.start(this)
      	akkaRpcActor.rpcEndpoint.internalCallOnStart()

  RpcEndpoint.java
  	internalCallOnStart()
    	onStart()  
}

// 4) 启动ResourceManager下相关服务:心跳管理服务、SlotManager等
{
	ResourceManager.java
    onStart()
    	startResourceManagerServices()
    		...
    		startHeartbeatServices()
    		slotManager.start
}

Dispatcher创建与启动

代码语言:java
复制
// 入口 dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner

// 1) 创建DispatcherRunner,启动HA leader选举
{
  DefaultDispatcherRunnerFactory.java
    createDispatcherRunner
      DefaultDispatcherRunner.create
        DispatcherRunnerLeaderElectionLifecycleManager.createFor
          new DispatcherRunnerLeaderElectionLifecycleManager
    				// 启动leader选举服务, 
    				// 后续 start(LeaderContender) -> leaderContender.grantLeadership过程同上述ResourceManager调用链一致,不再多述
            leaderElectionService.start(dispatcherRunner)  
}

// 2) 成为leader后,启动Dispatcher(内部启动了RPC服务)
{
	DefaultDispatcherRunner.java
  	grantLeadership
    	startNewDispatcherLeaderProcess
    		dispatcherLeaderProcess = createNewDispatcherLeaderProcess  			
    		dispatcherLeaderProcess.start

	AbstractDispatcherLeaderProcess.java
  	start
    	startInternal
      	onStart

	JobDispatcherLeaderProcess.java
  	onStart
    	dispatcherGatewayServiceFactory.create

	DefaultDispatcherGatewayServiceFactory.java
  	create(DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, JobGraphWriter jobGraphWriter)
    	{
      	Dispatcher dispatcher = dispatcherFactory.createDispatcher
        	// JobDispatcherFactory.java
          createDispatcher
          	// MiniDispatcher extends Dispatcher
            new MiniDispatcher
            	{
              	// 启动RPC服务,内部会创建Actor,返回代理对象AkkaInvocationHandler
                this.rpcServer = rpcService.startServer(this)
              }
          dispatcher.start         
      }
}

// 3) 启动后做什么?startRecoveredJobs(start JobMasters)  
{
	Dispatcher.java
  	start()
    .. // start() -> onStart() 过程,同上述ResourceManager调用链一致,不再多述
    onStart()
    	startDispatcherServices
      	startRecoveredJobs
        	// 遍历recoveredJobs
          runRecoveredJob(recoveredJob)
          	runJob
            	createJobManagerRunner
              	JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner
                runner.start()

  // implements LeaderContender  
	JobMasterServiceLeadershipRunner.java  
  	start
    	leaderElectionService.start(this)
    	... // 后续 start(LeaderContender) -> leaderContender.grantLeadership过程同上述ResourceManager调用链一致,不再多述
      grantLeadership
      	startJobMasterServiceProcessAsync
        	verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
          	createNewJobMasterServiceProcess
            	jobMasterServiceProcess = jobMasterServiceProcessFactory.create
    
	DefaultJobMasterServiceProcessFactory.java
  	create
    	new DefaultJobMasterServiceProcess
      	this.jobMasterServiceFuture = jobMasterServiceFactory.createJobMasterService
    
	DefaultJobMasterServiceFactory.java
  	createJobMasterService
    	internalCreateJobMasterService
      	JobMaster jobMaster = new JobMaster(...)
        jobMaster.start()

}

参考

  1. Flink on YARN(上):一张图轻松掌握基础架构与启动流程
  2. Apache Flink 进阶(四):Flink on Yarn/K8s 原理剖析及实践

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题导读
  • YARN架构
    • YARN集群介绍
      • ResourceManager(RM)
      • NodeManager(NM)
      • ApplicationMaster(AM)
      • Container
    • YARN job工作流程
    • 用户程序什么时候、在哪、谁调用执行的?
      • 入口示例程序
        • 流程图
          • 步骤说明
            • 调用链
              • 总结
              • JobManager进程什么时候、在哪、如何被拉起来执行?启动后做了什么?
                • 入口
                  • 流程图
                    • 步骤说明
                      • 调用链
                        • 总结
                        • TaskManager进程什么时候、在哪、如何被拉起来执行?
                          • 流程图
                            • 步骤说明
                              • 调用链
                                • 总结
                                • 用户程序的Task什么时候、如何被分发到各个TaskManager进程中执行?
                                  • 流程图
                                    • 说明
                                      • 调用链
                                      • 附录
                                        • ResourceService创建与启动
                                          • Dispatcher创建与启动
                                          • 参考
                                          相关产品与服务
                                          容器服务
                                          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                                          领券
                                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档