execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
val msg = StatusUpdate(executorId, taskId, state, data)
driver match {
case Some(driverRef) => driverRef.send(msg)
case None => logWarning(s"Drop $msg because has not yet connected to driver")
}
}
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
executorInfo.freeCores += scheduler.CPUS_PER_TASK
makeOffers(executorId)
case None =>
// Ignoring the update since we don't know about the executor.
logWarning(s"Ignored task status update ($taskId state $state) " +
s"from unknown executor with ID $executorId")
}
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
// task丢失,则标记对应的executor也丢失,并涉及到一些映射跟新
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (executorIdToTaskCount.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
//获取task所在的taskSetManager
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid).foreach { execId =>
if (executorIdToTaskCount.contains(execId)) {
executorIdToTaskCount(execId) -= 1
}
}
}
// task成功的处理
if (state == TaskState.FINISHED) {
// 将当前task从taskSet中正在执行的task列表中移除
taskSet.removeRunningTask(tid)
//成功执行时,在线程池中处理任务的结果
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
//处理失败的情况
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates)")
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
assert(reason.isDefined)
dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
...
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
// 从该stage中等待处理的partition列表中移除Task对应的partition
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
// taskSucceeded runs some user code that might throw an exception. Make sure
// we are resilient against that.
try {
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Exception =>
// TODO: Perhaps we want to mark the resultStage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
}
// 若是ShuffleMapTask
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
// 忽略在集群中游走的ShuffleMapTask(来自一个失效的节点的Task结果)。
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
// 将结果保存到对应的Stage
shuffleStage.addOutputLoc(smt.partitionId, status)
}
// 若当前stage的所有task已经全部执行完毕
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
// 将stage的结果注册到MapOutputTrackerMaster
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocInMapOutputTrackerFormat(),
changeEpoch = true)
// 清除本地缓存
clearCacheLocs()
// 若stage一些task执行失败没有结果,重新提交stage来调度执行未执行的task
if (!shuffleStage.isAvailable) {
// Some tasks had failed; let's resubmit this shuffleStage
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
// 标记所有等待这个Stage结束的Map-Stage Job为结束状态
if (shuffleStage.mapStageJobs.nonEmpty) {
val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
for (job <- shuffleStage.mapStageJobs) {
markMapStageJobAsFinished(job, stats)
}
}
}
// Note: newly runnable stages will be submitted below when we submit waiting stages
}
}
...
}
submitWaitingStages()
}
override def taskSucceeded(index: Int, result: Any): Unit = {
// resultHandler call must be synchronized in case resultHandler itself is not thread safe.
synchronized {
resultHandler(index, result.asInstanceOf[T])
}
if (finishedTasks.incrementAndGet() == totalTasks) {
jobPromise.success(())
}
}
前言
在文章Task执行流程 中介绍了task是怎么被分配到executor上执行的,本文讲解task成功执行时将结果返回给driver的处理流程。
Driver端接收task完成事件
在executor上成功执行完task并拿到serializedResult 之后,通过CoarseGrainedExecutorBackend的statusUpdate方法来返回结果给driver,该方法会使用driverRpcEndpointRef 发送一条包含 serializedResult 的 StatusUpdate 消息给 driver。
而在driver端CoarseGrainedSchedulerBackend 在接收到StatusUpdate事件的处理代码如下:
这里我们重点看看在TaskSchedulerImpl里面根据task的状态做了什么样的操作:
task状态为Lost,则标记对应的executor也丢失,并涉及到一些映射跟新和意味着该executor上对应的task的重新分配;还有其他一些状态暂时不做解析。主要看task状态为FINISHED时,通过taskResultGetter的enqueueSuccessfulTask方法将task的的结果处理丢到了线程池中执行:
继续跟进scheduler是如何处理成功的task:
里面调用了该taskSetManager对成功task的处理方法:
逻辑很简单,标记task成功运行、跟新failedExecutors、若taskSet所有task都成功执行的一些处理,我们具体看看是怎么通知dagScheduler的,这里调用了dagScheduler的taskEnded方法:
这里像DAGScheduler Post了一个CompletionEvent事件,在DAGScheduler#doOnReceive有对应的处理:
继续看看 dagScheduler#handleTaskCompletion的实现,代码太长,列出主要逻辑部分:
当task为ShuffleMapTask时,该task不是在无效节点的运行的条件下将结果保存到stage中,若当前stage的所有task都运行完毕(不一定成功),则将所有结果注册到MapOutputTrackerMaster(以便下一个stage的task就可以通过它来获取shuffle的结果的元数据信息);然后清空本地缓存;当该stage有task没有成功执行也就没有结果,需要重新提交该stage运行未完成的task;若所有task都成功完成,说明该stage已经完成,则会去标记所有等待这个Stage结束的Map-Stage Job为结束状态。
当task为ResultTask时,增加job完成的task数,若所有task全部完成即job已经完成,则标记该stage完成并从runningStages中移除,在cleanupStateForJobAndIndependentStages方法中,遍历当前job的所有stage,在对应stage没有依赖的job时则直接将此stage移除。然后将当前job从activeJob中移除。
最后调用job.listener.taskSucceeded(rt.outputId, event.result),实际调用的是JobWaiter(JobListener的具体实现)的taskSucceeded方法:
这里的resultHandler就是在action操作触发runJob的时候规定的一种结果处理器:
这里的(index, res) => results(index) = res 就是resultHandler,也就是将这里的results数组填满再返回,根据不同的action进行不同操作。 若完成的task数和totalTasks数相等,则该job成功执行,打印日志完成。