spark shuffle原理

1.spark中窄依赖的时候不须要shuffle,只有宽依赖的时候须要shuffle,mapreduce中map到reduce必须通过shufflegit

2.spark中的shuffle fetch的时候进行merge操做利用aggregator来进行,其实是个hashmap,放在内存中github

1 // Map: "cat" -> c, cat
2 val rdd1 = rdd.Map(x => (x.charAt(0), x))
3 // groupby same key and count
4 val rdd2 = rdd1.groupBy(x => x._1).
5                 Map(x => (x._1, x._2.toList.length))

第一个 Map 操做将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,能够在集群的各个内存中独立计算,也就是并行化,第二个 groupby 以后的 Map 操做,为了计算相同 key 下的元素个数,须要把相同 key 的元素汇集到同一个 partition 下,因此形成了数据在内存中的从新分布,即 shuffle 操做.shuffle 操做是 spark 中最耗时的操做,应尽可能避免没必要要的 shuffle算法

  宽依赖主要有两个过程: shuffle write 和 shuffle fetch. 相似 Hadoop 的 Map 和 Reduce 阶段.shuffle write 将 ShuffleMapTask 任务产生的中间结果缓存到内存中, shuffle fetch 得到 ShuffleMapTask 缓存的中间结果进行 ShuffleReduceTask 计算,这个过程容易形成OutOfMemory
  shuffle 过程内存分配使用 ShuffleMemoryManager 类管理,会针对每一个 Task 分配内存,Task 任务完成后经过 Executor 释放空间.这里能够把 Task 理解成不一样 key 的数据对应一个 Task. 早期的内存分配机制使用公平分配,即不一样 Task 分配的内存是同样的,可是这样容易形成内存需求过多的 Task 的 OutOfMemory, 从而形成多余的 磁盘 IO 过程,影响总体的效率.(例:某一个 key 下的数据明显偏多,但由于你们内存都同样,这一个 key 的数据就容易 OutOfMemory).1.5版之后 Task 共用一个内存池,内存池的大小默认为 JVM 最大运行时内存容量的16%,分配机制以下:假若有 N 个 Task,ShuffleMemoryManager 保证每一个 Task 溢出以前至少能够申请到1/2N 内存,且至多申请到1/N,N 为当前活动的 shuffle Task 数,由于N 是一直变化的,因此 manager 会一直追踪 Task 数的变化,从新计算队列中的1/N 和1/2N.可是这样仍然容易形成内存须要多的 Task 任务溢出,因此最近有不少相关的研究是针对 shuffle 过程内存优化的.apache

spark shuffle process

早期的shuffle write有两个比较大的问题:缓存

  1. Map的输出必须先所有存储到内存中,而后写入磁盘。这对内存是一个很是大的开销,当内存不足以存储全部的Map output时就会出现OOM。
  2. 每个Mapper都会产生Reducer number个shuffle文件,若是Mapper个数是1k,Reducer个数也是1k,那么就会产生1M个shuffle文件,这对于文件系统是一个很是大的负担。同时在shuffle数据量不大而shuffle文件又很是多的状况下,随机写也会严重下降IO的性能。

Spark 0.8显著减小了shuffle的内存压力,如今Map output不须要先所有存储在内存中,再flush到硬盘,而是record-by-record写入到磁盘中。网络

为了解决shuffle文件过多的状况,Spark 0.8.1引入了新的shuffle consolidation,以期显著减小shuffle文件的数量。app

spark shuffle  consolidation process

假定该job有4个Mapper和4个Reducer,有2个core,也就是能并行运行两个task。咱们能够算出Spark的shuffle write共须要16个bucket,也就有了16个write handler。在以前的Spark版本中,每个bucket对应的是一个文件,所以在这里会产生16个shuffle文件。框架

而在shuffle consolidation中每个bucket并不是对应一个文件,而是对应文件中的一个segment,同时shuffle consolidation所产生的shuffle文件数量与Spark core的个数也有关系。在上面的图例中,job的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到以前的8个文件后面,这样一共就只有8个shuffle文件,而在文件内部这有16个不一样的segment。所以从理论上讲shuffle consolidation所产生的shuffle文件数量为C×R,其中C是Spark集群的core number,R是Reducer的个数。socket

