【Spark】Spark 存储原理--shuffle 过程

本篇结构:

  • Spark Shuffle 的发展
  • Spark Shuffle 中数据结构
  • Spark Shuffle 原理
  • 后记

Spark Shuffle 是 spark job 中某些算子触发的操作。当 rdd 依赖中出现宽依赖的时候,就会触发 Shuffle 操作,Shuffle 操作通常会伴随着不同 executor/host 之间数据的传输。

Shuffle 操作可能涉及的过程包括数据的排序,聚合,溢写,合并,传输,磁盘IO,网络的 IO 等等。Shuffle 是连接 MapTask 和 ReduceTask 之间的桥梁,Map 的输出到 Reduce 中须经过 Shuffle 环节,Shuffle 的性能高低直接影响了整个程序的性能和吞吐量。

通常 Shuffle 分为两部分:Map 阶段的数据准备( ShuffleMapTask )和Reduce(ShuffleReduceTask) 阶段的数据拷贝处理。一般将在 Map 端的 Shuffle 称之为 Shuffle Write,在 Reduce 端的 Shuffle 称之为 Shuffle Read。

一、Spark Shuffle 的发展

Spark Shuffle 机制总共有三种:

1.1、未优化的 HashShuffle

每一个 ShuffleMapTask 都会为每一个 ReducerTask 创建一个单独的文件,总的文件数是 M * R,其中 M 是 ShuffleMapTask 的数量,R 是 ShuffleReduceTask 的数量。

见下图(来源网络):

在处理大数据时,ShuffleMapTask 和 ShuffleReduceTask 的数量很多,创建的磁盘文件数量 M*R 也越多,大量的文件要写磁盘,再从磁盘读出来,不仅会占用大量的时间,而且每个磁盘文件记录的句柄都会保存在内存中(每个人大约 100k),因此也会占用很大的内存空间,频繁的打开和关闭文件,会导致频繁的GC操作,很容易出现 OOM 的情况。

也正是上述原因,该 HashShuffle 如今已退出历史舞台。

1.2、优化后 HashShuffle

在 Spark 0.8.1 版本中,引入了 Consolidation 机制,该机制是对 HashShuffle 的一种优化。

如下图(来源网络):

可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。

先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i’,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。

这样,每个 worker 持有的文件数降为 cores * R。cores 代表核数,R 是 ShuffleReduceTask 数。

1.3、Sort-Based Shuffle

由于 HashShuffle 会产生很多的磁盘文件,引入 Consolidation 机制虽然在一定程度上减少了磁盘文件数量,但是不足以有效提高 Shuffle 的性能,适合中小型数据规模的大数据处理。

为了让 Spark 在更大规模的集群上更高性能处理更大规模的数据,因此在 Spark 1.1版本中,引入了 SortShuffle。

如下图(来源网络):

该机制每一个 ShuffleMapTask 都只创建一个文件,将所有的 ShuffleReduceTask 的输入都写入同一个文件,并且对应生成一个索引文件。

以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer 缓存所占用的内存大小,而且同时避免 GC 的风险和频率。

二、Spark Shuffle 中数据结构

2.1、AppendOnlyMap

AppendOnlyMap 单从命名上来看,是一个只能追加元素的 Map 结构。的确,它是只支持追加的 map,可以修改某个 key 对应的 value,但是不能删除已经存在的 key。

底层是由数组结构实现的,当需要对 Key-Value 进行聚合时,会使用AppendOnlyMap 作为 buffer。在插入或者修改元素的时候,会判断是否扩容,如果达到扩容标准,将会对数组 2 倍容量进行扩容,扩容过程中原有元素并不是直接拷贝,而是进行原有元素的重新定位存储,如果集合中存在的数据量大,那么这里的操作将会耗时又耗资源。

存储级别是 Memory-Only ,在 shuffle reduce 数据不会溢写,在数据量不大的情况下可以,但是数据量大时,会极易出现OOM。

2.2、ExternalAppendOnlyMap

