Azkaban是一个非常简单实用,而且开源的作业调度系统。在2.x版本中不支持集群模式部署,在3.x版本中支持集群模式部署,适用于作业量比较大一些的应用场景。有关Azkaban更多详细信息,如特点、功能、特性、作业定义等,可以参考官方文档,这里不再详述。
Azkaban集群架构
下面我们看一下Azkaban集群模式的架构,如下图所示:
从上图可见,Azkaban集群部署模式,主要有3个核心的组件:
Azkaban WebServer,是整个调度集群的核心,负责所有作业的管理和调度。
Azkaban ExecutorServer,整个调度集群中实际运行作业的节点,该类节点可能是作为一个作业提交的客户端,比如Spark on YARN部署模式下,cluster运行模式时只作为客户端使用,client运行模式时会有部分计算逻辑;比如普通的Java程序需要处理量级较小的数据作业,这时Executor Server节点可能有较大的工作负载,占用较多节点资源(内存、CPU)。
DB,是集群中所有节点运行共用的数据存储,包含作业信息、各种调度元数据等等。
核心调度概述
Azkaban WebServer需要根据Executor Server的运行状态信息,选择一个合适的Executor Server来运行WorkFlow,然后会将提交到队列中的WorkFlow调度到选定的Executor Server上运行。我们整理了与核心调度相关的各个组件,主要包括Azkaban WebServer端和Azkaban ExecutorServer端,他们之间的关系如下图所示:
其实,从调度层面来看,Azkaban WebServer与Executor Server之间的交互方式非常简单,是通过REST API的方式来进行交互,基本的模式是,Azkaban WebServer根据调度的需要,主动调用Executor Server暴露的REST API来获取相应的资源信息,比如Executor Server的状态信息、分配WorkFlow到指定Executor Server上运行,等等。 我们可以在QueueProcessorThread.selectExecutorAndDispatchFlow()方法中看到,选择Executor Server并进行调度的实现,代码片段如下所示:
1 final Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
2if (selectedExecutor != null) {
3 try {
4 dispatch(reference, exflow, selectedExecutor);
5 ExecutorManager.this.commonMetrics.markDispatchSuccess();
6 } catch (final ExecutorManagerException e) {
7 ExecutorManager.this.commonMetrics.markDispatchFail();
8 logger.warn(String.format(
9 "Executor %s responded with exception for exec: %d",
10 selectedExecutor, exflow.getExecutionId()), e);
11 handleDispatchExceptionCase(reference, exflow, selectedExecutor,
12 availableExecutors);
13 }
14}
QueueProcessorThread是运行在Azkaban WebServer端的一个线程,它在ExecutorManager中定义,是内部调度中最核心的线程。selectExecutor()方法处理如何选择一个合适的Executor Server,然后通过dispatch()方法将需要运行的WorkFlow调度到该Executor Server上运行。
选择Executor Server
Azkaban WebServer选择Executor,调用selectExecutor()方法,实现如下所示:
1private Executor selectExecutor(final ExecutableFlow exflow,
2 final Set<Executor> availableExecutors) {
3 Executor choosenExecutor =
4 getUserSpecifiedExecutor(exflow.getExecutionOptions(),
5 exflow.getExecutionId());
6 // If no executor was specified by admin
7 if (choosenExecutor == null) {
8 logger.info("Using dispatcher for execution id :"
9 + exflow.getExecutionId());
10 final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,
11 ExecutorManager.this.comparatorWeightsMap);
12 choosenExecutor = selector.getBest(availableExecutors, exflow);
13 }
14 return choosenExecutor;
15}
首先,查看当前exflow的配置中,是否要求将该exflow调度到指定的Executor Server上运行,如果是的话,则会返回该指定的Executor Server的信息,后续直接调度到该Executor Server上;否则会按照一定的计算规则去选出一个Executor Server。
在创建ExecutorSelector时,传入参数值ExecutorManager.this.filterList,该filterList是从azkanban.properties文件中读取azkaban.executorselector.filters的配置值,并创建了一个ExecutorFilter对象,而该对象中包含了一组FactorFilter,后面我们会说明。 使用ExecutorSelector来选出一个Executor Server,具体选择的逻辑,我们可以查看ExecutorSelector.getBest()方法。 首先通过定义的CandidateFilter(它是一个抽象类,具体实现类为ExecutorFilter)进行预筛选:
1for (final K candidateInfo : candidateList) {
2 if (this.filter.filterTarget(candidateInfo, dispatchingObject)) {
3 filteredList.add(candidateInfo);
4 }
5}
上面的filter就是FactorFilter类的实例,Azkaban内部定义了如下3种:
1private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
2private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory";
3private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";
目前3.40.0版本不支持自定义,只能使用内建实现的,如果需要增加新的FactorFilter,可以在此基础上做一个简单改造,配置使用自己定义的FactorFilter实现。FactorFilter是一个泛型类:FactorFilter<Executor, ExecutableFlow>,根据上面定义的3种指标对Executor Server进行一个预过滤,满足要求的会进行后面的比较,加入到调度WorkFlow执行的Executor Server的候选集中。 然后,通过如下方式进行比较排序,选择合适的Executor Server:
1// final work - find the best candidate from the filtered list.
2final K executor = Collections.max(filteredList, this.comparator);
3logger.debug(String.format("candidate selected %s",
4 null == executor ? "(null)" : executor.toString()));
5return executor;
这里关键的就是this.comparator,它有一个实现类ExecutorComparator,该类中给出了需要对两个Executor Server的哪些指标进行综合比较,亦即一组比较器的定义,可以看到目前考虑了4种比较器:
1private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
2private static final String MEMORY_COMPARATOR_NAME = "Memory";
3private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
4private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
通过上面代码可以看出,在选择调度一个WorkFlow到Azkaban集群中的某个Executor Server时,需要比较Executor Server的如下4个指标:
基于上面4个指标,创建了4个比较器,使用FactorComparator来表示,对需要比较的一组Executor Server,使用这4个比较器进行比较,通过加权后得到一个得分值,根据该得分值选定Executor Server,核心逻辑如下所示:
1final Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
2for (final FactorComparator<T> comparator : comparatorList) {
3 final int result = comparator.compare(object1, object2);
4 result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
5 result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
6 logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
7 comparator.getFactorName(), result, result1, result2));
8}
上面选取了待比较的两个Executor Server都不为空的情况,分别遍历每个FactorComparator进行比较,在分别对每个Executor Server的比较结果值进行累加求和,加权得到一个分数值。从一组Executor Server中,根据最终比较的分数值,分数值最大的Executor Server为最终选定的Executor Server。
获取Executor Server的运行统计信息
在Azkaban WebServer内部,会维护集群中每个Executor Server的运行状态信息,该信息的获取是在QueueProcessorThread线程中实现的,定期去更新所维护的Executor Server的运行状态信息,如下所示:
1if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
2 || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
3 // Refresh executorInfo for all activeExecutors
4 refreshExecutors();
5 lastExecutorRefreshTime = currentTime;
6 currentContinuousFlowProcessed = 0;
7}
上面refreshExecutors()方法遍历内存中维护的所有的Executor Server,调用每个Executor Server的/serverStatistics接口,拉取Executor Server的运行状态信息。 另外,Azkaban WebServer还需要能够获取到各个Executor Server上运行的WorkFlow的状态信息,可以在ExecutorManager.ExecutingManagerUpdaterThread中看到实现,代码片段如下所示:
1results =
2 ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(),
3 executor.getPort(), ConnectorParams.UPDATE_ACTION,
4 null, null, executionIds, updateTimes);
上面调用Executor Server的/executor?action=update接口来拉取WorkFlow的状态信息,然后更新内存中维护的状态信息数据结构。其中,有些WorkFlow可能已经运行完成,需要释放资源;有些WorkFlow状态发生变更,也需要更新Azkaban WebServer端内存中维护的数据结构。
调度WorkFlow到Executor Server上执行
上面已经选定Executor Server,结合前面代码,是通过调用ExecutorManager.dispatch()方法来实现,调度WorkFlow到该选定的Executor Server上运行,代码片段如下所示:
1try {
2 this.apiGateway.callWithExecutable(exflow, choosenExecutor,
3 ConnectorParams.EXECUTE_ACTION);
4} catch (final ExecutorManagerException ex) {
5 logger.error("Rolling back executor assignment for execution id:"
6 + exflow.getExecutionId(), ex);
7 this.executorLoader.unassignExecutor(exflow.getExecutionId());
8 throw new ExecutorManagerException(ex);
9}
通过跟踪查看apiGateway.callWithExecutable()实现,可以看到,最终是调用了Executor Server端的一个REST API接口:/executor,然后带上相关的请求参数,如action=execute、execId等。
Executor Server执行WorkFlow
很显然,Azkaban WebServer调度WorkFlow后,Executor Server在ExecutorServlet中接收到对应的请求,核心方法如下所示:
1private void handleAjaxExecute(final HttpServletRequest req,
2 final Map<String, Object> respMap, final int execId) throws ServletException {
3 try {
4 this.flowRunnerManager.submitFlow(execId);
5 } catch (final ExecutorManagerException e) {
6 e.printStackTrace();
7 logger.error(e.getMessage(), e);
8 respMap.put(RESPONSE_ERROR, e.getMessage());
9 }
10}
在收到Azkaban WebServer的调度请求后,Executor Server使用内部的FlowRunnerManager来提交WorkFlow执行。在这个过程中,首先使用ExecutorLoader从数据库中读取WorkFlow对应的信息;然后使用FlowPreparer进行初始化,创建对应的数据目录等;最后创建FlowRunner来执行WorkFlow,并跟踪其执行状态。
参考内容
✦ ✦ ✦ ✦ ✦ ✦ ✦ ✦
作者: Yanjun 原文:http://shiyanjun.cn/archives/1820.html