Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Hadoop旧mapreduce的map任务切分原理

Hadoop旧mapreduce的map任务切分原理

作者头像
CSDN技术头条
发布于 2018-02-12 03:11:03
发布于 2018-02-12 03:11:03
9460
举报
文章被收录于专栏:CSDN技术头条CSDN技术头条

前言

最近在工作过程中接触一些Hive数据仓库中的表,这些表实际是从关系型数据库通过Sqoop抽到Hive的。在开发过程中对map任务的划分进行性能调优,发现mapreduce中关于FileInputFormat的参数调整都不起作用,最后发现这些老任务都是用旧版的mapreduce开发的,于是顺便研究下旧版mapreduce的任务划分策略。有关新版mapreduce的任务划分策略,大家可以参考我之前的博文《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》。

源码分析

根据《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的内容,我们知道map任务的划分关键在于FileInputFormat的getSplits方法的实现策略,现在我们来看看其源码:

代码语言:js
AI代码解释
复制
    public InputSplit[] getSplits(JobConf job, int numSplits)  

      throws IOException {  
      Stopwatch sw = new Stopwatch().start();  
      FileStatus[] files = listStatus(job);  
 
      // Save the number of input files for metrics/loadgen  
      job.setLong(NUM_INPUT_FILES, files.length);  
      long totalSize = 0;                           // compute total size  
      for (FileStatus file: files) {                // check we have valid files  
        if (file.isDirectory()) {  
          throw new IOException("Not a file: "+ file.getPath());  
        }  
        totalSize += file.getLen();  
      }  
 
      long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);  
      long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.  
        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);  
 
      // generate splits  
      ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);  
      NetworkTopology clusterMap = new NetworkTopology();  
      for (FileStatus file: files) {  
        Path path = file.getPath();  
        long length = file.getLen();  
        if (length != 0) {  
          FileSystem fs = path.getFileSystem(job);  
          BlockLocation[] blkLocations;  
          if (file instanceof LocatedFileStatus) {  
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();  
          } else {  
            blkLocations = fs.getFileBlockLocations(file, 0, length);  
          }  
          if (isSplitable(fs, path)) {  
            long blockSize = file.getBlockSize();  
            long splitSize = computeSplitSize(goalSize, minSize, blockSize);  
 
            long bytesRemaining = length;  
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {  
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,  
                  length-bytesRemaining, splitSize, clusterMap);  
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,  
                  splitHosts[0], splitHosts[1]));  
              bytesRemaining -= splitSize;  
            }  
 
            if (bytesRemaining != 0) {  
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length  
                  - bytesRemaining, bytesRemaining, clusterMap);  
              splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,  
                  splitHosts[0], splitHosts[1]));  
            }  
          } else {  
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);  
            splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));  
          }  
        } else {   
          //Create empty hosts array for zero length files  
          splits.add(makeSplit(path, 0, length, new String[0]));  
        }  
      }  
      sw.stop();  
      if (LOG.isDebugEnabled()) {  
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()  
            + ", TimeTaken: " + sw.elapsedMillis());  
      }  
      return splits.toArray(new FileSplit[splits.size()]);  
    }  
 
    protected long computeSplitSize(long goalSize, long minSize,  
                                         long blockSize) {  
      return Math.max(minSize, Math.min(goalSize, blockSize));  
    }  

这里对以上代码的划分策略进行整理:

  1. 遍历当前作业的所有输入文件,然后将累积这些文件的字节数并保存到变量totalSize中;
  2. 如果用户指定了mapreduce.job.maps参数,那么这个参数会被保存在入参numSplits中;
  3. 用户想要通过numSplits控制map任务的数量,那么需求对totalSize进行平分,以便确定每个map任务划分的输入大小。这个计算很简单,即使用totalSize除以numSplits,最后得到的目标划分大小存储在变量goalSize中;
  4. 常量SPLIT_MINSIZE实际是由参数mapreduce.input.fileinputformat.split.minsize来控制的,如果没有配置则默认是1。minSplitSize默认是1,切旧版FileIntputFormat没有设置此变量的地方。最后取SPLIT_MINSIZE和minSplitSize的最大值,并保存在变量minSize中;
  5. 遍历当前作业的每个输入文件,计算每个输入文件,将被划分的任务数量,最后将每个文件划分的任务数量合并起来就是整个作业划分的任务数量。

