Reading notes:
Source version: Flink release-1.14.4.
First review the YARN architecture and the 8-step YARN job workflow; Flink on YARN essentially follows that 8-step workflow.
Guided by the four questions above, we look at how Flink implements this concretely, and where steps 1, 3, 4, 5, 7 and 8 of the eight can be found in the Flink code (steps 2 and 6 are carried out by YARN itself).
The YARN cluster handles resource management and the scheduling of user applications.
A user application is a distributed program; it has to be written against the YARN specification before it can be submitted to a YARN cluster and scheduled to run there.
When a user submits an application to YARN, a Master must be provided to track and manage the program: it requests Container resources from the ResourceManager, asks NodeManagers to launch Containers that execute the actual tasks, and monitors task state so that it can restart Containers on failure or release them when the tasks finish. This is the ApplicationMaster.
The ApplicationMaster process itself is launched by a NodeManager.
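To make the ApplicationMaster's duties concrete, here is a heavily simplified, synchronous AM sketch using Hadoop's YARN client API (the same AMRMClient/NMClient family that Flink's YarnResourceManagerDriver uses later in this article); the host name, tracking URL, container size and launched command are placeholders:

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class MiniApplicationMaster {
    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();

        // 1. Register this AM with the YARN ResourceManager.
        AMRMClient<AMRMClient.ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(conf);
        rmClient.start();
        rmClient.registerApplicationMaster("am-host", 0, "http://am-host:8081");

        // 2. Ask the RM for one 1 GB / 1 vcore container.
        rmClient.addContainerRequest(new AMRMClient.ContainerRequest(
                Resource.newInstance(1024, 1), null, null, Priority.newInstance(1)));

        // 3. Heartbeat the RM; allocated containers come back in the AllocateResponse
        //    (a real AM calls allocate() in a loop until its requests are satisfied).
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();
        for (Container container : rmClient.allocate(0.1f).getAllocatedContainers()) {
            // 4. Ask the container's NodeManager to launch a process inside it.
            ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
                    Collections.emptyMap(), Collections.emptyMap(),
                    Collections.singletonList("sleep 60"), null, null, null);
            nmClient.startContainer(container, ctx);
        }
    }
}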
The example submitted here is a streaming job:
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
1. The shell command is handled by the entry class CliFrontend.
2. CliFrontend loads the configuration and the command-line arguments and builds Configuration and PackagedProgram objects.
1) Locate the configuration directory, with priority: system env (FLINK_CONF_DIR) > ../conf > conf.
2) Load the configuration files from that directory, merge in the parsed command-line args to produce a Configuration, and build a PackagedProgram from the Configuration (it carries the URL jarFile, Class mainClass, String[] args, List<URL> classpaths, URLClassLoader userCodeClassLoader, savepointSettings, and so on).
3) Through ClientUtils, set up the user program's execution environments ContextEnvironment and StreamContextEnvironment, providing both with the PipelineExecutorServiceLoader (used to locate a PipelineExecutorFactory), the Configuration, and the ClassLoader (the URLClassLoader specified by the user's PackagedProgram).
4) Set the current thread's ClassLoader to the user-program ClassLoader from 3), run the user program via PackagedProgram, and reset the thread's ClassLoader after the program finishes.
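Step 4) is the standard context-classloader swap; a minimal sketch of the pattern (the loader and the runnable body here are placeholders, not Flink's code):

public class ContextClassLoaderSwap {
    // Run the user program with its own classloader installed as the thread's context classloader.
    static void runWithUserClassLoader(ClassLoader userCodeClassLoader, Runnable userProgram) {
        final Thread current = Thread.currentThread();
        final ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            userProgram.run(); // in Flink: PackagedProgram invoking the user's main()
        } finally {
            current.setContextClassLoader(original); // always restore the original loader
        }
    }
}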
3. PackagedProgram invokes the main method of the user program's entry class via reflection.
4. The user program runs and builds the StreamGraph.
With the pure SQL API, the conversion path is SQL -> Calcite (SqlNode -> RelNode) -> Operation -> Transformation -> Pipeline
With the Table API: Operation -> Transformation -> Pipeline
With the DataStream API: Transformation -> Pipeline
Note: in streaming mode the Pipeline implementation class is StreamGraph.
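For reference, here is a minimal DataStream program of the same shape as TopSpeedWindowing (the job below is illustrative, not the shipped example): each API call records a Transformation, and execute() turns the accumulated Transformations into the StreamGraph that gets handed to the executor.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MiniStreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each API call below adds a Transformation to the environment...
        env.fromElements(1, 2, 3, 4, 5)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 10;
               }
           })
           .print();

        // ...and execute() builds the StreamGraph (the Pipeline) from those
        // Transformations and submits it through the selected PipelineExecutor.
        env.execute("mini-streaming-job");
    }
}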
5. Find the matching PipelineExecutor to run the Pipeline.
There are several PipelineExecutor implementations:
LocalExecutor: local mode
RemoteExecutor: standalone mode
YarnJobClusterExecutor: YARN per-job mode
YarnSessionClusterExecutor: YARN session mode
KubernetesSessionClusterExecutor: Kubernetes session mode
EmbeddedExecutor: used in application mode
The one used here is YarnJobClusterExecutor. How is it found?
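A sketch of how the lookup works, modeled on DefaultExecutorServiceLoader: every PipelineExecutorFactory on the classpath is discovered through Java's ServiceLoader, and the factory whose isCompatibleWith() accepts the configured execution.target (here yarn-per-job) supplies the executor. The code below is a simplified illustration, not the Flink implementation itself:

import java.util.ServiceLoader;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

public class ExecutorLookupSketch {
    // Simplified stand-in for DefaultExecutorServiceLoader#getExecutorFactory.
    static PipelineExecutor findExecutor(Configuration configuration) {
        for (PipelineExecutorFactory factory : ServiceLoader.load(PipelineExecutorFactory.class)) {
            // With "-t yarn-per-job", execution.target is "yarn-per-job" and only the
            // factory behind YarnJobClusterExecutor reports itself as compatible.
            if (factory.isCompatibleWith(configuration)) {
                return factory.getExecutor(configuration);
            }
        }
        throw new IllegalStateException(
                "No PipelineExecutorFactory found for " + configuration.get(DeploymentOptions.TARGET));
    }
}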
6. Run the Pipeline: first build the JobGraph, then find the matching ClusterDescriptor to deploy a Flink cluster that executes the JobGraph.
Note: the main change from StreamGraph to JobGraph is at the node level, from node: ( List<StreamEdge> -> StreamNode -> List<StreamEdge> ) --> node --> node ... to node: ( List<JobEdge> -> JobVertex -> List<IntermediateDataSet> ) --> node --> node ...; in addition, chaining is applied, i.e. multiple StreamNodes may be merged into a single JobVertex.
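Chaining can also be influenced from the user program, which makes the StreamNode-to-JobVertex merge easy to observe; a small illustrative snippet (the operators themselves are made up):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingHints {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           // Start a new chain here, so this map ends up in a different JobVertex
           // from its upstream source even though chaining would otherwise apply.
           .startNewChain()
           .filter(value -> !value.isEmpty())
           // Keep the filter out of any chain entirely.
           .disableChaining()
           .print();

        env.execute("chaining-hints");
    }
}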
7. The ClusterDescriptor deploys a Flink cluster to run the JobGraph.
Note: the HDFS staging directory is set by the yarn.staging-directory option. Artifacts used by many Flink applications, such as flink-dist, lib/ and plugins/, can be uploaded ahead of time to the HDFS directory configured via yarn.provided.lib.dirs; NodeManagers cache them, which avoids repeated uploads and downloads.
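As a sketch, the two options could also be set programmatically before submission; the paths below are placeholders, and the same keys can equally go into flink-conf.yaml or be passed with -D on the command line:

import org.apache.flink.configuration.Configuration;

public class YarnStagingConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Per-application staging directory on HDFS (placeholder path).
        conf.setString("yarn.staging-directory", "hdfs:///user/flink/staging");
        // Shared, pre-uploaded Flink artifacts that NodeManagers can cache (placeholder path).
        conf.setString("yarn.provided.lib.dirs", "hdfs:///flink/release-1.14.4/lib");
        System.out.println(conf);
    }
}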
1. bin/flink run
2. CliFrontend.java
main(String[] args)
CliFrontend cli = new CliFrontend(configuration, customCommandLines)
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args))
run(params)
executeProgram(Configuration, PackagedProgram)
ClientUtils.executeProgram
ClientUtils.java
executeProgram
program.invokeInteractiveModeForExecution
3. callMainMethod(entryClass, args)
entryClass.getMethod("main", String[].class).invoke(null, (Object) args)
4. 用户程序入口类
main(args)
5. StreamExecutionEnvironment.java
executeAsync(StreamGraph)
executorFactory.getExecutor(configuration).execute(streamGraph, configuration, userClassloader)
6. AbstractJobClusterExecutor.java
execute
{
JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration)
clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)
clusterSpecification = clusterClientFactory.getClusterSpecification(configuration)
clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, detached)
}
7. YarnClusterDescriptor.java
ClusterClientProvider<ApplicationId> deployJobCluster(ClusterSpecification, JobGraph, detached)
deployInternal
{
1) Check user permissions (Kerberos credentials), vcores, Hadoop environment variables, the YARN queue, etc.
2) ApplicationReport report = startAppMaster(...)
{
1. Upload resources to HDFS (yarnShipFiles, yarn.ship-archives, pipeline.jars, configuration files, etc.)
2. Serialize the JobGraph into a file and upload it to HDFS (per-job mode only)
3. Set the cluster HA information
4. Populate the ApplicationSubmissionContext
setApplicationName
setApplicationType
setAMContainerSpec // Application Master Container
setCommands // the class argument specifies YarnJobClusterEntrypoint as the launch class of the YARN AM container
setTokens
setLocalResources(job.graph, flink-conf.yaml, yarn-site.xml, krb5.conf, keytab, kerberos)
setEnvironment
variables configured by the user in flink-conf.yaml with the containerized.master.env. prefix
_FLINK_CLASSPATH // Flink app class path
_FLINK_DIST_JAR // local path of flink dist jar
classpath from YARN configuration
...
setResource(masterMemoryMB, yarn.appmaster.vcores)
setPriority
setQueue // yarn queue
setApplicationNodeLabel
setApplicationTags
5. Submit the YARN application: yarnClient.submitApplication(appContext)
// exit the loop normally on RUNNING or FINISHED; throw an exception on FAILED or KILLED; log state changes, and log if the wait exceeds 60 seconds
6. Loop waiting for the submission result: ApplicationReport report = yarnClient.getApplicationReport(appId)
}
3) Read the RPC and REST address/port information and the ApplicationId from the ApplicationReport and write them back into the Configuration; store ClusterId = ApplicationId in the HA services.
4) Return a RestClusterClient(Configuration, ApplicationId)
}
The JobManager process is the ApplicationMaster (AM) of the YARN job.
When the YARN NodeManager receives the AM container launch request from the YARN RM, it prepares the runtime environment for it (environment variables, JARs, configuration files, cgroup resource limits, etc.), writes the launch command into a script file, and runs that script to start the container, i.e. the JobManager process.
The subsequent steps are shown in the figure below:
1. Start the RpcService, which internally creates an ActorSystem.
RPC communication inside a Flink cluster is implemented by wrapping Akka actors.
The ActorSystem creates a Supervisor Actor, which is used to create and start the other actors, such as the ResourceManager, Dispatcher, and JobMaster.
2. Create and start the WebMonitorEndpoint.
This is a REST service implemented with Netty that serves web requests.
After the ApplicationMaster starts it registers with the YARN RM, and the appTrackingUrl it registers is this web service's address; that is why the YARN resource management page can link through to the Flink Web UI.
3. Create and start the ResourceManager.
4. Create and start the Dispatcher.
The Dispatcher is an RPC service that accepts RPC requests. Its creation and startup follow the same pattern as the ResourceManager's, so the details are not repeated.
5. The Dispatcher creates and starts the JobMaster.
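Since Flink's AkkaRpcService is a wrapper around plain Akka actors, a tiny standalone example using the classic Akka API illustrates the underlying model; the actor and message here are made up and are not Flink classes:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class ActorModelSketch {
    // Every Flink RpcEndpoint (ResourceManager, Dispatcher, JobMaster) is ultimately
    // backed by an actor like this one, created under a supervisor actor.
    static class EchoActor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, msg -> System.out.println("received: " + msg))
                    .build();
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("rpc-demo");                    // like the RpcService's ActorSystem
        ActorRef echo = system.actorOf(Props.create(EchoActor.class), "echo");  // like creating an endpoint's actor
        echo.tell("hello", ActorRef.noSender());                                // like an RPC / control message
        system.terminate();
    }
}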
YarnJobClusterEntrypoint.java
main(String[] args)
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint)
clusterEntrypoint.startCluster()
runCluster(Configuration, PluginManager)
initializeServices
{
1. commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService
JMXService
IO thread pool
HA Service
BlobServer
HeartbeatServices
MetricRegistry
MetricQuery RpcService
ProcessMetricGroup
ExecutionGraphInfoStore
}
clusterComponent = dispatcherResourceManagerComponentFactory.create
DefaultDispatcherResourceManagerComponentFactory.java
create(...)
{
2. webMonitorEndpoint = restEndpointFactory.createRestEndpoint
webMonitorEndpoint.start()
3. resourceManagerService = ResourceManagerServiceImpl.create
4. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner
{
...
Dispatcher dispatcher = dispatcherFactory.createDispatcher
dispatcher.start()
{
...
5. startRecoveredJobs()
{
for (JobGraph recoveredJob : recoveredJobs){
...
JobMaster jobMaster = new JobMaster(...)
jobMaster.start()
}
}
}
}
resourceManagerService.start()
{
...
YarnResourceManagerDriver.java
initializeInternal()
{
...
// create and start an AMRMClientAsync for talking to the YARN RM
resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient
resourceManagerClient.init(yarnConfig)
resourceManagerClient.start()
// after starting, the AM registers itself with the YARN RM, which is what lets YARN link through to the Flink web UI page
resourceManagerClient.registerApplicationMaster(rpcAddress, restPort, webInterfaceUrl)
...
// create and start an NMClientAsync for talking to YARN NodeManagers
nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient
nodeManagerClient.init(yarnConfig)
nodeManagerClient.start()
}
YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
{
...
// once all containers the job needs have been requested and claimed, the AM simply keeps heartbeating with the YARN RM
// resourceManagerClient = AMRMClientAsync
if (getNumRequestedNotAllocatedWorkers() <= 0) {
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}
}
}
}
1. The JobMaster schedules the JobGraph through SchedulerNG.
Scheduling happens in two steps:
1) Acquire resources (the containers tasks run in, i.e. Flink Slots; a Slot is carved out of a YARN Container)
2) Schedule the tasks
Note: only step 1) is covered here, i.e. how TaskManager processes are brought up to provide the slots tasks run in. Step 2) is covered under the next question.
2. The SlotPoolService is responsible for requesting slot resources: it first checks its in-memory cache for available slots and allocates directly from there if possible; otherwise it sends an RPC request to the Flink RM.
The AllocatedSlotPool caches the slots that have been registered: Map<AllocationID, AllocatedSlot> registeredSlots.
AllocatedSlot fields: AllocationID, TaskManagerLocation, TaskManagerGateway, ResourceProfile, physicalSlotNumber.
3. When the Flink RM receives the JobMaster's RPC resource request, it hands it to the SlotManager, which in turn performs the actual resource acquisition through the ResourceManagerDriver.
The YarnResourceManagerDriver internally holds a YARN RM client and a YARN NM client.
1) Through the YARN RM client it asks the YARN RM to allocate containers.
2) Through the YARN NM client it sends requests to YARN NodeManagers to launch containers running TaskManager processes (YarnTaskExecutorRunner is specified as the TaskManager entry class).
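A minimal, self-contained sketch of the slot pool's decision (serve from the in-memory cache when possible, otherwise ask the ResourceManager); the types below are stand-ins rather than Flink's classes:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

public class SlotPoolSketch {
    static final class Slot {} // stand-in for Flink's AllocatedSlot

    private final Deque<Slot> availableSlots = new ArrayDeque<>();

    // Cache hit: allocate immediately. Cache miss: ask the (stand-in) RM asynchronously.
    CompletableFuture<Slot> allocateSlot() {
        Slot cached = availableSlots.poll();
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return requestSlotFromResourceManager();
    }

    private CompletableFuture<Slot> requestSlotFromResourceManager() {
        // In Flink this is the declareRequiredResources RPC to the ResourceManager;
        // here we just pretend a new slot eventually shows up.
        return CompletableFuture.supplyAsync(Slot::new);
    }
}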
JobMaster.java
new JobMaster
{
...
// jobmanager.scheduler defaults to Ng, so the SchedulerNG that is created is DefaultScheduler
this.schedulerNG = createScheduler(...)
{
// JobGraph -> ExecutionGraph
this.executionGraph = createAndRestoreExecutionGraph(...)
}
}
jobMaster.start
...
onStart
startJobExecution
startJobMasterServices
startScheduling
1. schedulerNG.startScheduling
DefaultScheduler.java
// the method body lives in the parent class SchedulerBase.java
startScheduling
startSchedulingInternal
schedulingStrategy.startScheduling
PipelinedRegionSchedulingStrategy.java
startScheduling
maybeScheduleRegions(Set<SchedulingPipelinedRegion> regions)
// iterate over the regions and schedule them region by region
maybeScheduleRegion
schedulerOperations.allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption>)
DefaultScheduler.java
allocateSlotsAndDeploy
allocateSlots
executionSlotAllocator.allocateSlotsFor(List<ExecutionVertexID> executionVertexIds)
SlotSharingExecutionSlotAllocator.java
allocateSlotsFor
getOrAllocateSharedSlot
slotProvider.allocatePhysicalSlot(PhysicalSlotRequest)
PhysicalSlotProviderImpl.java
allocatePhysicalSlot
// first check whether an available slot exists; if so, allocate it directly
tryAllocateFromAvailable
// otherwise request a new one from the Flink RM
orElseGet(requestNewSlot(...))
2. slotPool.requestNewAllocatedSlot(SlotRequestId, ResourceProfile, timeout)
DeclarativeSlotPoolBridge.java
requestNewAllocatedSlot
internalRequestNewSlot
internalRequestNewAllocatedSlot
getDeclarativeSlotPool().increaseResourceRequirementsBy
DefaultDeclarativeSlotPool.java
increaseResourceRequirementsBy
declareResourceRequirements
notifyNewResourceRequirements.accept(Collection<ResourceRequirement>)
DeclarativeSlotPoolService.java
declareResourceRequirements(Collection<ResourceRequirement>)
resourceRequirementServiceConnectionManager.declareResourceRequirements
DefaultDeclareResourceRequirementServiceConnectionManager.java
declareResourceRequirements(ResourceRequirements)
triggerResourceRequirementsSubmission
sendResourceRequirements
service.declareResourceRequirements
DeclarativeSlotPoolService.java
// send an RPC request to the Flink RM to obtain slot resources
resourceManagerGateway.declareRequiredResources(JobMasterId, ResourceRequirements, Time timeout)
// the Flink ResourceManager server side handles the resource request
ResourceManager.java
declareRequiredResources
3. slotManager.processResourceRequirements
DeclarativeSlotManager.java
processResourceRequirements
resourceTracker.notifyResourceRequirements
checkResourceRequirements()
tryFulfillRequirementsWithPendingSlots(JobID jobId, Collection<Map.Entry<ResourceProfile, Integer>> missingResources, ResourceCounter pendingSlots)
// iterate over missingResources
tryAllocateWorkerAndReserveSlot(ResourceProfile profile, ResourceCounter pendingSlots)
taskExecutorManager.allocateWorker(profile)
TaskExecutorManager.java
allocateWorker
resourceActions.allocateResource(WorkerResourceSpec)
ResourceManager.java
ResourceActionsImpl.allocateResource
startNewWorker
ActiveResourceManager.java
startNewWorker
requestNewWorker
resourceManagerDriver.requestResource(TaskExecutorProcessSpec)
YarnResourceManagerDriver.java
requestResource
// request container resources
3.1 resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority))
YarnResourceManagerDriver.java
// the allocated container carries the NM's address information
YarnContainerEventHandler.onContainersAllocated(List<Container> containers)
onContainersOfPriorityAllocated
// iterate over the containers
startTaskExecutorInContainerAsync
// build the ContainerLaunchContext request object
context = createTaskExecutorLaunchContext(ResourceID containerId, String host, TaskExecutorProcessSpec taskExecutorProcessSpec)
// send a request through the YARN NM client to start the container running a TaskManager process
3.2 nodeManagerClient.startContainerAsync(container, context)
Tip: while a Task runs, it receives data from upstream, processes it, and sends the results downstream for the next Task to continue processing. Throughout this, the storage and retrieval of data are controlled by the MemoryManager in the TaskSlot, which, compared with the plain Java heap, lets Flink enforce memory quotas, shrink the in-memory footprint of data, and reclaim memory promptly. This is Flink's memory management.
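Concretely, Flink keeps such data in fixed-size MemorySegments handed out by the MemoryManager instead of as free-floating objects on the heap; a standalone illustration using Flink's MemorySegment API (the size and values are arbitrary):

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class MemorySegmentSketch {
    public static void main(String[] args) {
        // Allocate a 32 KB segment; in a TaskManager the MemoryManager hands out
        // such segments from a pre-sized budget rather than letting records live
        // as arbitrary objects on the JVM heap.
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);

        segment.putInt(0, 42);                  // write a value at offset 0
        System.out.println(segment.getInt(0));  // read it back

        segment.free();                         // release the memory promptly
    }
}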
DefaultScheduler.java
allocateSlotsAndDeploy
// request slot resources
allocateSlots
// deploy the tasks
waitForAllSlotsAndDeploy
deployAll(List<DeploymentHandle> deploymentHandles)
// iterate over deploymentHandles
deployOrHandleError(DeploymentHandle)
deployTaskSafe
executionVertexOperations.deploy(ExecutionVertex)
DefaultExecutionVertexOperations.java
deploy(ExecutionVertex executionVertex)
executionVertex.deploy()
ExecutionVertex.java
deploy()
currentExecution.deploy()
Execution.java
deploy()
TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway()
taskManagerGateway.submitTask(TaskDeploymentDescriptor, rpcTimeout)
RpcTaskManagerGateway.java
submitTask
taskExecutorGateway.submitTask(TaskDeploymentDescriptor, jobMasterId, timeout)
// the RPC server side handles the call
TaskExecutor.java
submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout)
{
...
// Task implements Runnable
Task task = new Task(...)
taskSlotTable.addTask(task)
task.startTaskThread()
// executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask)
executingThread.start()
}
Task.java
run()
doRun()
{
...
TaskInvokable invokable = loadAndInstantiateInvokable(...)
{
...
invokableClass = Class.forName(className, true, classLoader).asSubclass(TaskInvokable.class)
invokableClass.getConstructor(Environment.class).newInstance(environment)
}
restoreAndInvoke(invokable)
...
}
The creation and startup of the ResourceManager involves leader election, proxy classes, and Akka actors, and the code jumps around quite a bit, so the full call chain is laid out here for interested readers.
The whole process boils down to:
start() -> onStart()
1) The ResourceManager hands start off to the start method of its proxy object RpcServer (an AkkaInvocationHandler instance)
2) The RpcServer's invoke method is called; since this is not an RPC message, it calls its own start method
3) start sends a start-type control message to the ResourceManager actor via that actor's reference, an ActorRef
4) The actor (AkkaRpcActor) receives the message and hands it to the RpcEndpoint, i.e. the ResourceManager
Inside onStart() the relevant services are then started.
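The delegation in steps 1) and 2) is plain JDK dynamic proxying. The self-contained sketch below reproduces just that pattern; it is not Flink's AkkaInvocationHandler, and the actor send is replaced by a print:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class RpcProxySketch {
    interface RpcServer {
        void start();
    }

    // Plays the role of AkkaInvocationHandler: it is both the InvocationHandler behind
    // the proxy and the object that handles non-RPC (local) methods itself.
    static class DemoInvocationHandler implements InvocationHandler, RpcServer {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass().isInstance(this)) {
                // Not an RPC method (e.g. start()): call our own implementation.
                return method.invoke(this, args);
            }
            // A real RPC method would be packed into a message and sent to the remote actor.
            throw new UnsupportedOperationException("rpc dispatch not shown in this sketch");
        }

        @Override
        public void start() {
            // In Flink this is rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender()).
            System.out.println("send START control message to the actor");
        }
    }

    public static void main(String[] args) {
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
                RpcServer.class.getClassLoader(),
                new Class<?>[] {RpcServer.class},
                new DemoInvocationHandler());
        server.start(); // goes through invoke(), then DemoInvocationHandler.start()
    }
}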
The call chain is as follows:
// 1) Start the leader election service and elect a leader (a standalone LeaderLatch sketch follows after this call chain)
// start(LeaderContender) -> leaderContender.grantLeadership
{
// implements ResourceManagerService, LeaderContender
ResourceManagerServiceImpl.java
start()
leaderElectionService.start(this)
// HA leader election
DefaultLeaderElectionService.java
start(LeaderContender)
leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver
ZooKeeperLeaderElectionDriverFactory.java
createLeaderElectionDriver
new ZooKeeperLeaderElectionDriver
{
// the latchPath node is used for leader election
leaderLatch = new LeaderLatch(client, checkNotNull(latchPath))
leaderLatch.addListener(this)
leaderLatch.start()
// the leaderPath node stores the leader information; this node's data is watched for changes
cache = new NodeCache(client, leaderPath)
cache.getListenable().addListener(this)
cache.start()
}
ZooKeeperLeaderElectionDriver.java
// when this instance becomes leader, the listener registered on latchPath is called back and isLeader() is executed
isLeader()
leaderElectionEventHandler.onGrantLeadership
DefaultLeaderElectionService.java
onGrantLeadership
leaderContender.grantLeadership
}
// 2) After becoming leader, create the ResourceManager and start its RPC service
{
ResourceManagerServiceImpl.java
grantLeadership
startNewLeaderResourceManager
// resourceManagerFactory = ActiveResourceManagerFactory
this.leaderResourceManager = resourceManagerFactory.createResourceManager
{
new ActiveResourceManager
{
...
// start the RPC service
this.rpcServer = rpcService.startServer(this)
}
}
startResourceManagerIfIsLeader
resourceManager.start()
}
// 3) start() -> onStart()
{
// ResourceManager extends RpcEndpoint
RpcEndpoint.java
start()
rpcServer.start()
// AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
AkkaInvocationHandler.java (client side)
// rpcServer is a proxy object generated by JDK Proxy; the handler implements InvocationHandler#invoke, so the start call is dispatched through invoke
invoke(Object proxy, Method method, Object[] args)
// if (not an RPC method) invoke the corresponding method on the handler object itself
result = method.invoke(this, args)
start()
// rpcEndpoint is the actor's reference (an ActorRef) used to send messages to the actor; here a control message is sent to the ResourceManager itself
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender())
// AkkaRpcActor extends AbstractActor, i.e. it is an actor
AkkaRpcActor.java (server side)
handleControlMessage(ControlMessages)
// the initial state is STOPPED, i.e. state = AkkaRpcActor.StoppedState
state = state.start(this)
akkaRpcActor.rpcEndpoint.internalCallOnStart()
RpcEndpoint.java
internalCallOnStart()
onStart()
}
// 4) Start the services under the ResourceManager: heartbeat services, the SlotManager, etc.
{
ResourceManager.java
onStart()
startResourceManagerServices()
...
startHeartbeatServices()
slotManager.start
}
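As mentioned at step 1) of the chain above, the ZooKeeper-based election is built on Curator's LeaderLatch recipe; below is a standalone sketch of that recipe (the ZooKeeper address and latch path are placeholders):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3)); // placeholder ZK address
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/demo/leaderlatch"); // placeholder latch path
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Flink's ZooKeeperLeaderElectionDriver reacts at this point and
                // eventually calls leaderContender.grantLeadership(...).
                System.out.println("became leader");
            }

            @Override
            public void notLeader() {
                System.out.println("lost leadership");
            }
        });
        latch.start();

        Thread.sleep(10_000); // keep the process alive long enough to observe the callbacks
        latch.close();
        client.close();
    }
}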
// entry point: dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner
// 1) Create the DispatcherRunner and start HA leader election
{
DefaultDispatcherRunnerFactory.java
createDispatcherRunner
DefaultDispatcherRunner.create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
new DispatcherRunnerLeaderElectionLifecycleManager
// start the leader election service;
// the subsequent start(LeaderContender) -> leaderContender.grantLeadership flow is the same as in the ResourceManager call chain above and is not repeated
leaderElectionService.start(dispatcherRunner)
}
// 2) After becoming leader, start the Dispatcher (which starts an RPC service internally)
{
DefaultDispatcherRunner.java
grantLeadership
startNewDispatcherLeaderProcess
dispatcherLeaderProcess = createNewDispatcherLeaderProcess
dispatcherLeaderProcess.start
AbstractDispatcherLeaderProcess.java
start
startInternal
onStart
JobDispatcherLeaderProcess.java
onStart
dispatcherGatewayServiceFactory.create
DefaultDispatcherGatewayServiceFactory.java
create(DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, JobGraphWriter jobGraphWriter)
{
Dispatcher dispatcher = dispatcherFactory.createDispatcher
// JobDispatcherFactory.java
createDispatcher
// MiniDispatcher extends Dispatcher
new MiniDispatcher
{
// start the RPC service; internally an actor is created and an AkkaInvocationHandler proxy object is returned
this.rpcServer = rpcService.startServer(this)
}
dispatcher.start
}
}
// 3) What happens after startup? startRecoveredJobs (start the JobMasters)
{
Dispatcher.java
start()
.. // the start() -> onStart() flow is the same as in the ResourceManager call chain above and is not repeated
onStart()
startDispatcherServices
startRecoveredJobs
// iterate over recoveredJobs
runRecoveredJob(recoveredJob)
runJob
createJobManagerRunner
JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner
runner.start()
// implements LeaderContender
JobMasterServiceLeadershipRunner.java
start
leaderElectionService.start(this)
... // the subsequent start(LeaderContender) -> leaderContender.grantLeadership flow is the same as in the ResourceManager call chain above and is not repeated
grantLeadership
startJobMasterServiceProcessAsync
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
createNewJobMasterServiceProcess
jobMasterServiceProcess = jobMasterServiceProcessFactory.create
DefaultJobMasterServiceProcessFactory.java
create
new DefaultJobMasterServiceProcess
this.jobMasterServiceFuture = jobMasterServiceFactory.createJobMasterService
DefaultJobMasterServiceFactory.java
createJobMasterService
internalCreateJobMasterService
JobMaster jobMaster = new JobMaster(...)
jobMaster.start()
}
Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For infringement concerns, please contact cloudcommunity@tencent.com for removal.