须要注意的是当 M=C时shuffle consolidation所产生的文件数和以前的实现是同样的。oop

Shuffle consolidation显著减小了shuffle文件的数量,解决了以前版本一个比较严重的问题,可是writer handler的buffer开销过大依然没有减小,若要减小writer handler的buffer开销,咱们只能减小Reducer的数量,可是这又会引入新的问题,下文将会有详细介绍。

Shuffle Fetch and Aggregator

Shuffle write写出去的数据要被Reducer使用,就须要shuffle fetcher将所需的数据fetch过来,这里的fetch包括本地和远端,由于shuffle数据有可能一部分是存储在本地的。Spark对shuffle fetcher实现了两套不一样的框架:NIO经过socket链接去fetch数据;OIO经过netty server去fetch数据。分别对应的类是BasicBlockFetcherIteratorNettyBlockFetcherIterator

在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator,而BasicBlockFetcherIterator在shuffle数据量比较大的状况下performance始终不是很好,没法充分利用网络带宽,为了解决这个问题,添加了新的shuffle fetcher来试图取得更好的性能。对于早期shuffle性能的评测能够参看Spark usergroup。固然如今BasicBlockFetcherIterator的性能也已经好了不少,使用的时候能够对这两种实现都进行测试比较。

接下来讲一下aggregator。咱们都知道在Hadoop MapReduce的shuffle过程当中,shuffle fetch过来的数据会进行merge sort,使得相同key下的不一样value按序归并到一块儿供Reducer使用,这个过程能够参看下图:

mapreduce shuffle process

全部的merge sort都是在磁盘上进行的,有效地控制了内存的使用,可是代价是更多的磁盘IO。

那么Spark是否也有merge sort呢,仍是以别的方式实现,下面咱们就细细说明。

首先虽然Spark属于MapReduce体系,可是对传统的MapReduce算法进行了必定的改变。Spark假定在大多数用户的case中,shuffle数据的sort不是必须的,好比word count,强制地进行排序只会使性能变差,所以Spark并不在Reducer端作merge sort。既然没有merge sort那Spark是如何进行reduce的呢?这就要说到aggregator了。

aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。当咱们在作word count reduce计算count值的时候,它会将shuffle fetch到的每个key-value pair更新或是插入到hashmap中(若在hashmap中没有查找到,则插入其中;若查找到则更新value值)。这样就不须要预先把全部的key-value进行merge sort,而是来一个处理一个,省下了外部排序这一步骤。但同时须要注意的是reducer的内存必须足以存放这个partition的全部key和count值,所以对内存有必定的要求。

在上面word count的例子中,由于value会不断地更新,而不须要将其所有记录在内存中,所以内存的使用仍是比较少的。考虑一下若是是group by key这样的操做,Reducer须要获得key对应的全部value。在Hadoop MapReduce中,因为有了merge sort,所以给予Reducer的数据已是group by key了,而Spark没有这一步,所以须要将key和对应的value所有存放在hashmap中,并将value合并成一个array。能够想象为了可以存放全部数据,用户必须确保每个partition足够小到内存可以容纳,这对于内存是一个很是严峻的考验。所以Spark文档中建议用户涉及到这类操做的时候尽可能增长partition,也就是增长Mapper和Reducer的数量。

增长Mapper和Reducer的数量当然能够减少partition的大小,使得内存能够容纳这个partition。可是咱们在shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增长的更多,由此带来write handler所需的buffer也会更多。在一方面咱们为了减小内存的使用采起了增长task数量的策略,另外一方面task数量增多又会带来buffer开销更大的问题,所以陷入了内存使用的两难境地。

为了减小内存的使用,只能将aggregator的操做从内存移到磁盘上进行,Spark社区也意识到了Spark在处理数据规模远远大于内存大小时所带来的问题。所以PR303提供了外部排序的实现方案,相信在Spark 0.9 release的时候,这个patch应该能merge进去,到时候内存的使用量能够显著地减小。