首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Azkaban集群内部调度原理分析

Azkaban集群内部调度原理分析

作者头像
用户1257215
发布于 2018-07-27 01:49:41
发布于 2018-07-27 01:49:41
2.6K00
代码可运行
举报
文章被收录于专栏:架构师之旅架构师之旅
运行总次数:0
代码可运行

Azkaban是一个非常简单实用,而且开源的作业调度系统。在2.x版本中不支持集群模式部署,在3.x版本中支持集群模式部署,适用于作业量比较大一些的应用场景。有关Azkaban更多详细信息,如特点、功能、特性、作业定义等,可以参考官方文档,这里不再详述。

Azkaban集群架构

下面我们看一下Azkaban集群模式的架构,如下图所示:

从上图可见,Azkaban集群部署模式,主要有3个核心的组件:

  • Azkaban WebServer

Azkaban WebServer,是整个调度集群的核心,负责所有作业的管理和调度。

  • Azkaban ExecutorServer

Azkaban ExecutorServer,整个调度集群中实际运行作业的节点,该类节点可能是作为一个作业提交的客户端,比如Spark on YARN部署模式下,cluster运行模式时只作为客户端使用,client运行模式时会有部分计算逻辑;比如普通的Java程序需要处理量级较小的数据作业,这时Executor Server节点可能有较大的工作负载,占用较多节点资源(内存、CPU)。

  • DB

DB,是集群中所有节点运行共用的数据存储,包含作业信息、各种调度元数据等等。

核心调度概述

Azkaban WebServer需要根据Executor Server的运行状态信息,选择一个合适的Executor Server来运行WorkFlow,然后会将提交到队列中的WorkFlow调度到选定的Executor Server上运行。我们整理了与核心调度相关的各个组件,主要包括Azkaban WebServer端和Azkaban ExecutorServer端,他们之间的关系如下图所示:

其实,从调度层面来看,Azkaban WebServer与Executor Server之间的交互方式非常简单,是通过REST API的方式来进行交互,基本的模式是,Azkaban WebServer根据调度的需要,主动调用Executor Server暴露的REST API来获取相应的资源信息,比如Executor Server的状态信息、分配WorkFlow到指定Executor Server上运行,等等。 我们可以在QueueProcessorThread.selectExecutorAndDispatchFlow()方法中看到,选择Executor Server并进行调度的实现,代码片段如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 1 final Executor selectedExecutor = selectExecutor(exflow, availableExecutors);
 2if (selectedExecutor != null) {
 3  try {
 4    dispatch(reference, exflow, selectedExecutor);
 5    ExecutorManager.this.commonMetrics.markDispatchSuccess();
 6  } catch (final ExecutorManagerException e) {
 7    ExecutorManager.this.commonMetrics.markDispatchFail();
 8    logger.warn(String.format(
 9        "Executor %s responded with exception for exec: %d",
10        selectedExecutor, exflow.getExecutionId()), e);
11    handleDispatchExceptionCase(reference, exflow, selectedExecutor,
12        availableExecutors);
13  }
14}

QueueProcessorThread是运行在Azkaban WebServer端的一个线程,它在ExecutorManager中定义,是内部调度中最核心的线程。selectExecutor()方法处理如何选择一个合适的Executor Server,然后通过dispatch()方法将需要运行的WorkFlow调度到该Executor Server上运行。

选择Executor Server

Azkaban WebServer选择Executor,调用selectExecutor()方法,实现如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 1private Executor selectExecutor(final ExecutableFlow exflow,
 2    final Set<Executor> availableExecutors) {
 3  Executor choosenExecutor =
 4      getUserSpecifiedExecutor(exflow.getExecutionOptions(),
 5          exflow.getExecutionId());
 6  // If no executor was specified by admin
 7  if (choosenExecutor == null) {
 8    logger.info("Using dispatcher for execution id :"
 9        + exflow.getExecutionId());
10    final ExecutorSelector selector = new ExecutorSelector(ExecutorManager.this.filterList,
11        ExecutorManager.this.comparatorWeightsMap);
12    choosenExecutor = selector.getBest(availableExecutors, exflow);
13  }
14  return choosenExecutor;
15}

