apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

Support HDFS rack locality #349

Closed kimoonkim closed 7 years ago

kimoonkim commented 7 years ago

This is likely the last sub-item of HDFS locality umbrella issue #206.

When using HDFS, the Spark driver looks up which rack a given datanode or executor belongs to, so that it can send tasks to executors that can read their input data from datanodes on the same rack. (This happens as a fallback when node locality fails.) To support rack locality, the Spark driver loads a configurable topology plugin into its JVM.
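Concretely, the plugin is a Hadoop DNSToSwitchMapping implementation, selected with the net.topology.node.switch.mapping.impl key in the Hadoop configuration. Below is a minimal sketch of such a plugin; the HostnameRackMapping class and the convention of embedding the rack name in the hostname are made up purely for illustration:

  import java.util.{List => JList}
  import scala.collection.JavaConverters._
  import org.apache.hadoop.net.DNSToSwitchMapping

  // Hypothetical plugin: assumes hostnames like "dn3.rack2.example.com",
  // where the second label names the rack. Real clusters typically use a
  // topology script or a static lookup table instead.
  class HostnameRackMapping extends DNSToSwitchMapping {

    override def resolve(names: JList[String]): JList[String] =
      names.asScala.map { name =>
        val labels = name.split('.')
        if (labels.length >= 2) "/" + labels(1) else "/default-rack"
      }.asJava

    // Nothing is cached in this toy mapping, so the reload hooks are no-ops.
    override def reloadCachedMappings(): Unit = ()
    override def reloadCachedMappings(names: JList[String]): Unit = ()
  }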

TaskSchedulerImpl has the getRackForHost method, which is meant to be overridden by a subclass that calls the topology plugin. TaskSetManager then calls getRackForHost to populate the pendingTasksForRack map; it is called once for each datanode host associated with the input data blocks of pending tasks.

  private def addPendingTask(index: Int) {

    // ... elided; the loop below runs once for each preferred location
    // `loc` of the task being added ...

      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }

getRackForHost is also called with executor addresses when the driver is about to send tasks to executors.

  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =

    // ... elided; process-local, node-local, and no-pref checks come first ...

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

Spark on YARN implements the getRackForHost method in YarnScheduler:

  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    // RackResolver invokes the topology plugin named in the Hadoop config.
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
  }

We can add similar code in KubernetesTaskSchedulerImpl. The datanode handling will be exactly like the above. For executors, we would map each pod IP address to the name or IP of the cluster node hosting that pod, and then call the topology plugin with the node address. I'll send a PR soon.
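As a rough illustration of that plan, the sketch below overrides getRackForHost the same way YarnScheduler does (reusing Hadoop's RackResolver), except that executor pod IPs are first translated to the hosting cluster node's address. The podToNodeAddress helper is hypothetical and left unimplemented; the real mapping would come from the Kubernetes API, e.g. a pod's reported host IP:

  import org.apache.spark.SparkContext
  import org.apache.spark.scheduler.TaskSchedulerImpl
  import org.apache.spark.util.Utils
  import org.apache.hadoop.yarn.util.RackResolver

  private[spark] class KubernetesTaskSchedulerImpl(sc: SparkContext)
      extends TaskSchedulerImpl(sc) {

    override def getRackForHost(hostPort: String): Option[String] = {
      val host = Utils.parseHostPort(hostPort)._1
      // Datanode hostnames resolve directly; executor pod IPs are first
      // mapped to the cluster node that runs the pod.
      val nodeAddress = podToNodeAddress(host).getOrElse(host)
      Option(RackResolver.resolve(sc.hadoopConfiguration, nodeAddress)
        .getNetworkLocation)
    }

    // Hypothetical helper: find the node hosting the pod with this IP,
    // e.g. by looking at pod.status.hostIP via the Kubernetes client.
    // Returns None when the address is not a known pod IP (i.e. a datanode).
    private def podToNodeAddress(podIp: String): Option[String] = ???
  }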