Shuffle就是将不同节点上相同的Key拉取到一个节点的过程。这之中涉及到各种IO,所以执行时间势必会较长。对shuffle的优化也是spark job优化的重点。
Spark的Shuffle在1.2之前默认的计算引擎是HashShuffleManager
假设每个executor只有一个core,意味着一个executor只能同时运行一个task。有三个Reducer,每个reducer都会从上游拉取对应block file,每个task会为下游的每一个reducer生成一个block文件,这样算,总的文件个数就是上一个stage的分区数 × 下游的分区数 (如图是12个) 如果分区数比较多,map task就比较多,下个stage的reduce task也比较多,那就会很多小文件产生,IO消耗很大,显然不好。
优化就是复用buffer,也就使输出的block文件合并了。开启合并机制spark.shuffle.consolidateFiles=true
。
如图,一个core不管有几个task都会复用同一个buffer,这样生成的文件个数即为core × reducer。很明显比优化前少了很多。但如果下游stage的分区很多的话,文件仍然多。
在Spark1.2版本之后,出现了SortShuffle,这种方式以更少的中间磁盘文件产生而远远优于HashShuffle。
在该模式下,数据会先写入一个内存数据结构中(默认5M),Map或者Array。如果使reduceByKey类算子,就用Map,join类算子就用Array。每条数据写入内存后就会判断是否达到阈值,如果达到了就溢写磁盘,最后清空内存。shuffle中的定时器会定时会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,再以默认每批1w条数据通过BufferedOutputStream
写入磁盘。
最后,再把这些文件合并成一个文件,并多出一个索引文件来告诉下游task从哪个offset开始读取。
结果生成的文件个数为 map task × 2,已经大大减少了。
满足以下两个条件:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold
参数的值(默认200)。
不是聚合类的shuffle算子(比如reduceByKey
)。
在这种机制下,当前stage的task会为每个下游的task都创建临时磁盘文件。将数据按照key值进行hash,然后根据hash值,将key写入对应的磁盘文件中。最终,同样会将所有临时文件依次合并成一个磁盘文件,建立索引。 本质上就是在Hash Shuffle后进行了小文件的合并。相比普通机制的Sort Shuffle,文件个数也是map task × 2,但省去了排序的过程消耗。
— THE END —