MapReduce Shuffle 和 Spark Shuffle 原理概述

Shuffle简介

Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽可能打乱成无规则的数据。而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具备必定规则的数据,以便reduce端接收处理。其在MapReduce中所处的工做阶段是map输出后到reduce接收前,具体能够分为map端和reduce端先后两个部分。编程

在shuffle以前,也就是在map阶段,MapReduce会对要处理的数据进行分片(split)操做,为每个分片分配一个MapTask任务。接下来map会对每个分片中的每一行数据进行处理获得键值对(key,value)此时获得的键值对又叫作“中间结果”。此后便进入reduce阶段,由此能够看出Shuffle阶段的做用是处理“中间结果”。数组

因为Shuffle涉及到了磁盘的读写和网络的传输,所以Shuffle性能的高低直接影响到了整个程序的运行效率。网络

MapReduce Shuffle

Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工做是从Map结束到Reduce开始之间的过程。shuffle阶段又能够分为Map端的shuffle和Reduce端的shuffle。数据结构

Map端的shuffle

下图是MapReduce Shuffle的官方流程:
app

由于频繁的磁盘I/O操做会严重的下降效率,所以“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”,在写入的过程当中进行分区(partition),也就是对于每一个键值对来讲,都增长了一个partition属性值,而后连同键值对一块儿序列化成字节数组写入到缓冲区(缓冲区采用的就是字节数组,默认大小为100M)。oop

当写入的数据量达到预先设置的阙值后便会启动溢写出线程将缓冲区中的那部分数据溢出写(spill)到磁盘的临时文件中,并在写入前根据key进行排序(sort)和合并(combine,可选操做)。性能

溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性指定的本地目录中。当整个map任务完成溢出写后,会对磁盘中这个map任务产生的全部临时文件(spill文件)进行归并(merge)操做生成最终的正式输出文件,此时的归并是将全部spill文件中的相同partition合并到一块儿,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,若是溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,能够再次进行合并。优化

至此map端的工做已经所有结束,最终生成的文件也会存储在TaskTracker可以访问的位置。每一个reduce task不间断的经过RPC从JobTracker那里获取map task是否完成的信息,若是获得的信息是map task已经完成,那么Shuffle的后半段开始启动。spa

Reduce端的shuffle

当mapreduce任务提交后,reduce task就不断经过RPC从JobTracker那里获取map task是否完成的信息,若是获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程就开始启动。Reduce端的shuffle主要包括三个阶段,copy、merge和reduce。
线程

每一个reduce task负责处理一个分区的文件,如下是reduce task的处理流程:

  1. reduce task从每一个map task的结果文件中拉取对应分区的数据。由于数据在map阶段已是分好区了,而且会有一个额外的索引文件记录每一个分区的起始偏移量。因此reduce task取数的时候直接根据偏移量去拉取数据就ok。

  2. reduce task从每一个map task拉取分区数据的时候会进行再次合并,排序,按照自定义的reducer的逻辑代码去处理。

  3. 最后就是Reduce过程了,在这个过程当中产生了最终的输出结果,并将其写到HDFS上。

为何要排序

  1. key存在combine操做,排序以后相同的key放到一块显然方便作合并操做。

  2. reduce task是按key去处理数据的。 若是没有排序那必须从全部数据中把当前相同key的全部value数据拿出来,而后进行reduce逻辑处理。显然每一个key到这个逻辑都须要作一次全量数据扫描,影响性能,有了排序很方便的获得一个key对于的value集合。

  3. reduce task按key去处理数据时,若是key按顺序排序,那么reduce task就按key顺序去读取,显然当读到的key是文件末尾的key那么就标志数据处理完毕。若是没有排序那还得有其余逻辑来记录哪些key处理完了,哪些key没有处理完。

虽有千万种理由须要这么作,可是很耗资源,而且像排序其实咱们有些业务并不须要排序。

为何要文件合并

  1. 由于内存放不下就会溢写文件,就会发生屡次溢写,造成不少小文件,若是不合并,显然会小文件泛滥,集群须要资源开销去管理这些小文件数据。

  2. 任务去读取文件的数增多,打开的文件句柄数也会增多。

  3. mapreduce是全局有序。单个文件有序,不表明全局有序,只有把小文件合并一块儿排序才会全局有序。

    Spark的Shuffle

    Spark的Shuffle是在MapReduce Shuffle基础上进行的调优。其实就是对排序、合并逻辑作了一些优化。在Spark中Shuffle write至关于MapReduce 的map,Shuffle read至关于MapReduce 的reduce。

Spark丰富了任务类型,有些任务之间数据流转不须要经过Shuffle,可是有些任务之间仍是须要经过Shuffle来传递数据,好比宽依赖的group by key以及各类by key算子。宽依赖之间会划分stage,而Stage之间就是Shuffle,以下图中的stage0,stage1和stage3之间就会产生Shuffle。

在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,所以spark的Shuffle有Hash Shuffle和Sort Shuffle两种。

Spark Shuffle发展史
Spark 0.8及之前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改成Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台

在Spark的版本的发展,ShuffleManager在不断迭代,变得愈来愈先进。
在Spark 1.2之前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个很是严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操做影响了性能。所以在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。

SortShuffleManager相较于HashShuffleManager来讲,有了必定的改进。主要就在于,每一个Task在进行shuffle操做时,虽然也会产生较多的临时磁盘文件,可是最后会将全部的临时文件合并(merge)成一个磁盘文件,所以每一个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取本身的数据时,只要根据索引读取每一个磁盘文件中的部分数据便可。

Hash Shuffle

