spark基础之shuffle机制和原理分析

一 概述缓存

Shuffle就是对数据进行重组,因为分布式计算的特性和要求,在实现细节上更加繁琐和复杂网络

在MapReduce框架,Shuffle是链接Map和Reduce之间的桥梁,Map阶段经过shuffle读取数据并输出到对应的Reduce;而Reduce阶段负责从Map端拉取数据并进行计算。在整个shuffle过程当中,每每伴随着大量的磁盘和网络I/O。因此shuffle性能的高低也直接决定了整个程序的性能高低。Spark也会有本身的shuffle实现过程并发



在DAG调度的过程当中,Stage阶段的划分是根据是否有shuffle过程,也就是存在ShuffleDependency宽依赖的时候,须要进行shuffle,这时候会将做业job划分红多个Stage;而且在划分Stage的时候,构建ShuffleDependency的时候进行shuffle注册,获取后续数据读取所须要的ShuffleHandle,最终每个job提交后都会生成一个ResultStage和若干个ShuffleMapStage,其中ResultStage表示生成做业的最终结果所在的Stage. ResultStage与ShuffleMapStage中的task分别对应着ResultTask与ShuffleMapTask。一个做业,除了最终的ResultStage外,其余若干ShuffleMapStage中各个ShuffleMapTask都须要将最终的数据根据相应的Partitioner对数据进行分组,而后持久化分区的数据。app

 

一 HashShuffle机制框架

1.1 HashShuffle概述分布式

在spark-1.6版本以前,一直使用HashShuffle,在spark-1.6版本以后使用Sort-Base Shuffle,由于HashShuffle存在的不足因此就替换了HashShuffle. oop

 

咱们知道,Spark的运行主要分为2部分:一部分是驱动程序,其核心是SparkContext;另外一部分是Worker节点上Task,它是运行实际任务的。程序运行的时候,Driver和Executor进程相互交互:运行什么任务,即Driver会分配Task到Executor,Driver 跟 Executor 进行网络传输; 任务数据从哪儿获取,即Task要从 Driver 抓取其余上游的 Task 的数据结果,因此有这个过程当中就不断的产生网络结果。其中,下一个 Stage 向上一个 Stage 要数据这个过程,咱们就称之为 Shuffle。性能

 

1.2 没有优化以前的HashShuffle机制优化


在HashShuffle没有优化以前,每个ShufflleMapTask会为每个ReduceTask建立一个bucket缓存,而且会为每个bucket建立一个文件。这个bucket存放的数据就是通过Partitioner操做(默认是HashPartitioner)以后找到对应的bucket而后放进去,最后将数据spa

刷新bucket缓存的数据到磁盘上,即对应的block file.

 

而后ShuffleMapTask将输出做为MapStatus发送到DAGScheduler的MapOutputTrackerMaster,每个MapStatus包含了每个ResultTask要拉取的数据的位置和大小

ResultTask而后去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster获取MapStatus,看哪一份数据是属于本身的,而后底层经过BlockManager将数据拉取过来

 

拉取过来的数据会组成一个内部的ShuffleRDD,优先放入内存,内存不够用则放入磁盘,而后ResulTask开始进行聚合,最后生成咱们但愿获取的那个MapPartitionRDD

 

缺点:

如上图所示:在这里有1个worker,2个executor,每个executor运行2个ShuffleMapTask,有三个ReduceTask,因此总共就有4 * 3=12个bucket和12个block file。

# 若是数据量较大,将会生成M*R个小文件,好比ShuffleMapTask有100个,ResultTask有100个,这就会产生100*100=10000个小文件

# bucket缓存很重要,须要将ShuffleMapTask全部数据都写入bucket,才会刷到磁盘,那么若是Map端数据过多,这就很容易形成内存溢出,尽管后面有优化,bucket写入的数据达到刷新到磁盘的阀值以后,就会将数据一点一点的刷新到磁盘,可是这样磁盘I/O就多了

 

1.3 优化后的HashShuffle


每个Executor进程根据核数,决定Task的并发数量,好比executor核数是2,就是能够并发运行两个task,若是是一个则只能运行一个task

