Spark-Shuffle机制详解

Shuffle机制详解

什么是Shuffle?

shuffle中文翻译为洗牌,需要shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。

 

发生在map方法之后,reduce方法之前。

Shuffle一般包含两阶段任务:

第一阶段:产生shuffle数据的阶段(map阶段)

       补充:是ShuffleManager中的getWriter来写数据(数据可以通过BlockManager写到Memort、Disk、Tackyon等,例如像非常快的shuffle,此时可以考虑把数据写在内存中,但是内存不稳定,建议采用MEMORY_AND_DISK方式)

第二阶段:使用shuffle数据的阶段(reduce阶段)

       补充:是ShuffleManager中的getReader向Driver去获取上一个Stage产生的数据

 

shuffe可能面临的问题?

 

具体运行Task的时候才会产生Shuffle

  1. 数据量非常大
  2. 数据如何分类?(即如何Partition,Hash、Sort、钨丝计划)
  3. 负载均衡(数据倾斜)
  4. 网络传输效率,需要在压缩和加压缩之间做出权衡,序列化和反序列化也是需要考虑的问题

 

Spark的shuffle方式:

HashShuffle

  • Key不能是Array
  • HashShuffle不需要排序,此时理论上就节省了Hadoop MapReduce中进行shuffle需要排序时候的时间浪费,因为实际上生产环境下有大量的不需要排序的shuffle类型。

思考:不需要排序的HashShuffle是否一定比需要排序的SortShuffle速度更快?

不一定!如果数据规模比较小的情况下,HashShuffle会比SortShuffle快很多!但是如果数据量很大,此时SortShuffle一般都会比HashShuffle快很多。

 

1.普通的HashShuffle

每个ShuffleMapTask会根据key的哈希值计算当前key需要写入哪个partition,然后把决定后的结果写入到单独的文件,此时会导致每个Task产生R(指下一个stage的并行度)个文件,如果当前的stage中有M个ShuffleMapTask,则会产生M*R个小文件!注意:HashShuffle绝大多数情况下都要通过网络,如果Mapper和Reduce在同一台机器上,此时只需要读取本地磁盘即可。

这样的shuffle会产生两大死穴:

  1. shuffle之前会产生海量的小文件于磁盘上,此时会产生大量耗时低效的I/O操作;
  2. 内存不够用,由于内存中需要保存海量的文件操句柄和临时缓存信息,如果数据处理规模比较庞大的话,内存不可承受出现OOM等问题。

为此Spark推出了shuffle Pluggable开放框架,方便系统升级的时候定制Shuffle功能模块,也方便第三方系统改造人员根据实际的业务场景来开发具体最佳的shuffle模块;核心接口ShuffleManager,具体实现有HashShuffleManager、SortShuffleManager等。

图解如下:

图片地址:https://www.processon.com/embed/5e9166c55653bb1a685626f4

2.Consolidate优化后的HashShuffle

为了改善上述普通HashShuffle产生的小文件过多问题,Spark后来推出了Consolidate机制,来把小文件合并。为了优化HashShuffleManager我们可以设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true即可开启优化机制,通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stagetask数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能

此时shuffle产生文件数量为C * R(C为map阶段Task使用的core数量,会根据cpu的个数来确定产生几个小文件),对于ShuffleMapTask的数量明显多于同时可用并行cores的数量情况下,shuffle产生的文件会大幅度减少,会极大降低OOM的可能。但是如果mapper阶段同时能够使用的cores很多或者reduce中所有并行任务数量很多,C*R可能已经过大,还是会出问题。

图解如下:(图解以每个Executor一个core为基础来谈)

图解地址:https://www.processon.com/embed/5e918751f346fb4bdd5eac47

 

SortShuffle(Spark2.1.1版本中只剩下这一种了

Spark在引入sort shuffle)(Spark1.1版本)以前,比较适用于中小规模的大数据处理!
引入了sort shuffle(和钨丝计划)之后,可以做任意hadoop可以做的大规模数据处理!

为什么需要Sort Shuffle?

consolidate只是能缓解HashShuffle中小文件过多可能导致OOM的问题,并没有从根本上解决。为了让Spark在更大规模集群上更高性能的处理更大规模的数据,于是,就引入了SortShuffle。从次以后(Spark1.1版本开始),Spark可以胜任任意规模(包含PB级别及PB以上)的大数据处理,尤其是随着钨丝计划的引入和优化,把Spark更快速的在更大规模集群处理更海量的数据能力推向了一个新的巅峰。

1.BaseShuffleHandle对应的SortShuffleWriter工作机制:

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。


SortShufle会产生2*M(M代表了ShuffleMapTask的总数量)个shuffle临时文件。

不会为每个reduce中的Task生成一个单独的文件,相反,sortshuffle会把mapper阶段中每个ShuffleMapTask所有的输出数据Data只写到一个文件中,因为每个ShuffleMapTask中的数据需要被分类,所以,sortShuffle使用了index文件存储具体ShuffleMapTask输出数据在一个data文件中如何分类的信息。所以,基于SortShuffle会在mapper阶段中的每一个ShuffleMapTask中产生两个文件:Data文件和Index文件,其中data文件是当前ShuffleMapTask输出的数据文件,index文件中存储了data文件中数据通过Partitioner分类信息,此时,下一个阶段的stage中的task就是根据index文件从上一个stage中所有ShuffleMapTask的data文件获取自己所需要的数据。

在sort shuffle中reducer是如何获取自己所需要的数据的?

Reducer首先会找Driver去获取父Stage中每个ShuffleMapTask输出的位置信息,根据位置信息获取index文件,解析index文件,根据解析的index文件信息获取data文件中数据自己的那部分数据。

之后的版本中,spark默认采用的就是sortshuffle,源码在Sparkenv.class中

默认sort shuffle的缺陷:

如果mapper中的Task的数量过大,依旧会产生很多小文件;此时在shuffle传递数据的过程中到reducer端,reducer会需要同时打开大量的文件来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至奔溃。

图解如下:

图解地址:https://www.processon.com/embed/5e91a6fa5653bb1a68574c8d

2.BypassMergeSortShuffleHandle对应的BypassMergeSortShuffleWriter工作机制:

每个task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

bypass运行机制的触发条件如下:

  1. 不需要预聚合(combine)类的shuffle算子。(如reduceByKey)
  2. shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。(默认为200)

图解如下:

图解地址:https://www.processon.com/embed/5e91c4065653bb1a6857a9b1

3.SerializedShuffleHandle对应的UnsafeShuffleWriter工作机制((钨丝计划)tungsten计划的shuffle,会使用off-heap的内存)

以后详解