HashShuffleManager的运行机制主要分红两种,一种是普通运行机制,另外一种是合并的运行机制。合并机制主要是经过复用buffer来优化Shuffle过程当中产生的小文件的数量。Hash shuffle是不具备排序的Shuffle。

普通机制的Hash Shuffle

最开始使用的Hash Based Shuffle,每一个Mapper会根据Reducer的数量建立对应的bucket,bucket的数量是M * R,M是map的数量,R是Reduce的数量。
以下图所示:2个core 4个map task 3 个reduce task,会产生4*3=12个小文件。

优化后的Hash Shuffle

普通机制Hash Shuffle会产生大量的小文件(M * R),对文件系统的压力也很大,也不利于IO的吞吐量,后来作了优化(设置spark.shuffle.consolidateFiles=true开启,默认false),把在同一个core上的多个Mapper输出到同一个文件,这样文件数就变成core * R 个了。
以下图所示:2个core 4个map task 3 个reduce task,会产生2*3=6个小文件。

Hash shuffle合并机制的问题:
若是 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生不少小文件。进而引出了更优化的sort shuffle。
在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。

Sort Shuffle

SortShuffleManager的运行机制主要分红两种,一种是普通运行机制,另外一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

普通机制的Sort Shuffle

这种机制和mapreduce差很少,在该模式下,数据会先写入一个内存数据结构中,此时根据不一样的shuffle算子,可能选用不一样的数据结构。若是是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边经过Map进行聚合,一边写入内存;若是是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构以后,就会判断一下,是否达到了某个临界阈值。若是达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。

在溢写到磁盘文件以前,会先根据key对内存数据结构中已有的数据进行排序。排序事后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。
一个task将全部数据写入内存数据结构的过程当中,会发生屡次磁盘溢写操做,也会产生多个临时文件。最后会将以前全部的临时磁盘文件都进行合并,因为一个task就只对应一个磁盘文件所以还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager因为有一个磁盘文件merge的过程,所以大大减小了文件数量,因为每一个task最终只有一个磁盘文件因此文件个数等于上游shuffle write个数。

bypass机制的Sort Shuffle

bypass运行机制的触发条件以下:
1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认值200。
2)不是聚合类的shuffle算子(好比reduceByKey)。

此时task会为每一个reduce端的task都建立一个临时磁盘文件,并将数据按key进行hash而后根据key的hash值,将key写入对应的磁盘文件之中。固然,写入磁盘文件时也是先写入内存缓冲,缓冲写满以后再溢写到磁盘文件的。最后,一样会将全部临时磁盘文件都合并成一个磁盘文件,并建立一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是如出一辙的,由于都要建立数量惊人的磁盘文件,只是在最后会作一个磁盘文件的合并而已。所以少许的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来讲,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不一样在于:
第一,磁盘写机制不一样;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程当中,不须要进行数据的排序操做,也就节省掉了这部分的性能开销。

Spark Shuffle总结

Shuffle 过程本质上都是将 Map 端得到的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。

Shuffle做为处理链接map端和reduce端的枢纽,其shuffle的性能高低直接影响了整个程序的性能和吞吐量。map端的shuffle通常为shuffle的Write阶段,reduce端的shuffle通常为shuffle的read阶段。Hadoop和spark的shuffle在实现上面存在很大的不一样,spark的shuffle分为两种实现,分别为HashShuffle和SortShuffle。

HashShuffle又分为普通机制和合并机制,普通机制由于其会产生MR个数的巨量磁盘小文件而产生大量性能低下的Io操做,从而性能较低,由于其巨量的磁盘小文件还可能致使OOM,HashShuffle的合并机制经过重复利用buffer从而将磁盘小文件的数量下降到CoreR个,可是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。

SortShuffle也分为普通机制和bypass机制,普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(好比reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提升了其性能。

在Spark 1.2之前,默认的shuffle计算引擎是HashShuffleManager,由于HashShuffleManager会产生大量的磁盘小文件而性能低下,在Spark 1.2之后的版本中,默认的ShuffleManager改为了SortShuffleManager。

SortShuffleManager相较于HashShuffleManager来讲,有了必定的改进。主要就在于,每一个Task在进行shuffle操做时,虽然也会产生较多的临时磁盘文件,可是最后会将全部的临时文件合并(merge)成一个磁盘文件,所以每一个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取本身的数据时,只要根据索引读取每一个磁盘文件中的部分数据便可。

Spark与MapReduce Shuffle的异同

  1. 从总体功能上看,二者并无大的差异。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不一样的 partition 送到不一样的 reducer(Spark 里 reducer 多是下一个 stage 里的 ShuffleMapTask,也多是 ResultTask)。Reducer 之内存做缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好之后进行 reduce(Spark 里多是后续的一系列操做)。

  2. 从流程的上看,二者差异不小。 Hadoop MapReduce 是 sort-based,进入 combine和 reduce的 records 必须先 sort。这样的好处在于 combine/reduce能够处理大规模的数据,由于其输入数据能够经过外排获得(mapper 对每段数据先作排序,reducer 的 shuffle 对排好序的每段数据作归并)。之前 Spark 默认选择的是 hash-based,一般使用 HashMap 来对 shuffle 来的数据进行合并,不会对数据进行提早排序。若是用户须要通过排序的数据,那么须要本身调用相似 sortByKey的操做。在Spark 1.2以后,sort-based变为默认的Shuffle实现。

  3. 从流程实现角度来看,二者也有很多差异。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map, spill, merge, shuffle, sort, reduce等。每一个阶段各司其职,能够按照过程式的编程思想来逐一实现每一个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不一样的 stage 和一系列的 transformation,因此 spill, merge, aggregate 等操做须要蕴含在 transformation中。