前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark内核分析之Shuffle操作流程(非常重要)

Spark内核分析之Shuffle操作流程(非常重要)

作者头像
z小赵
发布2018-09-05 15:48:17
9520
发布2018-09-05 15:48:17
举报
文章被收录于专栏:简单聊聊Spark

如题,我们来分析一下spark的shuffle操作原理;为什么说其非常重要,是因为shuffle操作是我们在Spark调优中非常重要的一环,对shuffle进行了优化,往往可以使得我们的spark程序运行效率有极大的提升。依照惯例,我们先来看一张图;

普通shuffle流程图

上图是一个普通的Shuffle操作流程原理图,一个shuffle操作由三个RDD算子构成,分别是mapPartitionsRDD,ShuffleRDD,mapPartitionsRDD;如上图所示,

1.每个ShuffleMapTask都会为每个ResultTask创建一个Bucket缓存和一个对应的ShuffleBlockFile磁盘文件;2.每个ShuffleMapTask的输出相关信息封装成一个MapStatus发送到DAGScheduler的MapOutputTracker中去; 3.ResultTask开始拉取该任务所需要的数据,ResultTask通过向DAGScheduler的MapOutputTracker获取MapStatus的信息,从而知道自己需要的数据所在的位置,然后去相应的位置拉去数据到该任务所在节点的内存中,如果内存不够,会将部分数据写入磁盘,完成这系列的操作是由ShufflerRDD算子完成的; 4.然后ResultTask对拉取到的数据进行聚合操作,最后生成mapPartitionsRDD算子;

想想上面的这个Shuffler流程会有什么问题?

我们来做一个假设,如果有100个ShuffleMapTask,2个CPU Core,100个ResultTask,那么这个shuffler操作将产生10000个文件,如此多的文件对于Spark作业的性能就是一个灾难;针对这个问题当然有对应的优化策略,接着我们来看另外一张图;

优化的Shuffler流程图

        通过优化的Shuffler操作如上图所示,假设有100个ShufflerMapTask,2CPU core,100个Resulttask,优化后产生的中间文件是200个,是优化之前的1/50;那么这是如何做到的,通过阅读源码可以知道,只要引入consolidation机制就可以实现了,其配置是通过在SparkConf中配置对应的参数即可实现;

        来简单分析一下:一个ShuffleMapTask将数据写入本地不变,但是当这一批ShuffleMapTask运行完成以后,下一批ShuffleMapTask开始运行(一批ShuffleMapTask是指,同一时间有两个Task并行执行,因为有两个CPU Core),它们产生的数据会直接写入上一批ShuffleMapTask产生的本地文件中;上图中左边的一组可以称为一组ShuffleGroup,每个文件中都存储了多个ShuffleMapTask的数据,每个ShuffleMapTask所产生的数据是一个segment,每个File中通过索引,偏移量来标记每部分数据来自不同的ShuffleMapTask。

下面我们来看看源码是如何实现的;

ShuffleMapTask的runTask

1.首先通过Spark全局变量得到shuffleManager对象,并通过shuffleManager对象获得Write对象;

2.接着,通过rdd.iterator方法对属于自己的partition进行计算,最后会调用我们自己编写的RDD算子来计算partition;

3.接着Writer调用自己的write方法将RDD算子计算的结果写入缓存;

HashShuffleWriter的write

1.判断aggregator为true,并且是否设置了map端的combine操作;若成立,则进行map端的数据合并(这里是一个spark优化点,在我之前关于spark优化系列文章中有写过);

2.对所有经过合并操作之后的数据遍历,根据每个元素获得对应的bucketId,然后将改元素写入对应的bucket缓存中;

这里我们来看看这个shuffle对象做了什么?

FileShuffleBlockManager的forMapTask

1.首先创建出一个ShuffleWriterGroup对象;

2.接着判断Spark作业是否设置了consolidateShuffleFiles;如果设置其为true,首先得到一个fileGroup对象,然后使用shuffleId,mapId,BucketId来得到一个blockId,接着根据这个blockId写数据到磁盘的对象;相反,如果没有设置consolidateShuffleFiles为true,则直接为每个shuffleMapTask创建一个blockFile,然后得到一个写数据到磁盘的对象;

3.执行完这里后,接着调用write方法将数据写入内存缓冲bucket,然后再将数据写入磁盘;

写数据到这里就完成了,然后会将产生的数据位置等信息封装成一个MapStatus对象发送给DAGSchedule的MapOutputTracker中;接下来ResultTask开始读取数据;

ShuffleRDD的compute

HashShuffleReader的read

BlockStoreShuffleFetcher的fetch

BlockStoreShuffleFetcher的fetch

总结:到此shuffle的整个操作流程就分析完了,接下来会分析底层数据存储的核心组件BlockManager的工作原理,,欢迎关注。

如需转载,请注明:

本篇:Spark内核分析之Shuffle操作流程(非常重要)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.12.27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档