Spark Job Submission Flow

Posted by mzl on March 2, 2018

I've been reading the Spark source code for a while now. I've also seen other people's write-ups of the job submission flow, but I can never quite remember it, so I'm walking through the flow myself to keep from forgetting it later.

Triggering Job Submission

Spark RDD operators fall into two categories: transformations and actions. A transformation turns one RDD into another, while an action triggers the actual computation. The key difference is that actions end up calling the runJob method of SparkContext, either directly or through another operator. The two examples below show the difference: first() reaches runJob indirectly through take(1), while count() calls it directly:

/** first(): an action that reaches runJob only indirectly, via take(1) */

/**
 * Return the first element in this RDD.
 */
def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}

/** count(): an action that calls runJob directly */
/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

So calling an action operator is what triggers Spark to run a job; at that point the job is submitted from the local driver to the Spark cluster.
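To see the two categories from the application side, here is a minimal local-mode sketch (the object name and the data are made up for illustration): map only records lineage, while count actually submits a job.

import org.apache.spark.{SparkConf, SparkContext}

object ActionVsTransformation {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("action-vs-transformation").setMaster("local[2]"))

    val nums = sc.parallelize(1 to 10, 2)
    // Transformation: only builds a new RDD and records the lineage; no job runs yet.
    val doubled = nums.map(_ * 2)
    // Action: internally calls sc.runJob, so a job is submitted to the DAGScheduler here.
    val n = doubled.count()
    println(s"count = $n")

    sc.stop()
  }
}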

SparkContext's runJob and submitJob

Action operators call SparkContext's runJob. Reading the SparkContext source, there is also a submitJob method that achieves the same effect, except that submitJob is asynchronous.

Note that both methods delegate to dagScheduler, which means the job is really run by the DAGScheduler. Since runJob and submitJob differ little as far as the overall flow goes, I won't trace submitJob separately; just note that SparkContext.submitJob calls DAGScheduler's submitJob method.
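From the caller's point of view the difference is simply synchronous versus asynchronous. Before diving into the source below, here is a small usage sketch, assuming an existing SparkContext named sc (the RDD and the per-partition function are made up for illustration):

import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val rdd = sc.parallelize(1 to 100, 4)

// runJob blocks until the job finishes and returns one result per partition.
val sizes: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.size)

// submitJob returns a SimpleFutureAction immediately; results arrive through the handler.
val total = new AtomicLong(0)
val future = sc.submitJob[Int, Int, Long](
  rdd,
  (iter: Iterator[Int]) => iter.size,
  0 until rdd.partitions.length,
  (_: Int, partSize: Int) => { total.addAndGet(partSize); () },
  total.get())
println(s"async total = ${Await.result(future, Duration.Inf)}")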

/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  /** The job is actually run by the dagScheduler. */
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

/**
 * Submit a job for execution and return a FutureJob holding the result.
 */
def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R): SimpleFutureAction[R] =
{
  assertNotStopped()
  val cleanF = clean(processPartition)
  val callSite = getCallSite
  val waiter = dagScheduler.submitJob(
    rdd,
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
    partitions,
    callSite,
    resultHandler,
    localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}

DAGScheduler's runJob Method

Looking at DAGScheduler's runJob method, we find that, just like SparkContext's submitJob, it calls DAGScheduler's submitJob; the difference is that it then blocks until the job completes.

/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @throws Exception when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  /** This is where submitJob is called. */
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  /** Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`, */
  /** which causes concurrent SQL executions to fail if a fork-join pool is used. Note that */
  /** due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's */
  /** safe to pass in null here. For more detail, see SPARK-13747. */
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  /** Unlike SparkContext's submitJob, this waits for the job to finish. */
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      /** SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. */
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
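In other words, DAGScheduler.runJob is just a blocking wrapper over submitJob: submit, then wait on the JobWaiter's completionFuture. A minimal sketch of that same pattern in plain Scala (the names here are illustrative, not Spark's):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// submitWork plays the role of submitJob: it returns immediately with a Future,
// analogous to the JobWaiter's completionFuture.
def submitWork[T](work: () => T): Future[T] = Future(work())

// runWork plays the role of runJob: submit, then block until the result is ready.
def runWork[T](work: () => T): T = Await.result(submitWork(work), Duration.Inf)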

DAGScheduler's submitJob Method

Now look at DAGScheduler's submitJob method. Notice that it returns a JobWaiter, whose completionFuture method returns a Future; this is why submitJob is asynchronous.

Also notice that almost all of the method's arguments are packed into a JobSubmitted event, which is posted via the dagScheduler field eventProcessLoop. But what actually handles that event? eventProcessLoop turns out to be an instance of DAGSchedulerEventProcessLoop, so the rest of the flow continues in that class. It sits in the same source file as DAGScheduler, so Ctrl+F will find it.

/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  /** Check to make sure we are not launching a task on a partition that does not exist. */
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    /** Return immediately if the job is running 0 tasks */
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  /** JobWaiter's completionFuture method returns a Future, which explains why submitJob is asynchronous. */
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  /** The rdd and the other arguments are wrapped into a JobSubmitted event, which is fired via eventProcessLoop.post. */
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
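Two practical consequences of the checks at the top of submitJob, shown with the public runJob overload that takes an explicit partition list (a sketch assuming an existing SparkContext named sc):

val rdd = sc.parallelize(1 to 10, 2)

// Computing only a subset of partitions is allowed, e.g. partition 0:
val firstPartitionSum: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0))

// A non-existent partition id fails fast with IllegalArgumentException:
// sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(99))

// An empty partition list short-circuits: the JobWaiter is created with zero tasks
// and no job actually runs; the result is an empty array.
val nothing: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq.empty[Int])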

DAGSchedulerEventProcessLoop Handles JobSubmitted

In DAGSchedulerEventProcessLoop's doOnReceive method, an event of type JobSubmitted is dispatched to dagScheduler.handleJobSubmitted, as shown below:

/** I won't go into detail on how post ends up here: Spark's EventLoop maintains a blocking queue with a consumer thread, and DAGSchedulerEventProcessLoop extends EventLoop. */
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  /** A JobSubmitted event is matched here and handled by dagScheduler.handleJobSubmitted. */
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
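The mechanism behind post is straightforward: EventLoop keeps a blocking queue plus a consumer thread, and doOnReceive is just the dispatch step. A stripped-down sketch of that pattern (this is not Spark's actual EventLoop, just the idea, with made-up names):

import java.util.concurrent.LinkedBlockingDeque

// A minimal event loop: post() enqueues, a daemon thread dequeues and dispatches.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    override def run(): Unit = while (true) onReceive(queue.take())
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}

// Dispatching by pattern match, the way doOnReceive does for DAGSchedulerEvents.
sealed trait DemoEvent
case class JobSubmittedDemo(jobId: Int) extends DemoEvent
case class JobCancelledDemo(jobId: Int) extends DemoEvent

object DemoLoop extends MiniEventLoop[DemoEvent]("demo-event-loop") {
  override protected def onReceive(event: DemoEvent): Unit = event match {
    case JobSubmittedDemo(id) => println(s"handle submit of job $id")
    case JobCancelledDemo(id) => println(s"handle cancel of job $id")
  }
}

Calling DemoLoop.start() and then DemoLoop.post(JobSubmittedDemo(1)) prints the handler's message from the loop's own thread, which is exactly the relationship between eventProcessLoop.post and doOnReceive.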

DAGScheduler's handleJobSubmitted Method

DAGScheduler's handleJobSubmitted involves the ResultStage and ActiveJob classes, the SparkListenerJobStart event, and the submitStage method. It is the core method for handling job submission; only once we reach it does the job actually start being processed.

Notes:

  1. For background on ResultStage and ActiveJob, see the article 《Spark核心作业调度和任务调度之DAGScheduler源码》.
  2. The SparkListenerJobStart event ends up triggering onJobStart on registered listeners. I won't dig into the details here; it mainly concerns the RDD DAG and related bookkeeping (a small listener sketch follows the code below).
  3. The submitStage method submits stages along the stage boundaries that have been computed; see the next section.

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    /** New stage creation may throw an exception if, for example, jobs are run on a */
    /** HadoopRDD whose underlying HDFS files have been deleted. */
    /** TODO: createResultStage builds the final ResultStage and performs the stage splitting along the way. */
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  
  /** Create an ActiveJob object to hold the job's information. */
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  /** This triggers the listeners' onJobStart method; see SparkListenerBus for details. */
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  /** Submit the final stage; note that the stage splitting has already been done by this point. */
  submitStage(finalStage)
}
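The SparkListenerJobStart event posted above can be observed from user code by registering a listener, which is a handy way to watch jobs and stages being submitted (a minimal sketch, assuming an existing SparkContext named sc):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageSubmitted}

sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    println(s"stage ${stageSubmitted.stageInfo.stageId} submitted")
})

sc.parallelize(1 to 10, 2).count()   // triggers onJobStart and onStageSubmitted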

DAGScheduler's submitStage Method

This method submits a stage, and it is implemented recursively.

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  /** Look up the ActiveJob id for this stage. Logically it was already stored in jobIdToActiveJob by handleJobSubmitted, so if no jobId can be found the stage is aborted with a message. */
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    /** DAGScheduler keeps stages in three sets: waitingStages, runningStages and failedStages. Only when the current stage is in none of them, i.e. it has not been seen yet, does it need to be computed. */
    /** All three are HashSets; applying one to a stage, e.g. waitingStages(stage), is equivalent to contains and checks whether the stage is already in the set. */
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      /** Fetch the stage's missing parent stages. */
      /** If missing.isEmpty, this stage has no unfinished parent stages left, so submitMissingTasks is called for it; */
      /** otherwise missing holds the parent stages, and submitStage recurses until the topmost stages are reached. */
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        /** We now have a stage with no missing parents, plus its jobId, so its tasks can be submitted. */
        submitMissingTasks(stage, jobId.get)
      } else {
        /** There are still parent stages; missing holds them, so keep recursing upwards. */
        for (parent <- missing) {
          submitStage(parent)
        }
        /** Because the traversal is depth-first, the current stage is parked in waitingStages while its parent stages run; */
        /** it is picked up again later (via submitWaitingChildStages) once the parents have finished. */
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

/** getMissingParentStages is really about fetching (or creating) the ShuffleMapStages other than the final ResultStage. */
/** In theory the stage splitting has already been done before this step, so this mostly just looks the stages up. */
/** Since this post focuses on the overall flow, I'll skip getOrCreateShuffleMapStage and analyze it another time. */
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  /** We are manually maintaining a stack here to prevent StackOverflowError */
  /** caused by recursively visiting */
  val waitingForVisit = new Stack[RDD[_]]
  /** Define a visit function; together with the waitingForVisit stack it walks the lineage and, based on each dependency's type, determines the stage boundaries. */
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          /** It is the dependency's type that decides whether a ShuffleMapStage is needed. */
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              /** If the ShuffleMapStage is not yet available, add it to missing; once the traversal finishes, all missing parent stages are known. */
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  /** Walk the whole lineage (depth-first, via the stack); once the traversal finishes, every ShuffleDependency has been found, and from them all the ShuffleMapStages can be created. */
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
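The core idea of the traversal above is: walk the lineage backwards, pass straight through narrow dependencies, and cut a stage boundary at every shuffle dependency. A toy model of that splitting rule, detached from Spark's real classes (all names here are made up):

import scala.collection.mutable

// A toy lineage model: a node either depends narrowly on its parent
// or depends on it through a shuffle.
sealed trait DemoDep { def parent: DemoRDD }
case class Narrow(parent: DemoRDD) extends DemoDep
case class Shuffle(parent: DemoRDD) extends DemoDep
case class DemoRDD(name: String, deps: Seq[DemoDep] = Nil)

// Find the RDDs sitting directly behind a shuffle boundary, i.e. the roots of the
// parent "stages" of `rdd` -- the same shape as getMissingParentStages.
def shuffleParents(rdd: DemoRDD): List[DemoRDD] = {
  val found = mutable.ListBuffer[DemoRDD]()
  val visited = mutable.HashSet[DemoRDD]()
  val toVisit = mutable.Stack(rdd)
  while (toVisit.nonEmpty) {
    val current = toVisit.pop()
    if (visited.add(current)) {
      current.deps.foreach {
        case Shuffle(parent) => found += parent       // stage boundary: stop here
        case Narrow(parent)  => toVisit.push(parent)  // same stage: keep walking
      }
    }
  }
  found.toList
}

// textFile -> map --(shuffle)--> reduceByKey -> filter
val textFile = DemoRDD("textFile")
val mapped   = DemoRDD("map", Seq(Narrow(textFile)))
val reduced  = DemoRDD("reduceByKey", Seq(Shuffle(mapped)))
val filtered = DemoRDD("filter", Seq(Narrow(reduced)))

println(shuffleParents(filtered).map(_.name))  // List(map): one parent stage rooted at map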

/** Called when stage's parents are available and we can now do its task. */
/** This method covers going from a stage to its tasks, i.e. ShuffleMapTasks and ResultTasks. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  /** Get our pending tasks and remember them in our pendingTasks entry */
  stage.pendingPartitions.clear()

  /** First figure out the indexes of partition ids to compute. */
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  /** Use the scheduling pool, job group, description, etc. from an ActiveJob associated */
  /** with this Stage */
  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage
  /** SparkListenerStageSubmitted should be posted before testing whether tasks are */
  /** serializable. If tasks are not serializable, a SparkListenerStageCompleted event */
  /** will be posted, which should always come after a corresponding SparkListenerStageSubmitted */
  /** event. */
  /** TODO: I'm not yet clear on what outputCommitCoordinator does. */
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  /** Work out the preferred locations for each task. */
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  /** TODO: what exactly does the SparkListenerStageSubmitted event on listenerBus do? */
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  /** TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. */
  /** Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast */
  /** the serialized copy of the RDD and for each task we will deserialize it, which means each */
  /** task gets a different copy of the RDD. This provides stronger isolation between tasks that */
  /** might modify state of objects referenced in their closures. This is necessary in Hadoop */
  /** where the JobConf/Configuration object is not thread-safe. */
  /** Serialize the stage into taskBinary, which is used later to build the tasks on each node. */
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    /** For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). */
    /** For ResultTask, serialize and broadcast (rdd, func). */
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    /** In the case of a failure during serialization, abort the stage. */
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    /** Because we posted SparkListenerStageSubmitted earlier, we should mark */
    /** the stage as completed here in case there are no tasks to run */
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage : ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)

    submitWaitingChildStages(stage)
  }
}
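The comment about taskBinary above is worth dwelling on: the (rdd, func) pair is serialized once per stage and broadcast, and every task deserializes its own private copy, which is what gives tasks isolation from one another. A tiny sketch of that effect using plain Java serialization (Spark actually uses its closureSerializer plus Broadcast, so this is only an analogy with made-up names):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TaskBinaryAnalogy {
  // Stands in for the (rdd, func) pair that gets serialized into taskBinary.
  case class StagePayload(var scratch: Int, data: Seq[Int]) extends Serializable

  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T =
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[T]

  def main(args: Array[String]): Unit = {
    val taskBinary = serialize(StagePayload(0, 1 to 3))  // serialized once for the stage
    val taskA = deserialize[StagePayload](taskBinary)    // each task deserializes its own copy
    val taskB = deserialize[StagePayload](taskBinary)
    taskA.scratch = 42                                   // mutation in one task...
    println(taskB.scratch)                               // ...is invisible to the other: prints 0
  }
}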

That more or less wraps up the flow of how Spark submits a job. The flow is fairly involved, so plenty of details were skipped and several TODOs remain; I'll fill them in with other posts later.