Spark Shuffle之SortShuffleManager机制原理

1. SparkShuffle 概念

reduceByKey 会将上一个 RDD 中的每一个 key 对应的所有 value 聚合
成一个 value,然后生成一个新的 RDD,元素类型是<key,value>对的
形式,这样每一个 key 对应一个聚合起来的 value。
问题:聚合之前,每一个 key 对应的 value 不一定都是在一个
partition 中,也不太可能在同一个节点上,因为 RDD 是分布式的弹性
的数据集,RDD 的 partition 极有可能分布在各个节点上。
如何聚合?
– Shuffle Write:上一个 stage 的每个 map task 就必须保证将自己
处理的当前分区的数据相同的 key 写入一个分区文件中,可能会写入多
个不同的分区文件中。
– Shuffle Read:reduce task 就会从上一个 stage 的所有 task 所在
的机器上寻找属于己的那些分区文件,这样就可以保证每一个 key 所对
应的 value 都会汇聚到同一个节点上去处理和聚合。
Spark 中有两种 Shuffle 管理类型,HashShufflManager 和
SortShuffleManager,Spark1.2 之前是 HashShuffleManager,
Spark1.2 引入 SortShuffleManager,在 Spark 2.0+版本中已经将
HashShuffleManager 丢弃。

1) 普通机制

普通机制示意图:
在这里插入图片描述
执行流程
a) map task 的计算结果会写入到一个内存数据结构里面,内存
数据结构默认是 5M
b) 在 shuffle 的时候会有一个定时器,不定期的去估算这个内存
结构的大小,当内存结构中的数据超过 5M 时,比如现在内
存结构中的数据为 5.01M,那么他会申请 5.012-5=5.02M
内存给内存数据结构。
c) 如果申请成功不会进行溢写,如果申请不成功,这时候会发
生溢写磁盘。
d) 在溢写之前内存结构中的数据会进行排序分区
e) 然后开始溢写磁盘,写磁盘是以 batch 的形式去写,一个
batch 是 1 万条数据,
f) map task 执行完成后,会将这些磁盘小文件合并成一个大的
磁盘文件,同时生成一个索引文件。
g) reduce task 去 map 端拉取数据的时候,首先解析索引文件,
根据索引文件再去拉取对应的数据。
总结
产生磁盘小文件的个数: 2
M(map task 的个数)

2) bypass 机制

bypass 机制示意图:
在这里插入图片描述
总结 1 .bypass 运行机制的触发条件如下: 算子不能有 map 端的预聚合操作。 shuffle reduce task 的数量小于 spark.shuffle.sort.bypassMergeThreshold 的参数值。这 个值默认是 200。 2 .产生的磁盘小文件为:2*M(map task 的个数)