首先,查看当前exflow的配置中,是否要求将该exflow调度到指定的Executor Server上运行,如果是的话,则会返回该指定的Executor Server的信息,后续直接调度到该Executor Server上;否则会按照一定的计算规则去选出一个Executor Server。

在创建ExecutorSelector时,传入参数值ExecutorManager.this.filterList,该filterList是从azkanban.properties文件中读取azkaban.executorselector.filters的配置值,并创建了一个ExecutorFilter对象,而该对象中包含了一组FactorFilter,后面我们会说明。 使用ExecutorSelector来选出一个Executor Server,具体选择的逻辑,我们可以查看ExecutorSelector.getBest()方法。 首先通过定义的CandidateFilter(它是一个抽象类,具体实现类为ExecutorFilter)进行预筛选:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1for (final K candidateInfo : candidateList) {
2  if (this.filter.filterTarget(candidateInfo, dispatchingObject)) {
3    filteredList.add(candidateInfo);
4  }
5}

上面的filter就是FactorFilter类的实例,Azkaban内部定义了如下3种:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1private static final String STATICREMAININGFLOWSIZE_FILTER_NAME = "StaticRemainingFlowSize";
2private static final String MINIMUMFREEMEMORY_FILTER_NAME = "MinimumFreeMemory";
3private static final String CPUSTATUS_FILTER_NAME = "CpuStatus";

目前3.40.0版本不支持自定义,只能使用内建实现的,如果需要增加新的FactorFilter,可以在此基础上做一个简单改造,配置使用自己定义的FactorFilter实现。FactorFilter是一个泛型类:FactorFilter<Executor, ExecutableFlow>,根据上面定义的3种指标对Executor Server进行一个预过滤,满足要求的会进行后面的比较,加入到调度WorkFlow执行的Executor Server的候选集中。 然后,通过如下方式进行比较排序,选择合适的Executor Server:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1// final work - find the best candidate from the filtered list.
2final K executor = Collections.max(filteredList, this.comparator);
3logger.debug(String.format("candidate selected %s",
4    null == executor ? "(null)" : executor.toString()));
5return executor;

这里关键的就是this.comparator,它有一个实现类ExecutorComparator,该类中给出了需要对两个Executor Server的哪些指标进行综合比较,亦即一组比较器的定义,可以看到目前考虑了4种比较器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
2private static final String MEMORY_COMPARATOR_NAME = "Memory";
3private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
4private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";

通过上面代码可以看出,在选择调度一个WorkFlow到Azkaban集群中的某个Executor Server时,需要比较Executor Server的如下4个指标:

  1. 能够运行WorkFlow的剩余容量,数值越大越优先
  2. 剩余内存用量,数值越大越优先
  3. 最近分配Flow的时间,数值越大越优先
  4. CPU使用用量,数值越小越优先

基于上面4个指标,创建了4个比较器,使用FactorComparator来表示,对需要比较的一组Executor Server,使用这4个比较器进行比较,通过加权后得到一个得分值,根据该得分值选定Executor Server,核心逻辑如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1final Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
2for (final FactorComparator<T> comparator : comparatorList) {
3  final int result = comparator.compare(object1, object2);
4  result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
5  result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
6  logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
7      comparator.getFactorName(), result, result1, result2));
8}

上面选取了待比较的两个Executor Server都不为空的情况,分别遍历每个FactorComparator进行比较,在分别对每个Executor Server的比较结果值进行累加求和,加权得到一个分数值。从一组Executor Server中,根据最终比较的分数值,分数值最大的Executor Server为最终选定的Executor Server。

获取Executor Server的运行统计信息

在Azkaban WebServer内部,会维护集群中每个Executor Server的运行状态信息,该信息的获取是在QueueProcessorThread线程中实现的,定期去更新所维护的Executor Server的运行状态信息,如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
2    || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
3  // Refresh executorInfo for all activeExecutors
4  refreshExecutors();
5  lastExecutorRefreshTime = currentTime;
6  currentContinuousFlowProcessed = 0;
7}