继承于AppendOnlyMap ,但是存储级别是 Memory and Disk,即在数据量达到一个阈值的时候,会把数据溢写到磁盘,达到释放内存空间,降低 OOM 的风险的作用。

2.3、PartitionedAppendOnlyMap

是 SortShuffleWriter 中用到的一种数据结构,在 Map 端需要聚合的时候,采用这种数据结构,这种结构也是一种 Hash Table,能够根据 Key,通过 hash(Key),把数据插入到相应的位置。

PartitionedAppendOnlyMap 支持 aggregation,它继承了 SizeTrackingAppendOnlyMap,还实现了 WritablePartitionedPairCollection 接口中的 partitionedDestructiveSortedIterator 抽象方法,在该方法中调用了AppendOnlyMap 的 destructiveSortedIterator 对底层数组进行整理和排序后获得迭代器,数据有可能不会连续存放。

2.4、PartitionedPairBuffer

底层实现和 PartitionedAppendOnlyMap 一样都是 ArrayBuffer,主要用于SortShuffleWriter 中 Map 端不采用聚合和排序时使用。

不支持 aggregation,主要功能是插入值是有顺序的,主要起缓冲作用,只有顺序插入,没有 changeValue(聚合)和 update(更新)操作,数据连续存放在尾部。

三、Spark Shuffle 原理

因为 hash based shuffle 已经退出历史舞台,所以以 spark 2.3 的 sort based shuffle 为例,看 Spark Shuffle 的原理。

Shuffle 的整个生命周期由 ShuffleManager 来管理,Spark 2.3中,唯一的支持方式为 SortShuffleManager,SortShuffleManager 中定义了 writer 和 reader 对应shuffle 的 map 和 reduce 阶段。writer 有三种运行模式:

  • BypassMergeSortShuffleWriter
  • SortShuffleWriter
  • UnsafeShuffleWriter

3.1、BypassMergeSortShuffleWriter

首先,BypassMergeSortShuffleWriter 的运行机制的触发条件如下:

  • shuffle reduce task(即partition)数量小于spark.shuffle.sort.bypassMergeThreshold 参数的值。
  • 没有map side aggregations。
    note: map side aggregations是指在 map 端的聚合操作,通常来说一些聚合类的算子都会都 map 端的 aggregation。不过对于 groupByKey 和combineByKey, 如果设定 mapSideCombine 为false,就不会有 map side aggregations。

图片来源网络(正确的应该有三个 ReduceTask):

每个 task 会为每一个下游的 reduce task 创建一个临时文件,将 key 按照 hash 存入对应临时文件中,因为写入磁盘文件是通过 Java的 BufferedOutputStream 实现的,BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。所以图中会有内存缓冲的概念。最后,会将所有临时文件合并成一个磁盘文件,并创建一个索引文件标识下游各个 reduce task 的数据在文件中的 start offset与 end offset。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一样的,也会创建很多的临时文件(所以触发条件中会有 reduce task 数量限制),只是在最后会做一个磁盘文件的合并,对于 shuffle reader 会更友好一些。

3.2、SortShuffleWriter

图片来源网络:

该模式下,数据首先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。、

接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件也是通过 Java 的 BufferedOutputStream 实现的。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是 merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。

此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

BypassMergeSortShuffleWriter 与该机制相比:

第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用 BypassMerge 机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销,当然需要满足那两个触发条件。

3.3、UnsafeShuffleWriter

触发条件有三个:

  • Serializer 支持 relocation。这是指 Serializer 可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致(目前只能使用 kryoSerializer)。
  • 没有 map side aggregations
  • shuffle reduce task(即 partition )数量不能大于支持的上限(2^24)

UnsafeShuffleWriter 将 record 序列化后插入 sorter,然后对已经序列化的 record 进行排序,并在排序完成后写入磁盘文件作为 spill file,再将多个 spill file 合并成一个输出文件。在合并时会基于 spill file 的数量和 IO compression codec 选择最合适的合并策略。

四、后记

这篇文章东拼西凑,大多不是自己的东西,惭愧。

详见:

https://zhangchenchen.github.io/2018/09/26/deep-in-spark-shuffle/