Analysis of the Spark Driver Startup Flow
We know that when a Spark app is submitted, an initial SparkContext (sc) has to be created. Spark then brings up a driver, which runs the main method of the application we wrote and creates the sc. This sounds like it is simply the "local" part of our code, but there is a subtle difference: the driver does not have to run locally. In YARN cluster mode, for example, the driver runs inside an application on the cluster.
So the driver is brought up while the SparkContext is being initialized, and that is where this analysis starts.
The walkthrough below uses standalone mode as the running example.
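Before diving into the source, here is a minimal sketch of a driver-side program, just to anchor what "running the main method and creating the sc" means. The object name, app name, and master URL are made up for illustration.
import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("spark://master-host:7077") // hypothetical standalone master
    // Creating the SparkContext is what triggers everything analyzed below:
    // TaskSchedulerImpl, the SchedulerBackend, and eventually the driver endpoint.
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 100).sum())
    } finally {
      sc.stop()
    }
  }
}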
Initializing the DriverEndpoint
As mentioned in the earlier post on Spark task dispatch and execution, TaskSchedulerImpl and the corresponding SchedulerBackend are created while the SparkContext is initialized; taskScheduler.start() is then called, and inside it schedulerBackend.start(). In cluster mode, both StandaloneSchedulerBackend (standalone) and YarnSchedulerBackend (YARN) extend CoarseGrainedSchedulerBackend, whose start method creates the driverEndpoint, as shown below:
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }

  // TODO (prashant) send conf instead of properties
  driverEndpoint = createDriverEndpointRef(properties)
}

protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}

// Creates the DriverEndpoint instance
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
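For readers less familiar with Spark's RPC layer: rpcEnv.setupEndpoint registers the endpoint and returns an RpcEndpointRef for talking to it, and the RpcEnv then invokes the endpoint's onStart once before dispatching any messages to it. A deliberately simplified model of that lifecycle (not Spark's real org.apache.spark.rpc classes, which have more hooks) looks roughly like this:
trait SketchEndpoint {
  def onStart(): Unit = {}
  def receive: PartialFunction[Any, Unit]
}

object EndpointLifecycleSketch {
  def main(args: Array[String]): Unit = {
    val endpoint = new SketchEndpoint {
      override def onStart(): Unit = println("onStart: endpoint registered")
      override def receive: PartialFunction[Any, Unit] = {
        case msg => println(s"received: $msg")
      }
    }
    endpoint.onStart()                      // what the RpcEnv does after setupEndpoint
    endpoint.receive("RegisterApplication") // what happens when someone calls ref.send(...)
  }
}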
For concreteness we stick with standalone cluster mode, so schedulerBackend here is an instance of StandaloneSchedulerBackend.
Initializing the StandaloneAppClient
In schedulerBackend's start method, a StandaloneAppClient is created; similarly, YARN client mode creates a Client. In other words some client class always has to be instantiated, yet the two classes share no common parent class or interface (other than Logging). Let's look at StandaloneSchedulerBackend's start method:
override def start() {
  // As shown in the previous section, this creates the driverEndpoint
  super.start()

  // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
  // mode. In cluster mode, the code that submits the application to the Master needs to connect
  // to the launcher instead.
  if (sc.deployMode == "client") {
    launcherBackend.connect()
  }

  // The endpoint for executors to talk to us
  val driverUrl = RpcEndpointAddress(
    sc.conf.get("spark.driver.host"),
    sc.conf.get("spark.driver.port").toInt,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // As the class name suggests, a CoarseGrainedExecutorBackend is about to be launched
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  // If we're using dynamic allocation, set our initial executor limit to 0 for now.
  // ExecutorAllocationManager will send the real initial limit to the Master later.
  val initialExecutorLimit =
    if (Utils.isDynamicAllocationEnabled(conf)) {
      Some(0)
    } else {
      None
    }
  // Build the ApplicationDescription, which the Master later uses to create the application
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  // Create the StandaloneAppClient
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  // Start the StandaloneAppClient; its start method creates the ClientEndpoint
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  // Wait for registration to finish; this covers driver, application, and executor registration
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
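A side note on the args above: {{EXECUTOR_ID}}, {{HOSTNAME}}, {{CORES}}, {{APP_ID}} and {{WORKER_URL}} are template placeholders that the Worker's ExecutorRunner substitutes with real values before launching the process. So each executor ends up being started with a command roughly of the form (values made up for illustration): java ... org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@10.0.0.1:41217 --executor-id 3 --hostname 10.0.0.7 --cores 4 --app-id app-20240101120000-0000 --worker-url spark://Worker@10.0.0.7:34567.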
In an earlier post we briefly covered how an endpoint's onStart method gets triggered; the mechanism is the same here, so we won't repeat it. What we care about next is how the ClientEndpoint registers with the Master.
Registering the ClientEndpoint with the Master
Once the ClientEndpoint's onStart method has been invoked, it calls registerWithMaster to register the application with the Master:
// After onStart is triggered, register the application with the master
override def onStart(): Unit = {
  try {
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}
/**
 * Register with all masters asynchronously. It will call `registerWithMaster` every
 * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
 * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
 *
 * nthRetry means this is the nth attempt to register with master.
 */
private def registerWithMaster(nthRetry: Int) {
  // First, try to register the application with every master
  registerMasterFutures.set(tryRegisterAllMasters())
  // Retry mechanism: keep re-registering with the masters until it succeeds or the
  // retries are exhausted; the details are not important for this walkthrough
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
/** Register with all masters asynchronously and returns an array `Future`s for cancellation. */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  // Try every master listed in the master URL; a master that is currently standby
  // simply ignores the application's registration request
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        // Send the request to the master, triggering its RegisterApplication handler
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
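For reference on the retry loop above: in the Spark 2.x sources this walkthrough is based on, REGISTRATION_TIMEOUT_SECONDS is 20 and REGISTRATION_RETRIES is 3, so the client waits 20 seconds between attempts and gives up via markDead("All masters are unresponsive! Giving up.") after three unsuccessful rounds (worth double-checking against your own version).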
The Master's RegisterApplication event
The ClientEndpoint sends a RegisterApplication message to the Master, asking it to register the application:
case RegisterApplication(description, driver) =>
  // TODO Prevent repeated registrations from some driver
  // If the current Master is standby, ignore the registration request
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Create the application
    val app = createApplication(description, driver)
    // Register the application
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the application info
    persistenceEngine.addApplication(app)
    // Send the RegisteredApplication event back to the driver side
    driver.send(RegisteredApplication(app.id, self))
    // Schedule drivers and executors
    schedule()
  }
While we're at it, let's glance at the ClientEndpoint's RegisteredApplication handler. It only updates the client-side state to record that the app has been registered; nothing particularly important happens here.
case RegisteredApplication(appId_, masterRef) =>
  // FIXME How to handle the following cases?
  // 1. A master receives multiple registrations and sends back multiple
  //    RegisteredApplications due to an unstable network.
  // 2. Receive multiple RegisteredApplication from different masters because the master is
  //    changing.
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)
The Master's schedule method
This method does a couple of important things, namely launchDriver and startExecutorsOnWorkers:
- launchDriver walks the alive workers in round-robin order and, on the first worker with enough free memory and cores, starts a DriverRunner for the waiting driver.
- startExecutorsOnWorkers launches executors on the worker nodes.
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // If this worker has enough free resources for the driver, launch the driver on it
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Launch executors on the workers, getting ready for the upcoming computation
  startExecutorsOnWorkers()
}
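To make the round-robin placement concrete (all numbers made up): suppose the shuffled alive workers are (w1, w2, w3) and two drivers are waiting, each needing 1 core and 1 GB. The first driver is tried on w1; if w1 has enough free resources it is launched there, and since curPos has already advanced, the second driver is tried starting from w2 rather than from w1 again. This is why drivers tend to spread across workers instead of piling onto the first one.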
The Master's launchDriver method
launchDriver mainly triggers the worker's LaunchDriver event. Below are the Master's launchDriver method and the Worker's LaunchDriver handler.
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  // Trigger the worker's LaunchDriver event
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}

// The Worker's LaunchDriver handler: it creates a DriverRunner, a thread on the worker
// that keeps the driver's description and takes care of actually launching the driver
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
The Master's startExecutorsOnWorkers method
This method launches executors on the worker nodes.
/** Schedule and launch executors on workers */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor,
    // i.e. keep the workers whose free memory and cores cover what an executor needs
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Plan how many cores each worker should contribute to this app's executors;
    // nothing is allocated yet, this only computes the intended assignment
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      // Perform the actual per-worker allocation
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
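A quick illustration of what scheduleExecutorsOnWorkers plans (made-up numbers): with three usable workers, each having 8 free cores, an app that still needs 12 cores and coresPerExecutor = 4 would typically be planned as (4, 4, 4) when spreadOutApps is true (the default, spreading executors across workers), and as roughly (8, 4, 0) when it is false (packing executors onto as few workers as possible).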
The Master's allocateWorkerResourceToExecutors method
This method does the actual allocation: it decides how many cores each executor gets on a given worker and then launches the executors.
/**
 * Allocate a worker's resources to one or more executors.
 * @param app the info of the application which the executors belong to
 * @param assignedCores number of cores on this worker for this application
 * @param coresPerExecutor number of cores per executor
 * @param worker the worker info
 */
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
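To see the arithmetic above in isolation, here is a tiny self-contained sketch with made-up numbers; it simply mirrors the two lines that compute numExecutors and coresToAssign.
object CoreSplitSketch {
  def main(args: Array[String]): Unit = {
    val assignedCores = 7                       // cores planned for this worker (made up)
    val coresPerExecutor: Option[Int] = Some(2) // spark.executor.cores, if set

    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1) // 7 / 2 = 3
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)           // 2

    // Result: 3 executors with 2 cores each; the leftover core (7 - 3 * 2 = 1) stays unused.
    // With coresPerExecutor = None, a single executor would grab all 7 cores.
    println(s"$numExecutors executors x $coresToAssign cores")
  }
}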
The Master's launchExecutor method
This method triggers the worker's LaunchExecutor event, as well as the client's ExecutorAdded event.
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // Trigger the worker's LaunchExecutor event
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Trigger the client's ExecutorAdded event, which only logs a message, so we skip it
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
The Worker's LaunchExecutor event
This handler starts an ExecutorRunner thread on the worker, and that thread launches the executor process.
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs
      // The ExecutorRunner thread is what launches the executor. The command it runs
      // comes from appDesc, which is not built here; to see how that command is put
      // together, trace back to where the ApplicationDescription was created.
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      // Start the thread
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // Report the result of launching the executor back to the master
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        // If launching the executor fails, report a FAILED state to the master
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }