spark-yarn模式和shuffle原理

sparkjob的部署
-----------------
    1.client
        driver run on client
    2.cluster
        driver on a worker



4.启动job时,指定资源使用。
    $>spark-submit 
        --driver-memory MEM            //设置driver内存,默认1g,配置2g
        --executor-memory MEM        //控制每一个执行器内存,默认1g

        [只在standalone模式下]
        --driver-cores                //控制driver使用的内核数,默认1.

        [standalone & mesos]
        --total-executor-cores NUM    //控制执行器使用的总内核数

        [standalone & yarn]
        --executor-cores NUM        //控制每一个执行的内核数。
        
        [yarn]
        --driver-cores NUM            //控制driver内核数,默认1
        --num-executors NUM            //启动的执行器个数,动态分配内核启用时,数字就是Num的值。

    
5.启动spark-shell,手动分配资源
    //启动3个executor,worker节点不能启动2个executor
    spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 6g --total-executor-cores 4 --executor-cores 1
    //启动了4个executor,
    spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 4 --executor-cores 1
    //启动了7个executor,
    spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 22 --executor-cores 3


spark + yarn模式
--------------------
    yarn模式,不须要spark集群,只是在client安装spark,提交做业时,走的是hadoop的流程。
    使用spark的jar,在nodemanager上启动的spark的executor进程。
    --master的值指定yarn便可,rm的地址从配置文件中提取的。

    --master yarn --deployMode client            //--master yarn-client
    --master yarn --deployMode cluster            //--master yarn-cluster
    [yarn-client]
        Appmaster只运行appmaster自身程序,负责资源请求。
        Driver仍然位于client执行。

    [yarn-cluster]
        appmaster不但负责资源请求,还负责运行driver。

    //实操
    1.中止spark集群
        stop-all.sh

    2.启动zk和hdfs-yarn
        start-yarn.sh

    3.配置spark的spark-env.sh的HADOOP_CONF_DIR并分发.
        ...
        export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
    
    4.启动spark-shell
        spark-shell --master yarn --deploy-mode client --num-executors 4
        
    5.故障诊断
        出现 is running beyond virtual memory limits. 
        Current usage: 178.7 MB of 1 GB physical memory used; 2.3 GB of 2.1 GB virtual memory used. Killing container.

        关闭yarn-site.xml虚拟内存检查并分发文件。
        [yarn-site.xml]
        <property>
            <name>yarn.nodemanager.vmem-check-enabled</name>
            <value>false</value>
        </property>

    6.spark yarn运行时将spark的全部jar上传到hdfs,协同hadoop的做业运行流程。
        配置spark.yarn.jars或者spark.yarn.archive,避免每次上传jar包。
        1.spark.yarn.jars
            spark.yarn.jars=hdfs:///some/path
        2.spark.yarn.archive
            spark.yarn.archive=hdfs://mycluster/user/centos/spark/spark-jars.zip

        3.配置spark.yarn.archive属性,避免每次上传大的jar包。
            a)上传zip文件到hdfs://mycluster/user/centos/spark/spark-jars.zip
            b)配置spark配置文件。
                [spark/conf/spark-default.conf]
                spark.yarn.archive hdfs://mycluster/user/centos/spark/spark-jars.zip
            c)启动shell
                $>spark-shell --master yarn-client

ShuffleMapTask
------------------
    private[spark] class ShuffleMapTask(
        stageId: Int,
        stageAttemptId: Int,
        taskBinary: Broadcast[Array[Byte]],        //(rdd,dep)
        partition: Partition,
        @transient private var locs: Seq[TaskLocation],
        metrics: TaskMetrics,
        localProperties: Properties,
        jobId: Option[Int] = None,
        appId: Option[String] = None,
        appAttemptId: Option[String] = None)
    }

shuffle管理
-------------------
    [ShuffleManager]
        ShuffleManager,是shuffle系统可插拔接口。
        ShuffleManager在driver和每一个executor经过SparkEnv进行建立。
        基于spark.shuffle.manager属性配置建立相应shuffleManager实现。
        在spark 2.1.0中只有SortShuffleManager.
        在spark 1.6.0中有SortShuffleManager和HashShuffleManager.

    [HashShuffleManager]
        spark.shuffle.consolidateFiles=true,默认false,合并输出。
        slot = 并发能力 = 并发执行的线程数 = (执行器个数 * 每一个执行器的cpu内核数) / 每一个任务占用的内核数。

    spark 2.1.0的实现类是SortShuffleManager(不论sort仍是tungsten-sort(钨丝排序))
    [SortShuffleManager]
        基于排序的shuffle,输入kv按照目标分区的id进行排序,而后写入一个map输出文件。
        reducer读取连续文件区域来提取数据。map内存不足,溢出到磁盘,磁盘上的文件最终输出到一个文件中。

        该方式的shuffle有两种途径生成map输出文件:
        1.串行化排序(如下三个条件均知足使用)
            a)shuffle依赖没有指定聚合或者输出排序
            b)shuffle序列化器支持序列化值得从新定位。(当前只有KryoSerializer和SQL的Serializer能够,java不能够)
            c)shuffle生成的分区少于16777216个.

        2.反串行排序
            全部其余状况。
    
    [串行化排序模式]
        该模式下,传递给ShuffleWriter的record便可被串行化,排序时也是串行化进行缓冲。该方式有几点优化
        处理:
        1.对串行化的二进制数据进行排序,而不是针对java对象,所以能够减小内存消耗和过分GC。
          该优化机制要求串行化器具备特殊的属性可以对串行的record进行重排序,不须要反串过程。

        2.使用串行化的具备高效缓存特征的sorter,能够对压缩的record指针和分区id的数组进行排序。
          数组中,每条record使用8字节空间存储。

        3.溢出合并过程对串行化的数据块(属于同一分区)进行操做,而且合并期间不须要反串(流)。

        4.支持压缩文件块的合成,合并过程简单的将压缩和串行化的分区最终合并成一个分区文件,
          支持高效数据复制方式,例如NIO中的零拷贝。
        
        