上面refreshExecutors()方法遍历内存中维护的所有的Executor Server,调用每个Executor Server的/serverStatistics接口,拉取Executor Server的运行状态信息。 另外,Azkaban WebServer还需要能够获取到各个Executor Server上运行的WorkFlow的状态信息,可以在ExecutorManager.ExecutingManagerUpdaterThread中看到实现,代码片段如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1results =
2    ExecutorManager.this.apiGateway.callWithExecutionId(executor.getHost(),
3        executor.getPort(), ConnectorParams.UPDATE_ACTION,
4        null, null, executionIds, updateTimes);

上面调用Executor Server的/executor?action=update接口来拉取WorkFlow的状态信息,然后更新内存中维护的状态信息数据结构。其中,有些WorkFlow可能已经运行完成,需要释放资源;有些WorkFlow状态发生变更,也需要更新Azkaban WebServer端内存中维护的数据结构。

调度WorkFlow到Executor Server上执行

上面已经选定Executor Server,结合前面代码,是通过调用ExecutorManager.dispatch()方法来实现,调度WorkFlow到该选定的Executor Server上运行,代码片段如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1try {
2  this.apiGateway.callWithExecutable(exflow, choosenExecutor,
3      ConnectorParams.EXECUTE_ACTION);
4} catch (final ExecutorManagerException ex) {
5  logger.error("Rolling back executor assignment for execution id:"
6      + exflow.getExecutionId(), ex);
7  this.executorLoader.unassignExecutor(exflow.getExecutionId());
8  throw new ExecutorManagerException(ex);
9}

通过跟踪查看apiGateway.callWithExecutable()实现,可以看到,最终是调用了Executor Server端的一个REST API接口:/executor,然后带上相关的请求参数,如action=execute、execId等。

Executor Server执行WorkFlow

很显然,Azkaban WebServer调度WorkFlow后,Executor Server在ExecutorServlet中接收到对应的请求,核心方法如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 1private void handleAjaxExecute(final HttpServletRequest req,
 2    final Map<String, Object> respMap, final int execId) throws ServletException {
 3  try {
 4    this.flowRunnerManager.submitFlow(execId);
 5  } catch (final ExecutorManagerException e) {
 6    e.printStackTrace();
 7    logger.error(e.getMessage(), e);
 8    respMap.put(RESPONSE_ERROR, e.getMessage());
 9  }
10}

在收到Azkaban WebServer的调度请求后,Executor Server使用内部的FlowRunnerManager来提交WorkFlow执行。在这个过程中,首先使用ExecutorLoader从数据库中读取WorkFlow对应的信息;然后使用FlowPreparer进行初始化,创建对应的数据目录等;最后创建FlowRunner来执行WorkFlow,并跟踪其执行状态。

参考内容

  • http://azkaban.github.io/azkaban/docs/latest/
  • https://github.com/azkaban/azkaban/releases/tag/3.40.0

✦ ✦ ✦ ✦ ✦ ✦ ✦ ✦

作者: Yanjun 原文:http://shiyanjun.cn/archives/1820.html


