spark-2.0原理分析-shuffle过程

shuffle概览

shuffle过程概览

这里写图片描述

shuffle数据流概览

这里写图片描述

shuffle数据流

这里写图片描述

shuffle工做流程

在运行job时,spark是一个stage一个stage执行的。先把任务分红stage,在任务提交阶段会把任务造成taskset,在执行任务。
spark的DAGScheduler根据RDD的ShuffleDependency来构建Stages:web

  • 例如:ShuffleRDD/CoGroupedRDD有一个ShuffleDependency。
  • 不少操做经过钩子函数来建立ShuffleRDD

每一个ShuffleDependency会map到spark的job的一个stage,而后会致使一个shuffle过程。app

为何shuffle过程代价很大

这是因为shuffle过程可能须要完成如下过程:ide

  • 从新进行数据分区
  • 数据传输
  • 数据压缩
  • 磁盘I/O

shuffle的体系结构

ShuffleManager接口

    shuffleManager是spark的shuffle系统的可插拔接口。ShuffleManager将会在driver和每一个executor上的SparkEnv中进行建立。能够经过参数spark.shuffle.manager进行设置。
driver经过ShuffleManager来注册shuffle,而且executor经过它来读取和写入数据。svg

ShuffleWriter

控制shuffle数据输出逻辑。函数

ShuffleReader

获取shuffle过程当中用于ShuffleRDD的数据。spa

ShuffleBlockManager

管理抽象的bucket和计算数据块之间的mapping过程。xml

基于sort的shuffle

sort-based的shuffle,会把输入的记录根据目标分区id(partition ids)进行排序。而后写入单个的map输出文件中。为了读取map的输出部分,Reducers获取此文件的连续区域 。当map输出的数据太大而内存没法存放时,输出的排序子集能够保存到磁盘,这些磁盘文件被合并后,生成最终的输出文件。
sort shuffle有两个不一样的输出路径来产生map的输出文件:blog

  • 序列化排序(Serialized sorting)
    在使用序列化排序时,须要知足如下3个条件:
    • shuffle不指定聚合(aggregation)或输出排序方法。
    • shuffle的序列化程序支持序列化值的重定位(KryoSerializer和Spark SQL的自定义序列化程序目前支持此操做)。
    • shuffle产生小于16777216个输出分区。
  • 反序列化排序(Deserialized sorting)
    用来处理全部其余状况。

Sort Shuffle Manager

Sort Shuffle Writer

  • 每一个map任务都会产生一个shuffle数据文件,和一个Index文件
    • 经过外部排序类ExternalSorter对数据进行排序
  • 若map-side须要进行合并(combine)操做,数据将会按key和分区进行排序,若没有合并操做数据只会根据分区进行排序。

这里写图片描述

Sort Shuffle Reader

(待续)排序