ShuffleManager.registerShuffle()
-----------------------------------
    //1.经过ShuffleDep判断是否须要bypass
    if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
      new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } 
    //判断依赖是否能够串行shuffle
    else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } 
    //基本shuffle
    else {
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }


    
是否迂回的条件
-------------------------
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    //若是map端须要聚合,不能回调。
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    }
    //判断依赖的分区数量是否小于指定的配置(默认时200)
    else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
    }

    //结论
    if(map须要聚合){
        //不能迂回
    }
    else{
        if(分区数 <= 200(可配:spark.shuffle.sort.bypassMergeThreshold)){
            //能够迂回
        }
        else{
            //不能迂回
        }
    }


串行shuffle的判断条件
------------------------
    def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
        val shufId = dependency.shuffleId
        val numPartitions = dependency.partitioner.numPartitions
        //判断是否dep中使用的串行化器是否时kryo(kryo支持)。
        if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
          false
        }
        //判断dep是否认义聚合器
        else if (dependency.aggregator.isDefined) {
          false
        } 
        //分区数大于特定值
        else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
          false
        } 
        ///
        else {
          true
        }
    }
    
    //结论
    if(不是kryo){
        //不能用串行shuffle
    }
    //
    else if(dep定义了聚合器){
        //不能用串行shuffle
    }
    else if(分区数 > (1 << 24) ){
        //不能用串行shuffle
    }
    else{
        //使用串行shuffle
    }


整个shuffle处理手段的优先级
---------------------------
    //1.迂回策略
    if(可否迂回){
        //new BypassMergeSortShuffleHandle()
    }
    //2.串行策略
    else if(是否串行){
        //new SerializedShuffleHandle();
    }
    //3.常规策略
    else{
        //new BaseShuffleHandle
    }


SortShuffleManager.getWrtier()
--------------------------------
    handle match{
        case SerializedShuffleHandle        => new UnsafeShuffleWriter();
        case BypassMergeSortShuffleHandle    => new BypassMergeSortShuffleWriter();
        case BaseShuffleHandle                => new SortShuffleWriter();
    
    }


ShuffleWriter的特性
--------------------
    abstract class ShuffleWriter
         |
        / \
        ---
         |
         |------BypassMergeSortShuffleWriter
         |------UnsafeShuffleWriter
         |------SortShuffleWriter

    [BypassMergeSortShuffleWriter]
        该类实现了hash方式的shuffle处理手段,将record写入单独文件,每一个分区一个文件。
        而后对每一个分区文件合并再产生一个文件,文件的不一样区域用于不一样reduce,该模式下,
        record不在内存中缓存,这是和HashShuffleWriter本质不一样点。

        该方式对于有大量分区的shuffle处理效率不高,缘由是须要对全部分区同时打开串行化器
        和文件流。

    [UnsafeShuffleWriter]
        将kv分开单独以kryo串行写入缓冲区,而后将缓冲放入ShuffleExternalSorter中。
        1.ShuffleExternalSorter
            专门用于基于sort的shuffle。record追加到date page,若是全部record插入
            后或者内存到达limit值,这些记录按照分区id进行排序,排序后的记录写入单独
            的输出文件(或多个文件),输出文件的格式和SortShuffleWriter输出文件格式相同,
            每条分区的记录都是单独串行和压缩写入的,一样使用反串和解压缩方式读取。
            和ExternalSorter不一样,该对象不对溢出文件进行合并,而是将合并过程交给
            UnsafeShuffleWriter,避免多余串行和反串过程。

            KV以串行和压缩方式写缓冲区,再将缓冲区字节数组写入页面内存(long[]),标记好
            长度、偏移量、分区数等等,每一个KV在页面内存的地址和分区进行编码后写入内存
            排序器(InMemorySorter,该排序器使用分区id降序排列).若是内存页默认超过1G(
            能够经过spark.shuffle.spill.numElementsForceSpillThreshold进行修改)个kv,
            发生溢出,进行排序输出到文件。


    [SortShuffleWriter]
        
        



Spark中的串行化
-------------------
    spark默认使用java串行化器,但性能通常,优化手段之一
    使用kryo串行化,可是kryo串行化器对于要串行化的类使用前
    须要注册,spark的kryo串行化器只是对java内置类、scala的内置
    类核spark的内置类进行了注册,自定义的类必须手动注册。
    也是没有把kryo串行化器作为默认设置的缘由.
    
    
    keyo串行化为何快
——————————————————————————————————————————
为何kryo比其它的序列化方案要快?

为每个类分配一个id

实现了本身的IntMap

代码中一些取巧的地方:

利用变量memoizedRegistration和memoizedType记录上一次的调用writeObject函数的Class,则若是两次写入同一类型时,能够直接拿到,再也不查找HashMap。