不可不知的spark shuffle


640?wx_fmt=png shuffle概览

一个spark的RDD有一组固定的分区组成,每一个分区有一系列的记录组成。对于由窄依赖变换(例如map和filter)返回的RDD,会延续父RDD的分区信息,以pipeline的形式计算。每一个对象仅依赖于父RDD中的单个对象。诸如coalesce之类的操做可能致使任务处理多个输入分区,但转换仍然被认为是窄依赖的,由于一个父RDD的分区只会被一个子RDD分区继承。
网络

Spark还支持宽依赖的转换,例如groupByKey和reduceByKey。在这些依赖项中,计算单个分区中的记录所需的数据能够来自于父数据集的许多分区中。要执行这些转换,具备相同key的全部元组必须最终位于同一分区中,由同一任务处理。为了知足这一要求,Spark产生一个shuffle,它在集群内部传输数据,并产生一个带有一组新分区的新stage。分布式

能够看下面的代码片断:性能

sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()

上面的代码片断只有一个action操做,count,从输入textfile到action通过了三个转换操做。这段代码只会在一个stage中运行,由于,三个转换操做没有shuffle,也便是三个转换操做的每一个分区都是只依赖于它的父RDD的单个分区。大数据

可是,下面的单词统计就跟上面有很大区别:优化

val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()

这段代码里有两个reducebykey操做,三个stage。ui

下面图更复杂,由于有一个join操做:spa

640?wx_fmt=png

粉框圈住的就是整个DAG的stage划分。线程

640?wx_fmt=png

在每一个stage的边界,父stage的task会将数据写入磁盘,子stage的task会将数据经过网络读取。因为它们会致使很高的磁盘和网络IO,因此shuffle代价至关高,应该尽可能避免。父stage的数据分区每每和子stage的分区数不一样。触发shuffle的操做算子每每能够指定分区数的,也便是numPartitions表明下个stage会有多少个分区。就像mr任务中reducer的数据是很是重要的一个参数同样,shuffle的时候指定分区数也将在很大程度上决定一个应用程序的性能。3d

640?wx_fmt=png 优化shuffle

一般状况能够选择使用产生相同结果的action和transform相互替换。可是并非产生相同结果的算子就会有相同的性能。一般避免常见的陷阱并选择正确的算子能够显著提升应用程序的性能。
code

当选择转换操做的时候,应最小化shuffle次数和shuffle的数据量。shuffle是很是消耗性能的操做。全部的shuffle数据都会被写入磁盘,而后经过网络传输。repartition , join, cogroup, 和  *By 或者 *ByKey 类型的操做都会产生shuffle。咱们能够对一下几个操做算子进行优化:

1. groupByKey某些状况下能够被reducebykey代替。

2. reduceByKey某些状况下能够被 aggregatebykey代替。

3. flatMap-join-groupBy某些状况下能够被cgroup代替。

具体细节,知识星球球友能够点击阅读原文进入知识星球阅读。

640?wx_fmt=png no shuffle

在某些状况下,前面描述的转换操做不会致使shuffle。当先前的转换操做已经使用了和shuffle相同的分区器分区数据的时候,spark就不会产生shuffle。

举个例子:

rdd1 = someRdd.reduceByKey(...)

rdd2 = someOtherRdd.reduceByKey(...)

rdd3 = rdd1.join(rdd2)

因为使用redcuebykey的时候没有指定分区器,因此都是使用的默认分区器,会致使rdd1和rdd2都采用的是hash分区器。两个reducebykey操做会产生两个shuffle过程。若是,数据集有相同的分区数,执行join操做的时候就不须要进行额外的shuffle。因为数据集的分区相同,所以rdd1的任何单个分区中的key集合只能出如今rdd2的单个分区中。 所以,rdd3的任何单个输出分区的内容仅取决于rdd1中单个分区的内容和rdd2中的单个分区,而且不须要第三个shuffle。

例如,若是someRdd有四个分区,someOtherRdd有两个分区,而reduceByKeys都使用三个分区,运行的任务集以下所示:

640?wx_fmt=png

若是rdd1和rdd2使用不一样的分区器或者相同的分区器不一样的分区数,仅仅一个数据集在join的过程当中须要从新shuffle

640?wx_fmt=png


在join的过程当中为了不shuffle,可使用广播变量。当executor内存能够存储数据集,在driver端能够将其加载到一个hash表中,而后广播到executor。而后,map转换能够引用哈希表来执行查找。

640?wx_fmt=png 增长shuffle

有时候须要打破最小化shuffle次数的规则。

当增长并行度的时候,额外的shuffle是有利的。例如,数据中有一些文件是不可分割的,那么该大文件对应的分区就会有大量的记录,而不是说将数据分散到尽量多的分区内部来使用全部已经申请cpu。在这种状况下,使用reparition从新产生更多的分区数,以知足后面转换算子所需的并行度,这会提高很大性能。

使用reduce和aggregate操做将数据聚合到driver端,也是修改区数的很好的例子。

在对大量分区执行聚合的时候,在driver的单线程中聚合会成为瓶颈。要减driver的负载,能够首先使用reducebykey或者aggregatebykey执行一轮分布式聚合,同时将结果数据集分区数减小。实际思路是首先在每一个分区内部进行初步聚合,同时减小分区数,而后再将聚合的结果发到driver端实现最终聚合。典型的操做是treeReduce 和 treeAggregate。

当聚合已经按照key进行分组时,此方法特别适用。例如,假如一个程序计算语料库中每一个单词出现的次数,并将结果使用map返回到driver。一种方法是可使用聚合操做完成在每一个分区计算局部map,而后在driver中合并map。能够用aggregateByKey以彻底分布的方式进行统计,而后简单的用collectAsMap将结果返回到driver。

更多spark技巧,大数据技巧,欢迎点击阅读原文加入知识星球

推荐阅读:

经验|如何设置Spark资源

戳破 | hive on spark 调优势

640?wx_fmt=png