以上只是总体分析了作业的任务划分,有关每个输入文件的任务数量划分步骤如下:

  1. 判断文件的大小,只有文件字节数大于0才是有意义的;
  2. 判断文件是否是可以切分的,只有能够切分的文件才会继续进行任务数量划分;
  3. 调用文件的getBlockSize方法,获取文件的块大小并存储在变量blockSize中;
  4. 调用computeSplitSize方法计算最后划分给每个任务的输入大小,并保存在splitSize中。计算公式为:splitSize = max(minSize, min(goalSize, blockSize));
  5. 将文件按照splitSize的大小进行划分,不足splitSize大小的也算作一个任务划分数。

总结

根据以上分析发现旧版mapreduce和新版mapreduce的FileIntputFormat关于map任务数量划分的实现逻辑不同,在对它们进行开发和性能优化时要特别注意。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2016-06-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CSDN技术头条 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系
1,在介绍hadoop写文件的时候我们经常会说首先分割文件为多个块;那么是怎么分割的呢?
全栈程序员站长
2022/08/05
9770
MapReduce InputFormat之FileInputFormat
1)、数据切分,按照某个策略将输入数据且分成若干个split,以便确定Map Task的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个Map Task;
全栈程序员站长
2022/07/23
3910
MapReduce InputFormat之FileInputFormat
MapReduce获取分片数目
MapReduce Application中mapper的数目和分片的数目是一样的,但是分片数目和什么有关呢?
jiewuyou
2022/09/29
2370
MapReduce获取分片数目
SparkCore之RDD
https://blog.csdn.net/zym1117/article/details/79532458
用户1483438
2022/04/26
6840
SparkCore源码分析之RDD默认分区规则
仔细分析以上方法,发现我们的List在模式匹配中匹配到了_情况,所以核心逻辑在内层函数positions处封装:
tyrantlucifer
2022/03/23
5980
SparkCore源码分析之RDD默认分区规则
MapReduce中map并行度优化及源码分析
  一个job的map阶段并行度由客户端在提交job时决定,而客户端对map阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理。
intsmaze-刘洋
2018/08/29
9350
MapReduce中map并行度优化及源码分析
昨天的一个问题及答案(关键字Gzip、MapReduce、Spark)
OK,我们知道gzip不可分割了。那么一个10G的gzip文件在HDFS是怎么存储的呢?
大数据真好玩
2021/10/12
5310
MapReduce之 FileInputFormat的切片策略(默认)
①获取当前输入目录中所有的文件 ②以文件为单位切片,如果文件为空文件,默认创建一个空的切片 ③如果文件不为空,尝试判断文件是否可切(不是压缩文件,都可切) ④如果文件不可切,整个文件作为1片 ⑤如果文件可切,先获取片大小(默认等于块大小),循环判断 待切部分/ 片大小 > 1.1,如果大于先切去一片,再判断… ⑥剩余部分整个作为1片 以下为源码部分 public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatc
孙晨c
2020/07/21
5920
Hadoop之MapReduce04【客户端源码分析】
  Configuration 用来存储相关的配置文件。在该类中有一段static代码块
用户4919348
2019/04/17
4430
Hadoop之MapReduce04【客户端源码分析】
MapReduce切片机制
  MapReduce是一个分布式计算框架,处理的是海量数据的计算。那么并行运算必不可免,但是到底并行多少个Map任务来计算呢?每个Map任务计算哪些数据呢?这些我们数据我们不能够凭空估计,只能根据实际数据的存储情况来动态分配,而我们要介绍的切片就是要解决这个问题,
