一、历史背景
应用在海量数据的存储、计算、挖掘、分析、查询以及机器学习等。
二、讨论点
本文从Hadoop(1.0)系统中调度策略的角度展开讨论。这本质还是对Hadoop的集群资源进行管理,主要有四个方面:
其中前三个方面本质是Hadoop的资源分配模型;最后一个方面是Hadoop的资源表示模型。
当没有被指定特定调度器时,Hadoop系统在启动时会加载一个默认的缺省调度器,即先进先出调度器(FIFO Scheduler)。FIFO Scheduler是按照客户作业(Job)的提交顺序获得调度机会以便执行。优点是算法思想简明且系统资源开销小,但FIFO Scheduler对用户提交作业不作区别,且FIFO策略有利于长作业,而不利于短作业,对于一些实时性要求较高的交互型作业往往效果很差。在出现长作业时,系统的平均响应时间过长,整体吞吐率下降。
在Apache Hadoop官方版本中还提供了Yahoo公司的计算能力调度器(Capacity Scheduler)和FaceBook公司的公平调度器(Fair Scheduler)。它们均是为多用户共享Hadoop集群而设计的调度框架,并以提高集群资源利用率和降低成本为目标。
Capacity Scheduler的设计思想主要是以队列为单位对系统资源进行划分和管理,再将资源按比例分配给各个队列,同时设置队列占有资源的上下限防止个别队列独占资源。缺点是队列的设置和选组无法动态进行。
Fair Scheduler以资源池对系统资源进行划分,但更注重系统资源对用户的公平共享。除此之外,它还支持资源抢占,降低小作业的调度延迟。缺点是没有考虑当前系统各节点的负载水平和实际的负载状态,导致节点实际负载不均衡。
Hadoop系统对Map任务的调度主要以数据本地性(data-locality)为考虑因素。就是将Map任务分配给某些节点,在这些节点上有Map任务即将要处理的输入数据分片(InputSplit),所以不需要通过远程拷贝的方式将存储在其他节点上的输入数据拉取到该Map任务节点上。
提高任务数据本地性对加快MapReduce任务执行速度,作业周转时间,避免冗余网络I/O,节省集群带宽资源等诸多方面有直接影响。
在异构环境中MapReduce表现不足。在分布式集群环境中,个别任务可能因为负载不均衡,资源分配不合理或者节点性能差异过大等原因,使得它的运行速度比其他任务落后,那么这些任务会拖慢整个作业的执行进度。为避免这种情况,Apache在Hadoop系统中内置了备份任务推测执行机制(Speculative Execution),采用慢任务推测机制识别出“拖后腿任务”,并为这些“慢任务”启动“备份任务”,将它们分配给空闲或者计算能力强的节点,最终选择执行快的备份任务作为最终结果。
三、Hadoop核心技术
HDFS为存取大文件设计,拥有以下几点基本特性:
MapReduce是源自Google的一个编程模型,它以高并行和可扩展的方式处理大数据集。MapReduce的灵感来源于函数式编程,用户可将他们的计算表达为map和reduce函数,将数据作为键值对来处理,核心理念是移动计算,而不是移动数据。
Hadoop还提供了软件基础架构,以一系列map和reduce任务的形式运行MapReduce 作业。Map任务在输入数据的子集上调用map函数。在完成这些调用后,reduce 任务开始在map函数所生成的中间数据上调用reduce任务,生成最终的输出。
最重要的是,Hadoop基础架构负责处理分布式处理的所有复杂方面:并行化计算、任务调度、资源管理、机器间通信、软硬件故障处理等等。得益于这种干净的抽象,实现分布式应用程序从未像现在这么容易过。
MapReduce核心组件JobClient、JobTracker和TaskTracker,主要功能作用如下:
作业输出结果存放到HDFS上。
JobTracker对作业的管理被抽象为三个层次:作业监控层、任务监控层和任务执行层,并以“三层多叉树”的方式建立各层之间的关系。
Hadoop以队列为单位管理作业和资源,Hadoop调度器本质上均采用“三级调度模型”。首先选择一个作业队列(JobQueue),再从作业队列中选择某项作业(Job);接着从该项作业划分后的若干个子任务(Task)中进行选择;最后分配给某个有能力的TaskTracker去执行。其中前两个级别的选择策略是由不同调度器根据实际应用需求而设计实现的,在最后一个级别的任务选择策略中,Hadoop考虑的关键因素均为数据本地性。因此Hadoop将之封装成一个通用的模块供给调度器使用,具体存放在JobInProgress类中。
在MapReduce中,一次作业的执行过程分为三个重要阶段:map、shuffle和reduce阶段,每个阶段都以键值对<key, value>作为输入/输出的数据形式。map和reduce两个阶段分别执行若干Map任务(MapTask)和Reduce任务(ReduceTask)。每个Map任务处理一个输入数据分片(InputSplit),并将产生的中间结果数据写到本地磁盘;而Reduce任务则从每个Map任务的本地磁盘上远程拷贝中间结果数据中相应的数据片段,再经过合并(merge)、排序(sort)、分组(group)和归约(reduce)后,将结果写到HDFS作为作业运行的最终结果。
下面分别就MapReduce的map、shuffle、reduce这三个重要阶段做简要介绍。
在map阶段,每个Map任务所要处理的数据称作一个输入数据分片(InputSplit),一个MapTask只处理一个InputSplit,它是由数据块(block)切分后的分片,但一般InputSplit与block的关系是1比1,也就是一个Map任务处理一个block的数据。
首先,每个Map任务从它的输入分片(InputSplit)中解析出一系列的键值对<key,value>,并交给用户作业程序中的map()函数处理,并得出一系列的结果键值对。
然后,由分区程序partition()对键值对进行分区以确定每个结果键值对应该交给哪个Reduce任务处理。默认的分区程序是按照map处理的结果键值对<key, value>中的key进行哈希,再以Reduce任务的总量取模,得出该键值对的分区号。
接着,以上产生的键值对和对应的分区号都会被写入一个环形逻辑结构的内存缓冲区(MapOutputBuffer)。当环形内存缓冲区(默认为100MB)达到溢写(spill)阀值时(默认80MB),便会启动溢写(spill)线程。溢写线程首先锁定这80MB数据并将其写到本地磁盘上,以一个临时文件(output/spillN.out, N表示溢写次数)的形式存放。在溢写过程中会进行排序(sort)、分组(group)、预规约(combine)的操作。其中本地排序是MapReduce默认的行为。溢写有一个重要细节是,如果map()函数处理的结果键值对中有很多个键值对的分区号一样,即需要交给同一个Reduce任务处理,那么需要将这些键值对进行拼接起来,这样只需要一个分区号就可以标记出这些键值对的分区。同时要进行分组合并,即value相同的键值对合并成<key, {value1, value2, value3, …}>,进一步减少了记录数据。
最后,如果用户定义了预规约(combine),那么就进行本地规约。最终,如果有很多次的溢写,就会产生多个临时文件,所以需要再对这些临时文件进行一次总的拼接、排序、分组、和预规约,生成Map任务的最终结果文件(output/file.out)。
Reduce任务以HTTP方式从Map端将对应分区号的中间结果数据拷贝到本地磁盘或内存中,而Map端启动HTTP Server来满足这种并发请求。当一项作业(Job)中,成功完成的Map任务数达到总Map任务数的5%后,才开始shuffle。在shuffle阶段,Reduce任务端的TaskTracker会启动MapEventsFetcherThread线程从JobTracker上获取已经运行完成的Map任务列表,并保存到allMapEvents中。而ReduceTask会启动一个后台线程(GetMapEventsThread),周期性从所在的TaskTracker中获取该项作业中已完成的Map任务列表,并保存到映射表mapLocations中,该映射表保存了TaskTracker与已完成Map任务列表的映射关系。
Reduce任务在执行计算之前就是不断的远程拷贝当前Job里的每个Map任务的特定分区号的输出数据,同时对从不同Map任务端拷贝过来的数据做合并(merge),最后形成一个文件作为Reduce任务的输入数据。所以,对于一个Job而言,当所有的Map任务执行完毕,并且所有Map任务端的中间结果数据被拉取到各个相应的Reduce任务端,此时这些Reduce任务得到了它们各自全部的输入数据后才能真正开始进一步的计算操作。接着,对merge过后的键值对<key, value>按照key进行本地归并排序,通过排序,key相同的键值对聚集到一起形成若干形如<key, {value1, value2, value3, …}>的分组,然后每个键值对交给用户程序中编写的reduce()函数处理。最终将数据结果直接写到HDFS上作为该作业输出的一部分。
在Hadoop系统中调度方面主要涉及两个关键调度模块:Hadoop作业调度和MapReduce任务调度。作业调度是由不同调度器负责,它根据用户实际应用需求而设计实现。而在任务级别的选择策略中,Hadoop考虑的关键因素主要为数据本地性(data-locality),Hadoop将其封装成一个通用的模块供给调度器使用,具体存放在JobInProgress类中。
在Map任务调度中,现有的Hadoop系统对Map任务调度主要以数据本地性为考虑因素。就是将Map任务分配给一些节点,在这些节点上面存储着这个Map任务即将要处理的输入数据分片,所以就不需要通过远程拷贝的方式将存储在其他节点上的该Map任务的输入数据拉取到该任务节点上面再进行计算处理。由于Hadoop是针对大数据的处理平台,每个Map任务的输入数据称作一个输入数据分片,它与一个数据块的大小相同。所以提高任务数据本地性对加快Map任务的任务执行速度,Hadoop作业周转时间,避免冗余网络I/O,节省集群带宽资源等诸多方面有直接而且明显的影响,国内外众多学者在该方面的研究最为广泛和深入。通过对Hadoop源码研究,调度器直接调用JobInProgress中的obtainNewMapTask()函数来从某项作业中选择一个Map任务,此方法封装了所有调度器公用的Map任务选择策略模块。主要选择策略思想如下:
经过以上调度过程,最终为节点分配一个合适的MapTask。而在Reduce任务的调度中,Hadoop采取了非常简单的静态策略,Hadoop认为Reduce任务没有数据本地性。作业调度器同样通过调用JobInProgress实例封装的obtainNewReduceTask()函数,具体步骤如下,
主要参考: