接上篇文章:Spark2.x精通:Job触发流程源码深度剖析(一),咱们这里继续讲解如何进行task建立、task最佳位置计算和task提交,接上篇文章的submitMissingTasks()函数作详细讲解。
缓存
直接看submitMissingTasks()函数如何进行task的建立,这个函数主要干了如下几件事:app
1).task最佳位置的计算
ide
2).序列化task数据,根据不一样类型进行广播,Executor计算时会进行反序列化
函数
3).根据不一样stage类型分别建立ShuffleMapTask和ResultTask,封装成对应的TaskSet,提交给TaskScheduler进行处理
oop
代码以下:post
/** Called when stage's parents are available and we can now do its task. */ //这里建立一批task,这里task数量和partition数据相同 private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute. //调用findMissingPartitions须要计算的partition val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties = jobIdToActiveJob(jobId).properties //将stage加入running队列 runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage match { case s: ShuffleMapStage => outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1) case s: ResultStage => outputCommitCoordinator.stageStart( stage = s.id, maxPartitionId = s.rdd.partitions.length - 1) } //这里根据parition的id进行task的建立,一个partition一个task //为保证数据的本地化,经过getPreferredLocs()函数,计算task的最佳计算位置 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try { //这里匹配 全部的finalStage都是ResultStage, // 以前的都是ShuffleMapStage stage match { case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap case s: ResultStage => partitionsToCompute.map { id => val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } } catch { case NonFatal(e) => stage.makeNewStageAttempt(partitionsToCompute.size) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return }
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast // the serialized copy of the RDD and for each task we will deserialize it, which means each // task gets a different copy of the RDD. This provides stronger isolation between tasks that // might modify state of objects referenced in their closures. This is necessary in Hadoop // where the JobConf/Configuration object is not thread-safe. var taskBinary: Broadcast[Array[Byte]] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). // 这里就是根据不一样类型Task作了序列化和广播化 val taskBinaryBytes: Array[Byte] = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } //经过广播变量进行广播 taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => abortStage(stage, "Task not serializable: " + e.toString, Some(e)) runningStages -= stage
// Abort execution return case NonFatal(e) => abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return } //为stage建立指定数量的task任务 val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { // ShuffleMapStage生成ShuffleMapTask case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } // ResultStage生成ResultTask case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } } catch { case NonFatal(e) => abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return } //若是task数量大于0,这里将task封装成Taskset交给TaskSchduler的submitTasks进行处理 //Stage分两种类型ShuffleMapStage生成ShuffleMapTask,ResultStage生成ResultTask //这里的taskScheduler是SparkContext初始化的时候进行建立的,能够回忆一下 if (tasks.size > 0) { logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run markStageAsFinished(stage, None)
val debugString = stage match { case stage: ShuffleMapStage => s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})" case stage : ResultStage => s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})" } logDebug(debugString)
submitWaitingChildStages(stage) } }
2.上面是经过getPreferredLocs()函数计算task的最佳计算位置,这个函数的大致逻辑以下:this
1).先看partition有没有被cache ,若是被cache,则partition所在节点就是最佳位置spa
2).若是没有cache 再去看有没有被checkpoint,若是有,则checkpoint所在节点就是最佳位置debug
3).若是既没有被cache也没有被checkpoint,就递归去找父RDD,查看对应的cache和checkpoint肯定最佳位置code
4).若是上面三项都没知足,则这个partition没有最佳位置
这里咱们跟进去看一下,代码:
//计算task的最佳计算位置,大致逻辑://1.先看partition有没有被cache ,若是被cache,则partition所在节点就是最佳位置//2.若是没有cache 再去看有没有被checkpoint,若是有,则checkpoint所在节点就是最佳位置//3.若是既没有被cache也没有被checkpoint,就递归去找父RDD,查看对应的cache和checkpoint肯定最佳位置//若是上面三项都没知足,则这个partition没有最佳位置private def getPreferredLocsInternal( rdd: RDD[_], partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { // If the partition has already been visited, no need to re-visit. // This avoids exponential path exploration. SPARK-695 if (!visited.add((rdd, partition))) { // Nil has already been returned for previously visited partitions. return Nil } // If the partition is cached, return the cache locations //先去看cache val cached = getCacheLocs(rdd)(partition) if (cached.nonEmpty) { return cached } //再去看checkpoint // If the RDD has some placement preferences (as is the case for input RDDs), get those val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList if (rddPrefs.nonEmpty) { return rddPrefs.map(TaskLocation(_)) }
// If the RDD has narrow dependencies, pick the first partition of the first narrow dependency // that has any placement preferences. Ideally we would choose based on transfer sizes, // but this will do for now. //递归找父RDD是否被缓存和checkpoint rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } }
case _ => } //若是都没有,则这个task没有最佳位置,返回Nil Nil }
至此,task的建立和提交剖析完毕,这里就两个函数,可是Spark任务调度的核心之一,尤为是stage的划分不太好理解,尽可能把源码多看几遍。