用户4919348
2019/04/17
1.2K0
MapReduce切片机制
hadoop2-MapReduce详解
本文是对Hadoop2.2.0版本的MapReduce进行详细讲解。请大家要注意版本,因为Hadoop的不同版本,源码可能是不同的。
Hongten
2018/12/14
1.2K0
hadoop2-MapReduce详解
他来了他来了,Hadoop序列化和切片机制了解一下?
一个超大文件在HDFS上存储时,是以多个Block存储在不同的节点上,比如一个512M的文件,HDFS默认一个Block为128M,那么1G的文件分成4个Block存储在集群中4个节点上。
王知无-import_bigdata
2021/09/22
6850
Hadoop-2.4.1学习之如何确定Mapper数量
MapReduce框架的优势是可以在集群中并行运行mapper和reducer任务,那如何确定mapper和reducer的数量呢,或者说Hadoop如何以编程的方式控制作业启动的mapper和reducer数量呢?在《Hadoop-2.4.1学习之Mapper和Reducer》中曾经提及建议reducer的数量为(0.95~1.75 ) * 节点数量 * 每个节点上最大的容器数,并可使用方法Job.setNumReduceTasks(int),mapper的数量由输入文件的大小确定,且没有相应的setNumMapTasks方法,但可以通过Configuration.set(JobContext.NUM_MAPS, int)设置,其中JobContext.NUM_MAPS的值为mapreduce.job.maps,而在Hadoop的官方网站上对该参数的描述为与MapReduce框架和作业配置巧妙地交互,并且设置起来更加复杂。从这样一句含糊不清的话无法得知究竟如何确定mapper的数量,显然只能求助于源代码了。
星哥玩云
2022/07/03
5180
textFile构建RDD的分区及compute计算策略
1,textFile A),第一点,就是输入格式,key,value类型及并行度的意义。 def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() //输入文件的格式TextInputFormat,key的类型LongWritable ,value的类型Text //最小分区数defaultMinPartition
Spark学习技巧
2018/01/31
1.1K0
Hadoop源码篇--Client源码
今天起剖析源码,先从Client看起,因为Client在MapReduce的过程中承担了很多重要的角色。
LhWorld哥陪你聊算法
2018/09/13
1.5K0
Hadoop源码篇--Client源码
Spark RDD 分布式弹性数据集
rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。
Tim在路上
2022/03/23
4050
Spark RDD 分布式弹性数据集
自定义 hadoop MapReduce InputFormat 切分输入文件
在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduce 一次读取一个 cookieId 下的所有记录,然后再按 time 进行切分 session,逻辑伪码如下: for OneSplit in MyInputFormat.getSplit() // OneSplit 是某个 cookieId 下的所有记录    
用户1177713
2018/02/24
1.9K0
自定义 hadoop MapReduce InputFormat 切分输入文件
Hadoop MapReduce 工作过程
一个MapReducer作业经过了input,map,combine,reduce,output五个阶段,其中combine阶段并不一定发生,map输出的中间结果被分到reduce的过程成为shuffle(数据清洗)。
smartsi
2019/08/07
1.3K0
【单点】每日突破,MapReduce Split
问:在MapReduce进行数据处理时,会进行split数据切片,它的默认拆分规则是?如果不按照默认规则进行拆分,会发生什么现象?
十里桃花舞丶
2021/09/10
4820
MapReduce之片和块的关系
blockSize: 块大小 minSize: minSize从mapreduce.input.fileinputformat.split.minsize和1之间对比,取最大值 maxSize: 读取mapreduce.input.fileinputformat.split.maxsize,如果没有设置,则使用Long.MaxValue作为默认值
孙晨c
2020/07/21
4940
相关推荐
hadoop怎么分割写入的文件为多个块的,一个map对应一个split分片吗?split与block的关系
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档