Spark Job Submission Flow
I have been reading the Spark source code for a while, and I have seen other people's analyses of Spark's job submission flow online, but I can never keep it all in my head. So here is my own walkthrough of the flow, so I do not forget it again.
Triggering Job Submission
Spark RDD operators fall into two categories: transformations and actions. A transformation turns one RDD into another, while an action triggers the final computation. The key difference is that action operators end up calling SparkContext's runJob method. The two snippets below, one transformation (map) and one action (count), show the difference:
/** transformation operator */
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
/** action operator */
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
So calling an action operator is what triggers Spark to run a job, and at that point the job is submitted from the local driver to the Spark cluster.
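To make this concrete, here is a minimal driver-side sketch (the input path and variable names are purely illustrative): the two transformations only build the RDD lineage, and nothing touches the cluster until the action at the end.
/** Minimal sketch: transformations only build lineage; the action submits the job. */
val words = sc.textFile("hdfs:///tmp/input")  // illustrative input path
  .flatMap(_.split(" "))                      // transformation: returns a new RDD, no job yet
  .map(_.toLowerCase)                         // transformation: returns a new RDD, no job yet
val n = words.count()                         // action: calls sc.runJob and submits the job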
SparkContext's runJob and submitJob
The action operator calls SparkContext's runJob. Reading the SparkContext source, we find there is also a submitJob method that achieves the same effect, except that submitJob is asynchronous.
Both methods use dagScheduler, which tells us the job is actually run by the DAGScheduler. Since runJob and submitJob hardly differ as far as the overall flow is concerned, submitJob is not traced any further here; just note that inside submitJob it is DAGScheduler's submitJob method that gets called.
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
/** The job is actually run by the dagScheduler. */
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
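As an aside, the simpler runJob overloads (the kind count() uses) are thin wrappers over this entry point: they allocate one result slot per partition and pass a resultHandler that fills it in. A hedged sketch of that pattern, with the helper name runJobCollect made up for illustration:
import scala.reflect.ClassTag
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

/** Illustrative helper: gather one result per partition through resultHandler. */
def runJobCollect[T, U: ClassTag](sc: SparkContext, rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  val partitions = 0 until rdd.partitions.length
  val results = new Array[U](partitions.size)
  sc.runJob[T, U](rdd, (ctx: TaskContext, it: Iterator[T]) => func(it), partitions,
    (index: Int, res: U) => results(index) = res)
  results
}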
/**
* Submit a job for execution and return a FutureJob holding the result.
*/
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit,
resultFunc: => R): SimpleFutureAction[R] =
{
assertNotStopped()
val cleanF = clean(processPartition)
val callSite = getCallSite
val waiter = dagScheduler.submitJob(
rdd,
(context: TaskContext, iter: Iterator[T]) => cleanF(iter),
partitions,
callSite,
resultHandler,
localProperties.get)
new SimpleFutureAction(waiter, resultFunc)
}
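For completeness, here is a hedged usage sketch of submitJob (the RDD, array and callbacks are made up for illustration). The call returns a SimpleFutureAction right away, so the driver thread is free to do other work while the job runs on the cluster.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val data = sc.parallelize(1 to 1000, 4)
val perPartition = new Array[Long](data.partitions.length)
val action = sc.submitJob[Int, Long, Long](
  data,
  (it: Iterator[Int]) => it.size.toLong,                 // processPartition
  0 until data.partitions.length,                        // partitions
  (index: Int, res: Long) => perPartition(index) = res,  // resultHandler
  perPartition.sum)                                      // resultFunc, evaluated when the job completes
action.onComplete {                                      // runs asynchronously, driver is not blocked
  case Success(total) => println(s"count = $total")
  case Failure(e)     => e.printStackTrace()
}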
DAGScheduler's runJob Method
Looking at DAGScheduler's runJob method, we find that, just like SparkContext's submitJob, it calls DAGScheduler's submitJob method.
/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @throws Exception when the job fails
 */
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
/** submitJob is called here */
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
/** Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`, */
/** which causes concurrent SQL executions to fail if a fork-join pool is used. Note that */
/** due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's */
/** safe to pass in null here. For more detail, see SPARK-13747. */
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
/** Unlike SparkContext's submitJob, runJob waits here for the job to finish. */
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
/** SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. */
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
DAGScheduler's submitJob Method
Now look at DAGScheduler's submitJob method. Note that it returns a JobWaiter, whose completionFuture method returns a Future; this is why submitJob is asynchronous.
Also notice that the method's parameters are essentially all packed into a JobSubmitted instance, and the dagScheduler field eventProcessLoop posts it as a job-submission event. But what exactly does that event trigger? Looking at eventProcessLoop, we find it is an instance of DAGSchedulerEventProcessLoop, so that class is where the rest of the flow continues. It lives in the same source file as DAGScheduler, so Ctrl+F will find it.
/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
/** Check to make sure we are not launching a task on a partition that does not exist. */
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
/** Return immediately if the job is running 0 tasks */
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
/** JobWaiter's completionFuture method returns a Future, which explains why submitJob is asynchronous. */
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
/** The rdd and the other parameters are used to build a JobSubmitted instance, and eventProcessLoop's post method fires the event. */
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
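To see what makes completionFuture asynchronous, here is a simplified stand-in for the JobWaiter idea (my own paraphrase, not the actual Spark class): it counts finished tasks and completes a Promise that backs completionFuture, which runJob blocks on and submitJob simply hands back.
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

/** Hypothetical, simplified stand-in for org.apache.spark.scheduler.JobWaiter. */
class SimpleJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  // A job with zero tasks is finished as soon as it is "submitted".
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise()

  /** What DAGScheduler.runJob blocks on and SparkContext.submitJob wraps in a FutureAction. */
  def completionFuture: Future[Unit] = jobPromise.future

  /** Called once per finished task; completes the promise after the last one. */
  def taskSucceeded(index: Int, result: T): Unit = {
    resultHandler(index, result)
    if (finishedTasks.incrementAndGet() == totalTasks) jobPromise.success(())
  }

  def jobFailed(exception: Exception): Unit = { jobPromise.tryFailure(exception) }
}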
DAGSchedulerEventProcessLoop Handles JobSubmitted
In DAGSchedulerEventProcessLoop's doOnReceive method, when the event is of type JobSubmitted, dagScheduler's handleJobSubmitted method is invoked, as shown below:
/** How post ends up here is not covered in detail (Spark's EventLoop maintains a blocking queue, and DAGSchedulerEventProcessLoop extends EventLoop); see the simplified sketch after this code block. */
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
/** JobSubmitted matches here and calls dagScheduler's handleJobSubmitted method. */
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId, reason) =>
val filesLost = reason match {
case SlaveLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, filesLost)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
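As promised above, here is a simplified sketch of the EventLoop mechanism that makes post eventually reach doOnReceive (my own illustration, not the actual Spark code): post only enqueues the event into a blocking queue, and a dedicated daemon thread drains the queue and calls onReceive, which DAGSchedulerEventProcessLoop implements with the pattern match shown above.
import java.util.concurrent.LinkedBlockingDeque

/** Hypothetical, simplified version of Spark's EventLoop, for illustration only. */
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    override def run(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        onReceive(eventQueue.take())  // blocks until an event is posted
      }
    }
  }
  eventThread.setDaemon(true)

  def start(): Unit = eventThread.start()

  /** Called by DAGScheduler.submitJob: enqueue the event and return immediately. */
  def post(event: E): Unit = eventQueue.put(event)

  /** DAGSchedulerEventProcessLoop implements this with the pattern match above. */
  protected def onReceive(event: E): Unit
}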
DAGScheduler's handleJobSubmitted Method
DAGScheduler's handleJobSubmitted method involves the ResultStage and ActiveJob classes, the SparkListenerJobStart event, and the submitStage method. It is the core method for handling a job submission; only here does the real processing of the job begin.
Notes:
- For background on ResultStage and ActiveJob, see the article "Spark核心作业调度和任务调度之DAGScheduler源码".
- The SparkListenerJobStart event triggers onJobStart. The details, which mainly concern the RDD DAG, are not analyzed in this article.
- The submitStage method submits stages according to the stage division; see the next section.
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
/** New stage creation may throw an exception if, for example, jobs are run on a */
/** HadoopRDD whose underlying HDFS files have been deleted. */
/** TODO: create the ResultStage; stage division is completed during this step. */
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
/** Create an ActiveJob object to hold the job's information. */
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
/** This triggers onJobStart; see SparkListenerBus for details. */
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
/** Submit the stages; note that by this point stage division is already done. */
submitStage(finalStage)
}
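For reference, the body of createResultStage mentioned in the TODO above is short (paraphrased from the Spark 2.x source this walkthrough is based on, so treat the details as approximate): it first gets or creates the parent ShuffleMapStages by walking the final RDD's shuffle dependencies, then wraps the final RDD and func into a ResultStage and registers it.
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Get or create one ShuffleMapStage per ShuffleDependency reachable from rdd.
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // The final stage of every job is a ResultStage that holds the user's func.
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}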
DAGScheduler's submitStage Method
This method submits stages; it is implemented recursively.
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
/** Get the id of the ActiveJob for this stage. Logically this id was already stored in jobIdToActiveJob by handleJobSubmitted, so if it cannot be found something is wrong and the stage is aborted (see the else branch). */
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
/** The DAGScheduler tracks stages in three sets: waitingStages, runningStages and failedStages; only a stage that is in none of them, i.e. one we have not seen yet, needs to be computed. */
/** All three are HashSets, whose apply() method is equivalent to contains, so each check below tests whether the stage is already in the set. */
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
/** Try to get this stage's missing parent stages. */
/** If missing.isEmpty is true, this stage is at the top of the DAG with no parent stage left to compute, so its tasks can be submitted with submitMissingTasks; */
/** otherwise missing is the set of parent stages, and submitStage recurses until it reaches the topmost stages. */
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
/** This is a topmost stage (no missing parent stages), so with its jobId its tasks can be submitted. */
submitMissingTasks(stage, jobId.get)
} else {
/** There are still parent stages; missing is that set, so keep recursing upwards. */
for (parent <- missing) {
submitStage(parent)
}
/** Because the traversal is depth-first, the current stage is always recorded in waitingStages; */
/** it is taken out and submitted later, by submitWaitingChildStages, once its parent stages have finished. */
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
/** getMissingParentStages gets (or creates) the ShuffleMapStages other than the ResultStage. */
/** In theory stage division has already been completed before this step, so this should mostly be a lookup. */
/** This article focuses on the overall flow, so the getOrCreateShuffleMapStage method is not analyzed here; that can come later. */
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
/** We are manually maintaining a stack here to prevent StackOverflowError */
/** caused by recursively visiting */
val waitingForVisit = new Stack[RDD[_]]
/** The visit function, together with the waitingForVisit stack and the type of each RDD dependency, performs the stage division. */
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
/** Whether a ShuffleMapStage needs to be created depends on the type of the rdd dependency. */
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
/** If the ShuffleMapStage's output is not yet available, add it to missing; when the traversal is done, missing holds all the parent stages that still have to run. */
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
/** A depth-first traversal driven by the explicit stack; when it finishes, every ShuffleDependency has been visited and all the needed ShuffleMapStages have been created. */
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
missing.toList
}
/** Called when stage's parents are available and we can now do its task. */
/** This method turns a stage into tasks, either ShuffleMapTasks or ResultTasks. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
/** Get our pending tasks and remember them in our pendingTasks entry */
stage.pendingPartitions.clear()
/** First figure out the indexes of partition ids to compute. */
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
/** Use the scheduling pool, job group, description, etc. from an ActiveJob associated */
/** with this Stage */
val properties = jobIdToActiveJob(jobId).properties
runningStages += stage
/** SparkListenerStageSubmitted should be posted before testing whether tasks are */
/** serializable. If tasks are not serializable, a SparkListenerStageCompleted event */
/** will be posted, which should always come after a corresponding SparkListenerStageSubmitted */
/** event. */
/** TODO: the role of outputCommitCoordinator is not clear to me yet. */
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
/** Work out the preferred locations for each task. */
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
/** TODO: what does the SparkListenerStageSubmitted event on the listenerBus do? */
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
/** TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. */
/** Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast */
/** the serialized copy of the RDD and for each task we will deserialize it, which means each */
/** task gets a different copy of the RDD. This provides stronger isolation between tasks that */
/** might modify state of objects referenced in their closures. This is necessary in Hadoop */
/** where the JobConf/Configuration object is not thread-safe. */
/** Serialize the stage's (rdd, dep/func) into taskBinary, which is broadcast and later used by each task on the executors. */
var taskBinary: Broadcast[Array[Byte]] = null
try {
/** For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). */
/** For ResultTask, serialize and broadcast (rdd, func). */
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
/** In the case of a failure during serialization, abort the stage. */
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
// Abort execution
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
/** Because we posted SparkListenerStageSubmitted earlier, we should mark */
/** the stage as completed here in case there are no tasks to run */
markStageAsFinished(stage, None)
val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
submitWaitingChildStages(stage)
}
}
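Finally, to close the loop on taskBinary: on the executor side each task deserializes its own copy of what was broadcast and computes only its partition. The sketch below paraphrases ResultTask.runTask, simplified and with metrics and bookkeeping removed; it touches Spark internals, so it is an illustration rather than user code.
import java.nio.ByteBuffer
import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

/** Simplified paraphrase of ResultTask.runTask. */
def runResultTask[T, U](taskBinary: Broadcast[Array[Byte]],
    partition: Partition, context: TaskContext): U = {
  // Each task deserializes its own copy of (rdd, func) from the broadcast bytes.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  // Compute just this task's partition and hand the iterator to the user's func.
  func(context, rdd.iterator(partition, context))
}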
That is roughly the whole flow of how Spark submits a job. Since the flow is fairly involved, many details were skipped and several TODOs remain; those will be filled in in later articles.