teeyog / blog

My Blog
76 stars 24 forks source link

数据本地化及延迟调度 #8

Open teeyog opened 6 years ago

teeyog commented 6 years ago

前言

Spark数据本地化即移动计算而不是移动数据,而现实又是残酷的,不是想要在数据块的地方计算就有足够的资源提供,为了让task能尽可能的以最优本地化级别(Locality Levels)来启动,Spark的延迟调度应运而生,资源不够可在该Locality Levels对应的限制时间内重试,超过限制时间后还无法启动则降低Locality Levels再尝试启动……

本地化级别(Locality Levels)

这些Task的本地化级别其实描述的就是计算与数据的位置关系,这个最终的关系是如何产生的呢?接下来对其来龙去脉进行详细的讲解。

DAGScheduler提交tasks

DAGScheduler对job进行stage划分完后,会通过submitMissingTasks方法将Stage以TaskSet的形式提交给TaskScheduler,看看该方法关于位置优先的一些代码:

...
// 获取还未执行或未成功执行分区的id
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...
// 通过getPreferredLocs方法获取rdd该分区的优先位置
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch { 
    }
...
//通过最优位置等信息构建Task
val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
          }

        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
          }
      }
    } catch { 
    }
...
//将所有task以TaskSet的形式提交给TaskScheduler
taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

注意这里提交的TaskSet里面的Task已经包含了该Task的优先位置,而该优先位置是通过getPreferredLocs方法获取,可以简单看看其实现:

private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    ...
    // 从缓存中获取
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 直接通过rdd的preferredLocations方法获取
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
    // 递归从parent Rdd获取(窄依赖)
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }
    Nil
  }

无论是通过哪种方式获取RDD分区的优先位置,第一次计算的数据来源肯定都是通过RDD的preferredLocations方法获取的,不同的RDD有不同的preferredLocations实现,但是数据无非就是在三个地方存在,被cache到内存、HDFS、磁盘,而这三种方式的TaskLocation都有具体的实现:

//数据在内存中
private [spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}
//数据在磁盘上(非HDFS上)
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}
//数据在HDFS上
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}

所以,在实例化Task的时候传的优先位置就是这三种的其中一种。

Locality levels生成

DAGScheduler将TaskSet提交给TaskScheduler后,TaskScheduler会为每个TaskSet创建一个TaskSetMagager来对其Task进行管理,在初始化TaskSetMagager的时候就会通过computeValidLocalityLevels计算该TaskSet包含的locality levels:

private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }
    levels += ANY
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    levels.toArray
  }

程序会依次判断该TaskSetMagager是否包含各个级别,逻辑都类似,我们就细看第一个,pendingTasksForExecutor的定义与添加:

// key为executorId,value为在该executor上有缓存的数据块对应的taskid数组
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
...
//遍历所有该TaskSet的所有task进行添加
for (i <- (0 until numTasks).reverse) {
    addPendingTask(i)
  }
...
private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
                ", but there are no executors alive there.")
          }
        case _ =>
      }
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }

    if (tasks(index).preferredLocations == Nil) {
      pendingTasksWithNoPrefs += index
    }

    allPendingTasks += index  // No point scanning this whole list to find the old task there
  }

注意这里的addPendingTask方法,会遍历该TaskSetMagager管理的所有Task的优先位置(上文已解析),若是ExecutorCacheTaskLocation (缓存在内存中)则添加对应的executorId和taskId到pendingTasksForExecutor,同时还会添加到低级别需要的pendingTasksForHost、pendingTasksForRack中,说明假设一个 task 的最优本地性为 X,那么该 task 同时也具有其他所有本地性比X差的本地性。 回到上面的本地性级别判断:

if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }

只要是看第三个判断 pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive())),其中,pendingTasksForExecutor.keySet就是上面说明的存在有与task对应的数据块被缓存在executor中的executorId,sched.isExecutorAlive()就是判断参数中的 executor id 当前是否 active。所以整行代码意思是存在有与task对应的数据块被缓存在executor中的executors是否有active的,若有则添加PROCESS_LOCAL级别到该TaskSet的LocalityLevels中。

后面的其他本地性级别是同样的逻辑就不细讲了,区别是如判断存在有与task对应的数据块在某些节点中的hosts是否有Alive的等……

至此,TaskSet包含的LocalityLevels就已经计算完。

延迟调度策略

若spark跑在yarn上,也有两层延迟调度,第一层就是yarn尽量将spark的executor分配到有数据的nodemanager上,这一层没有做到data locality,到spark阶段,data locality更不可能了。

延迟调度的目的是为了较小网络及IO开销,在数据量大而计算逻辑简单(task执行时间小于数据传输时间)的情况下表现明显。

Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动。

TaskSetMagager会以某一种TaskSet包含的本地性级别遍历每个可用executor资源尝试在该executor上启动当前管理的tasks,那么是如何决定某个task能否在该executor上启动呢?首先都会通过getAllowedLocalityLevel(curTime)方法计算当前TaskSetMagager中未执行的tasks的最高本地级别:

private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
        currentLocalityIndex += 1
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }

循环条件里的currentLocalityIndex是getAllowedLocalityLevel 前一次被调用返回的LocalityIndex在 myLocalityLevels 中的索引,初始值为0,myLocalityLevels则是TaskSetMagager所有tasks包含的本地性级别。

至此,就取出了该TaskSetMagager中未执行的tasks的最高本地性级别(取和maxLocality中级别高的作为最终的allowedLocality)。

最终决定是否在某个executor上启动某个task的是方法dequeueTask(execId, host, allowedLocality)

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }
    ...
  }

通过TaskLocality.isAllowed方法来保证只以比allowedLocality级别高(可相等)的locality来启动task,因为一个 task 拥有比最优本地性 差的其他所有本地性。这样就保证了能尽可能的以高本地性级别来启动一个task。

优化建议

可用过Spark UI来查看某个job的task的locality level,若都是NODE_LOCAL、ANY,那么可调整数据本地化的等待时长: