val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
}
}
schedulableBuilder.buildPools()
}
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
launchTasks(scheduler.resourceOffers(workOffers))
}
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (!isZombie) {
val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
dequeueTask(execId, host, allowedLocality) match {
case Some((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
addRunningTask(taskId)
// We used to log the time it takes to serialize the task, but task size is already
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s" $taskLocality, ${serializedTask.limit} bytes)")
sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask))
case _ =>
}
}
None
}
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!executorIsBlacklisted(execId, index)) {
// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
}
None
}
// find a speculative task if all others tasks have been scheduled
dequeueSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
在DAGScheduler划分为Stage并以TaskSet的形式提交给TaskScheduler后,再由TaskScheduler通过TaskSetMagager对taskSet的task进行调度与执行。
submitTasks方法的实现在TaskScheduler的实现类TaskSchedulerImpl中。先看整个实现:
先为当前TaskSet创建TaskSetManager,TaskSetManager负责管理一个单独taskSet的每一个task,决定某个task是否在一个executor上启动,如果task失败,负责重试task直到task重试次数,并通过延迟调度来执行task的位置感知调度。
key为stageId,value为一个HashMap,其中key为stageAttemptId,value为TaskSet。
isZombie是TaskSetManager中所有tasks是否不需要执行(成功执行或者stage被删除)的一个标记,如果该TaskSet没有被完全执行并且已经存在和新进来的taskset一样的另一个TaskSet,则抛出异常,确保一个stage不能有两个taskSet同时运行。
将当前taskSet添加到调度池中,schedulableBuilder的类型是SchedulerBuilder的一个trait,有两个实现FIFOSchedulerBuilder和 FairSchedulerBuilder,并且默认采用的是FIFO方式。
schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。
接下来调用了SchedulerBackend的riviveOffers方法向schedulerBackend申请资源。backend也是通过scheduler.initialize(backend)的参数传递过来的,具体是在SparkContext 中被创建的。
回到向schedulerBackend申请资源, 调用CoarseGrainedSchedulerBackend的reviveOffers方法,该方法给driverEndpoint发送ReviveOffer消息。
driverEndpoint收到ReviveOffer消息后调用makeOffers方法。
该方法先过滤出活跃的executor并封装成WorkerOffer,WorkerOffer包含executorId、host、可用的cores三个信息。这里的executorDataMap是HashMap[String, ExecutorData]类型,key为executorId,value为对应executor的信息,包括host、RPC信息、totalCores、freeCores。
在客户端向Master注册Application的时候,Master已经为Application分配并启动好Executor,然后注册给CoarseGrainedSchedulerBackend,注册信息就是存储在executorDataMap数据结构中。
先看里面的scheduler.resourceOffers(workOffers),TaskSchedulerImpl调用resourceOffers方法通过准备好的资源获得要被执行的Seq[TaskDescription],交给CoarseGrainedSchedulerBackend分发到各个executor上执行。下面看具体实现:
跟进resourceOfferSingleTaskSet方法:
这个方法主要是遍历所有可用的executor,在core满足一个task所需core的条件下,通过resourceOffer方法获取taskSet能在该executor上启动的task,并添加到tasks中予以返回。下面具体看resourceOffer的实现:
getAllowedLocalityLevel(curTime)会根据延迟调度调整合适的Locality,目的都是尽可能的以最好的locality来启动每一个task,getAllowedLocalityLevel返回的是当前taskSet中所有未执行的task的最高locality,以该locality作为本次调度能容忍的最差locality,在后续的搜索中只搜索本地性比这个级别好的情况。allowedLocality 最终取以getAllowedLocalityLevel(curTime)返回的locality和maxLocality中级别较高的locality。
根据allowedLocality寻找合适的task,若返回不为空,则说明在该executor上分配了task,然后进行信息跟新,将taskid加入到runningTask中,跟新延迟调度信息,序列化task,通知DAGScheduler,最后返回taskDescription,我们来看看dequeueTask的实现:
首先看是否存在execId对应的PROCESS_LOCAL类别的任务,如果存在,取出来调度,如果不存在,只在比allowedLocality大或者等于的级别上去查看是否存在execId对应类别的任务,若有则调度。
其中的dequeueTaskFromList是从execId对应类别(如PROCESS_LOCAL)的任务列表中尾部取出一个task返回其在taskSet中的taskIndex,跟进该方法:
这里有个黑名单机制,利用executorIsBlacklisted方法查看该executor是否属于task的黑名单,黑名单记录task上一次失败所在的Executor Id和Host,以及其对应的“黑暗”时间,“黑暗”时间是指这段时间内不要再往这个节点上调度这个Task了。
可以看到在dequeueTask方法的最后一段代码:
这里是启动推测执行,推测任务是指对一个Task在不同的Executor上启动多个实例,如果有Task实例运行成功,则会干掉其他Executor上运行的实例,只会对运行慢的任务启动推测任务。
通过scheduler.resourceOffers(workOffers)方法返回了在哪些executor上启动哪些task的Seq[Seq[TaskDescription]]信息后,将调用launchTasks来启动各个task,实现如下:
先将task进行序列化, 如果当前task序列化后的大小超过了128MB-200KB,跳过当前task,并把对应的taskSetManager置为zombie模式,若大小不超过限制,则发送消息到executor启动task执行。