Spark2.x精通:Job触发流程源码深度剖析(二)

 接上篇文章:Spark2.x精通:Job触发流程源码深度剖析(一),咱们这里继续讲解如何进行task建立、task最佳位置计算和task提交,接上篇文章的submitMissingTasks()函数作详细讲解。
缓存


  1. 直接看submitMissingTasks()函数如何进行task的建立,这个函数主要干了如下几件事:app

1).task最佳位置的计算
ide

2).序列化task数据,根据不一样类型进行广播,Executor计算时会进行反序列化
函数

3).根据不一样stage类型分别建立ShuffleMapTask和ResultTask,封装成对应的TaskSet,提交给TaskScheduler进行处理
oop


代码以下:post

  /** Called when stage's parents are available and we can now do its task. */  //这里建立一批task,这里task数量和partition数据相同  private def submitMissingTasks(stage: Stage, jobId: Int) {    logDebug("submitMissingTasks(" + stage + ")")
   // First figure out the indexes of partition ids to compute.    //调用findMissingPartitions须要计算的partition    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
   // Use the scheduling pool, job group, description, etc. from an ActiveJob associated    // with this Stage    val properties = jobIdToActiveJob(jobId).properties    //将stage加入running队列    runningStages += stage    // SparkListenerStageSubmitted should be posted before testing whether tasks are    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted    // event.    stage match {      case s: ShuffleMapStage =>        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)      case s: ResultStage =>        outputCommitCoordinator.stageStart(          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)    }    //这里根据parition的id进行task的建立,一个partition一个task    //为保证数据的本地化,经过getPreferredLocs()函数,计算task的最佳计算位置    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {    //这里匹配 全部的finalStage都是ResultStage,    // 以前的都是ShuffleMapStage      stage match {        case s: ShuffleMapStage =>          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap        case s: ResultStage =>          partitionsToCompute.map { id =>            val p = s.partitions(id)            (id, getPreferredLocs(stage.rdd, p))          }.toMap      }    } catch {      case NonFatal(e) =>        stage.makeNewStageAttempt(partitionsToCompute.size)        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))        runningStages -= stage        return    }
   stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
   // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast    // the serialized copy of the RDD and for each task we will deserialize it, which means each    // task gets a different copy of the RDD. This provides stronger isolation between tasks that    // might modify state of objects referenced in their closures. This is necessary in Hadoop    // where the JobConf/Configuration object is not thread-safe.    var taskBinary: Broadcast[Array[Byte]] = null    try {      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).      // For ResultTask, serialize and broadcast (rdd, func).      // 这里就是根据不一样类型Task作了序列化和广播化      val taskBinaryBytes: Array[Byte] = stage match {        case stage: ShuffleMapStage =>          JavaUtils.bufferToArray(            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))        case stage: ResultStage =>          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))      }      //经过广播变量进行广播      taskBinary = sc.broadcast(taskBinaryBytes)    } catch {      // In the case of a failure during serialization, abort the stage.      case e: NotSerializableException =>        abortStage(stage, "Task not serializable: " + e.toString, Some(e))        runningStages -= stage
       // Abort execution        return      case NonFatal(e) =>        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))        runningStages -= stage        return    }    //为stage建立指定数量的task任务    val tasks: Seq[Task[_]] = try {      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()      stage match {       // ShuffleMapStage生成ShuffleMapTask        case stage: ShuffleMapStage =>          stage.pendingPartitions.clear()          partitionsToCompute.map { id =>            val locs = taskIdToLocations(id)            val part = stage.rdd.partitions(id)            stage.pendingPartitions += id            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),              Option(sc.applicationId), sc.applicationAttemptId)          }         // ResultStage生成ResultTask        case stage: ResultStage =>          partitionsToCompute.map { id =>            val p: Int = stage.partitions(id)            val part = stage.rdd.partitions(p)            val locs = taskIdToLocations(id)            new ResultTask(stage.id, stage.latestInfo.attemptId,              taskBinary, part, locs, id, properties, serializedTaskMetrics,              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)          }      }    } catch {      case NonFatal(e) =>        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))        runningStages -= stage        return    }    //若是task数量大于0,这里将task封装成Taskset交给TaskSchduler的submitTasks进行处理    //Stage分两种类型ShuffleMapStage生成ShuffleMapTask,ResultStage生成ResultTask    //这里的taskScheduler是SparkContext初始化的时候进行建立的,能够回忆一下    if (tasks.size > 0) {      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")      taskScheduler.submitTasks(new TaskSet(        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())    } else {      // Because we posted SparkListenerStageSubmitted earlier, we should mark      // the stage as completed here in case there are no tasks to run      markStageAsFinished(stage, None)
     val debugString = stage match {        case stage: ShuffleMapStage =>          s"Stage ${stage} is actually done; " +            s"(available: ${stage.isAvailable}," +            s"available outputs: ${stage.numAvailableOutputs}," +            s"partitions: ${stage.numPartitions})"        case stage : ResultStage =>          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"      }      logDebug(debugString)
     submitWaitingChildStages(stage)    }  }

  

2.上面是经过getPreferredLocs()函数计算task的最佳计算位置,这个函数的大致逻辑以下:this

    1).先看partition有没有被cache ,若是被cache,则partition所在节点就是最佳位置spa

    2).若是没有cache 再去看有没有被checkpoint,若是有,则checkpoint所在节点就是最佳位置debug

    3).若是既没有被cache也没有被checkpoint,就递归去找父RDD,查看对应的cache和checkpoint肯定最佳位置code

    4).若是上面三项都没知足,则这个partition没有最佳位置


这里咱们跟进去看一下,代码:

//计算task的最佳计算位置,大致逻辑://1.先看partition有没有被cache ,若是被cache,则partition所在节点就是最佳位置//2.若是没有cache 再去看有没有被checkpoint,若是有,则checkpoint所在节点就是最佳位置//3.若是既没有被cache也没有被checkpoint,就递归去找父RDD,查看对应的cache和checkpoint肯定最佳位置//若是上面三项都没知足,则这个partition没有最佳位置private def getPreferredLocsInternal(      rdd: RDD[_],      partition: Int,      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {    // If the partition has already been visited, no need to re-visit.    // This avoids exponential path exploration.  SPARK-695    if (!visited.add((rdd, partition))) {      // Nil has already been returned for previously visited partitions.      return Nil    }    // If the partition is cached, return the cache locations   //先去看cache     val cached = getCacheLocs(rdd)(partition)    if (cached.nonEmpty) {      return cached    }    //再去看checkpoint    // If the RDD has some placement preferences (as is the case for input RDDs), get those    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList    if (rddPrefs.nonEmpty) {      return rddPrefs.map(TaskLocation(_))    }
   // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency    // that has any placement preferences. Ideally we would choose based on transfer sizes,    // but this will do for now.    //递归找父RDD是否被缓存和checkpoint    rdd.dependencies.foreach {      case n: NarrowDependency[_] =>        for (inPart <- n.getParents(partition)) {          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)          if (locs != Nil) {            return locs          }        }
     case _ =>    }   //若是都没有,则这个task没有最佳位置,返回Nil    Nil  }


    至此,task的建立和提交剖析完毕,这里就两个函数,可是Spark任务调度的核心之一,尤为是stage的划分不太好理解,尽可能把源码多看几遍。