本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构师之旅 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
matlab常用目录操作
总结matlab下常用到的目录操作 添加当前文件夹及其子文件夹至搜索路径 % add path rootDir = fileparts(mfilename('fullpath')); addpath(genpath(rootDir)); cd(rootDir); 函数 参数 说明 mfilename mfilename('fullpath') 生成当前文件所在的完整目录,包括文件名 fileparts [pathstr,name,ext]= fileparts(filename) 文件完整目录分割成目录、文
李拜六不开鑫
2018/09/04
6070
基于深度学习的车辆检测系统(MATLAB代码,含GUI界面)
摘要:当前深度学习在目标检测领域的影响日益显著,本文主要基于深度学习的目标检测算法实现车辆检测,为大家介绍如何利用 M A T L A B \color{#4285f4}{M}\color{#ea4335}{A}\color{#fbbc05}{T}\color{#4285f4}{L}\color{#34a853}{A}\color{#ea4335}{B} MATLAB设计一个车辆检测系统的软件,通过自行搭建YOLO网络并利用自定义的数据集进行训练、验证模型,最终实现系统可选取图片或视频进行检测、标注,以及结果的实时显示和保存。其中,GUI界面利用最新的MATLAB APP设计工具开发设计完成,算法部分选择时下实用的YOLO v2/v3网络,通过BDD100K数据集进行训练、测试检测器效果。本文提供项目所有涉及到的程序代码、数据集等文件,完整资源文件请转至文末的下载链接,本博文目录如下:
全栈程序员站长
2022/11/03
1.1K0
基于深度学习的车辆检测系统(MATLAB代码,含GUI界面)
图像倾斜校正算法的MATLAB实现:图像倾斜角检测及校正
通过采用图像处理技术,可以将数码设备采 集到的文字、图片等信息转化成其他信息形势输出,例如转化成音频输出己解决视 障患者的视力需求。但是,由于输入设备或某些其他因素不可避免地使得采集到的 文本图像或多或少会出现某种程度的倾斜。因此,倾斜图像校正是当前文本图像研 宄领域中十分重要的课题,尤其在数字化、自动化领域。比如,提高OCR(Optical Character Recognition)识别率从而提高文档自动化处理效率,车牌号码自动 识别与交通监视,手写体自动识别,名片自动归类等。
拓端
2020/07/10
2.4K0
使用python读取matlab数据文件.mat
传送门 http://blog.stackoverflow.club/hdf5-usage/
羽翰尘
2019/11/19
2.6K0
MATLAB图像倾斜校正算法实现:图像倾斜角检测及校正|附代码数据
在本文中,随着多媒体技术的不断发展,数码相机,高清拍照手机等多媒体设备己经在人们的生活中占据了越来越重要的地位 ( 点击文末“阅读原文”获取完整代码数据******** ) 。
拓端
2022/11/14
1.3K0
两分钟搞定Python读取matlab的.mat数据
Matlab是学术界非常受欢迎的科学计算平台,matlab提供强大的数据计算以及仿真功能。在Matlab中数据集通常保存为.mat格式。那么如果我们想要在Python中加载.mat数据应该怎么办呢?所以今天就给大家分享一个使用python加载.mat数据的方法。我将使用Stanford Cars Dataset数据集作为例子为大家演示使用方法。
深度学习与Python
2019/06/18
15.3K2
Stata | 批量转换数据格式
如果有一批其他格式的数据,如 Excel,CSV 等,如何批量转为 Stata 的 .dta 格式数据?
PyStaData
2021/03/23
3.9K0
MATLAB实现HANTS时间序列滤波
  本文介绍在MATLAB中,实现基于HANTS算法(时间序列谐波分析法)的长时间序列数据去噪、重建、填补的详细方法。
疯狂学习GIS
2023/09/06
4670
MATLAB实现HANTS时间序列滤波
基于Matlab的三维人脸识别系统开发
在过去的十年中已经提出了几种用于图像处理和计算机视觉应用的机器学习算法。LBP,HAAR是一些流行的算法,广泛用于人脸识别并产生出色的结果。但是大多数这些算法不适合在无约束环境中进行实时识别。最近最先进的深度学习技术已经成为传统机器学习算法的新宠。人脸识别应用程序使用的图像只是范围(0-255)中像素值的组合。算法在那些灰度值中找到区别模式并将其视为被认为对于每个图像唯一的特征。然而在3D图像中,不存在像素信息,而是仅每个点的位置(x,y,z)可用。
代码医生工作室
2019/07/05
1.9K0
基于Matlab的三维人脸识别系统开发
Matlab数据导入--importdata和load函数
在使用matlab将数据导入到工作空间的时候,经常会使用到两个函数,一个是importdata函数,另一个是load函数,它们的使用方法和使用场景是太相同的,如果不太注意就可能会犯错误,在这里做简要的说明和记录。
用户9925864
2022/07/27
1.2K0
Matlab数据导入--importdata和load函数
Matlab系列之通用命令
对于以下列出的MATLAB的通用命令操作都比较简单,也是在操作过程中经常会需要用到的命令,在命令行窗口的“>>”之后输入命令以及命令所需的变量即可使用,对于不知道使用的命令,可以直接使用help以获取详细的用法。
狂人V
2020/06/28
6740
MATLAB使用教程(1)从零开始,MATLAB 2023a中文版下载安装
Matlab 是一种强大的数学软件,广泛应用于工程、科学和金融等领域。它提供了各种工具箱和函数,可以用于数据分析、图像处理、机器学习等方面。在本文中,我将介绍 Matlab 软件的一些常用功能使用技巧,并结合实际案例进行举例讲解,帮助读者更好地掌握 Matlab 的使用技巧。
用户10565111
2023/05/13
8570
Python - 读写 Matlab Mat 格式数据
如果 matlab 保存 data 时,采用的是 ‘-v7.3’,scipy.io.loadmat函数加载数据会出现错误:
AIHGF
2019/02/18
4.9K0
MATLAB画图语句_excel绘图技巧
Matlab作出的图普遍没有Origin作出的美观好看,而且导出为eps或emf格式后会有各种奇怪的Bug。目前普遍采用的一种方法是,将Matlab数据导出为mat文件后再导入Origin中手工作图,这种方式需要不少重复性劳动,并不是一种很完美的解决方案。 前几天偶然看到Origin提供了COM接口可供Matlab调用,于是就研究了下可否用Matlab调用Origin来生成所需的emf格式图片,最终经过一番折腾,基本完成了这个目标。
全栈程序员站长
2022/09/29
9300
MATLAB画图语句_excel绘图技巧
Matlab图像处理常用基本函数
之前用Matlab做图像处理工作时,用到什么函数就查什么函数,从没做过系统的总结,再做的时候又要去查,所以总结还是有必要的~
李拜六不开鑫
2018/10/11
1.4K0
Stata | 自动生成中南财大2019拟录取硕士研究生分析报告
又是一年考研季,今天有朋友问我去年的学校录取情况。作为一只“菜狗”,这那是我该操心的事,但我还是去官网找了下去年的结果。然而官网上拟录取名单是 PDF 格式,不方便拿来做统计分析,所以就用 Stata 顺手做了一些简单的分析工作,觉得这也为需要提供重复性报告的工作提供一种思路。具体来说,主要包括如下技术要点:
PyStaData
2020/07/21
1K0
Stata | 自动生成中南财大2019拟录取硕士研究生分析报告
【Matlab】加载路径下所有指定文件
想用matlab加载路径下所有指定文件,比如加载一个路径下的所有png图像、txt文件等,网上查了一圈也不是很好用,解决了问题就分享一下。
AomanHao
2024/08/07
3240
matlab批量处理excel(CSV)文件数据
今天是2019-1-29,参加完2019年美国大学生数学建模竞赛,小伙伴都回家了,就我一个人在寝室,太无聊了,就把在比赛中遇到的excel批处理,写一下思路(ps:其实我在比赛中 利用的是SQLServer数据库和matlab相结合的数据处理方法,但是一般情况下遇到的都是matlab对excel数据批处理,所以降低要求写了matlab对excel数据批处理,此思路都是小编凭感觉自己摸索出来的,如有错误欢迎指出)。
全栈程序员站长
2022/09/14
1.2K0
matlab批量处理excel(CSV)文件数据
Matlab实现数据导出
MATLAB 允许你使用数据在另一个应用程序读取 ASCII 文件,MATLAB提供了多种数据输出选项。
用户9925864
2022/07/27
1.1K0
Matlab实现数据导出
stata 导出 相关系数表_STATA数据处理技巧与计量分析二|基本语句介绍
上期小统和大家一起了解了STATA数据处理技巧与计量分析的背景介绍,这期小统和大家一起学习一下基本语句介绍。
全栈程序员站长
2022/07/04
1.6K0
stata 导出 相关系数表_STATA数据处理技巧与计量分析二|基本语句介绍
推荐阅读
相关推荐
matlab常用目录操作
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档