假设executor核数是1,ShuffleMapTask数量是M,那么它依然会根据ResultTask的数量R,建立R个bucket缓存,而后对key进行hash,数据进入不一样的bucket中,每个bucket对应着一个block file,用于刷新bucket缓存里的数据

 

而后下一个task运行的时候,那么不会再建立新的bucket和block file,而是复用以前的task已经建立好的bucket和block file。即所谓同一个Executor进程里全部Task都会把相同的key放入相同的bucket缓冲区中

 

这样的话,生成文件的数量就是(本地worker的executor数量*executor的cores*ResultTask数量)如上图所示,即2 * 1* 3 = 6个文件,每个Executor的shuffleMapTask数量100,ReduceTask数量为100,那么

未优化的HashShuffle的文件数是2 *1* 100*100 =20000,优化以后的数量是2*1*100 = 200文件,至关于少了100倍

 

缺点:若是 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生不少小文件。

 

 

二 Sort-Based Shuffle

2.1 Sort-Based Shuffle概述

HashShuffle回顾

HashShuffle写数据的时候,内存有一个bucket缓冲区,同时在本地磁盘有对应的本地文件,若是本地有文件,那么在内存应该也有文件句柄也是须要耗费内存的。也就是说,从内存的角度考虑,即有一部分存储数据,一部分管理文件句柄。若是Mapper分片数量为1000,Reduce分片数量为1000,那么总共就须要1000000个小文件。因此就会有不少内存消耗,频繁IO以及GC频繁或者出现内存溢出。

并且Reducer端读取Map端数据时,Mapper有这么多小文件,就须要打开不少网络通道读取,很容易形成Reducer(下一个stage)经过driver去拉取上一个stage数据的时候,说文件找不到,其实不是文件找不到而是程序不响应,由于正在GC.

 

2.2 Sorted-Based Shuffle介绍

为了缓解Shuffle过程产生文件数过多和Writer缓存开销过大的问题,spark引入了相似于hadoop Map-Reduce的shuffle机制。该机制每个ShuffleMapTask不会为后续的任务建立单独的文件,而是会将全部的Task结果写入同一个文件,而且对应生成一个索引文件。之前的数据是放在内存缓存中,等到数据完了再刷到磁盘,如今为了减小内存的使用,在内存不够用的时候,能够将输出溢写到磁盘,结束的时候,再将这些不一样的文件联合内存的数据一块儿进行归并,从而减小内存的使用量。一方面文件数量显著减小,另外一方面减小Writer缓存所占用的内存大小,并且同时避免GC的风险和频率。

 


Sort-Based Shuffle有几种不一样的策略:BypassMergeSortShuffleWriter、SortShuffleWriter和UnasfeSortShuffleWriter。

 

对于BypassMergeSortShuffleWriter,使用这个模式特色:

# 主要用于处理不须要排序和聚合的Shuffle操做,因此数据是直接写入文件,数据量较大的时候,网络I/O和内存负担较重

# 主要适合处理Reducer任务数量比较少的状况下

# 将每个分区写入一个单独的文件,最后将这些文件合并,减小文件数量;可是这种方式须要并发打开多个文件,对内存消耗比较大

 

由于BypassMergeSortShuffleWriter这种方式比SortShuffleWriter更快,因此若是在Reducer数量不大,又不须要在map端聚合和排序,并且

Reducer的数目 <  spark.shuffle.sort.bypassMergeThrshold指定的阀值,就是用的是这种方式。

对于SortShuffleWriter,使用这个模式特色:

# 比较适合数据量很大的场景或者集群规模很大

# 引入了外部外部排序器,能够支持在Map端进行本地聚合或者不聚合

# 若是外部排序器enable了spill功能,若是内存不够,能够先将输出溢写到本地磁盘,最后将内存结果和本地磁盘的溢写文件进行合并

 

对于UnsafeShuffleWriter因为须要谨慎使用,咱们暂不作分析。

 

另外这个Sort-Based ShuffleExecutor核数没有关系,即跟并发度没有关系,它是每个ShuffleMapTask都会产生一个data文件和index文件,所谓合并也只是将该ShuffleMapTask的各个partition对应的分区文件合并到data文件而已。因此这个就须要个Hash-BasedShuffleconsolidation机制区别开来。