apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

heartbeat timeout causing all executors exit #405

Closed duyanghao closed 7 years ago

duyanghao commented 7 years ago

I ran a Spark application with 100 executors (each with 40G and 4 cores). After running for about 4 hours, all executors exited with code 56 and reported the following logs:

...
Exit as unable to send heartbeats to driver more than 60 times
...

Meanwhile the driver hangs for several hours doing nothing.

The relevant driver-related logs are listed below:

2017-07-10 21:11:40 WARN  Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(31,[Lscala.Tuple2;@641834e8,BlockManagerId(31, x.x.x.x, 57787, None))]
2017-07-10 21:11:49 WARN  Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(52,[Lscala.Tuple2;@2de28748,BlockManagerId(52, x.x.x.x, 20423, None))]
2017-07-10 21:11:42 WARN  Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(20,[Lscala.Tuple2;@431947f,BlockManagerId(20, x.x.x.x, 28993, None))]
2017-07-10 21:11:44 WARN  Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(99,[Lscala.Tuple2;@6f8e1be0,BlockManagerId(99, x.x.x.x, 47398, None))]
2017-07-10 21:11:46 WARN  Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(87,[Lscala.Tuple2;@6ff2c572,BlockManagerId(87, x.x.x.x, 47063, None))]

Additional details:

  1. Task resources (1 driver + 100 executors): the driver has 40G + 4 cores; each executor has 40G + 4 cores.

  2. There are no user-specified Spark configurations for this job; everything uses the defaults.

  3. It does not look like a network problem, since other jobs in the same cluster run normally.

erikerlandson commented 7 years ago

@duyanghao Is it a situation where the heartbeats are sent correctly for some period of time and then stop working, or do they never succeed?

duyanghao commented 7 years ago

@erikerlandson All 100 executors send 60*3=180 heartbeats (each with a 10s timeout), but none of them ever succeed.
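
For context, the executor-side mechanism behind the exit code 56 can be sketched roughly as below. This is a simplified standalone illustration of the behaviour described in the logs (a scheduled heartbeat that gives up after too many consecutive failures), not the actual Spark source; the constants mirror the values mentioned in this thread (10s interval, 60 tolerated failures, exit code 56).

import java.util.concurrent.{Executors, TimeUnit}
import scala.util.control.NonFatal

// Simplified sketch: send a heartbeat every 10 seconds and exit with code 56
// once 60 consecutive attempts have failed, matching the observed behaviour.
object HeartbeatLoopSketch {
  private val heartbeatIntervalMs = 10000L  // mirrors spark.executor.heartbeatInterval = 10s
  private val maxFailures = 60              // consecutive failures tolerated before exiting
  private val heartbeatFailureExitCode = 56 // the exit code seen in this issue

  @volatile private var consecutiveFailures = 0

  // Stand-in for the RPC to the driver's HeartbeatReceiver; here it always
  // fails, simulating a driver that never replies within the timeout.
  private def sendHeartbeatToDriver(): Unit =
    throw new RuntimeException("driver did not reply within 10 seconds")

  def main(args: Array[String]): Unit = {
    val heartbeater = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit =
        try {
          sendHeartbeatToDriver()
          consecutiveFailures = 0
        } catch {
          case NonFatal(_) =>
            consecutiveFailures += 1
            if (consecutiveFailures >= maxFailures) {
              System.err.println(
                s"Exit as unable to send heartbeats to driver more than $maxFailures times")
              System.exit(heartbeatFailureExitCode)
            }
        }
    }
    heartbeater.scheduleAtFixedRate(task, 0L, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
  }
}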

duyanghao commented 7 years ago

@erikerlandson I added some debug logging to core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala as below:

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5242ab6..d802dc7 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -121,15 +121,19 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)

     // Messages received from executors
     case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
+      logInfo(s"executor:$executorId heartbeat $blockManagerId received")
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
           eventLoopThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
+              logInfo(s"executor:$executorId heartbeat $blockManagerId scheduler before")
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
                 executorId, accumUpdates, blockManagerId)
               val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+              logInfo(s"executor:$executorId heartbeat $blockManagerId $response reply before")
               context.reply(response)
+              logInfo(s"executor:$executorId heartbeat $blockManagerId $response reply")
             }
           })
         } else {
@@ -137,7 +141,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
           // after we just removed it. It's not really an error condition so we should
           // not log warning here. Otherwise there may be a lot of noise especially if
           // we explicitly remove executors (SPARK-4134).
-          logDebug(s"Received heartbeat from unknown executor $executorId")
+          logInfo(s"Received heartbeat from unknown executor:$executorId heartbeat")
           context.reply(HeartbeatResponse(reregisterBlockManager = true))
         }
       } else {

When I ran the job again, the driver produced the following logs (after grepping for "executor:1 heartbeat"):

2017-08-01 03:10:55 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:11:08 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:11:21 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:11:31 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:11:44 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:11:57 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:12:07 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:12:20 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:12:33 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:12:40 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) scheduler before
2017-08-01 03:12:41 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) HeartbeatResponse(false) reply before
2017-08-01 03:12:41 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) HeartbeatResponse(false) reply
2017-08-01 03:12:43 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:12:56 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received
2017-08-01 03:13:09 INFO  HeartbeatReceiver:54 - executor:1 heartbeat BlockManagerId(1, 192.168.5.246, 34451, None) received


At the same time, executor 1 logged the following:

2017-08-01 03:11:05 WARN  NettyRpcEndpointRef:87 - Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@1864841d,BlockManagerId(1, 192.168.5.246, 34451, None))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more
2017-08-01 03:11:18 WARN  NettyRpcEndpointRef:87 - Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@1864841d,BlockManagerId(1, 192.168.5.246, 34451, None))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more
2017-08-01 03:11:31 WARN  NettyRpcEndpointRef:87 - Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@1864841d,BlockManagerId(1, 192.168.5.246, 34451, None))] in 3 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more
2017-08-01 03:11:31 WARN  Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@1864841d,BlockManagerId(1, 192.168.5.246, 34451, None))]
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        ... 13 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more

I suspect the problem lies in the eventLoopThread shown below:

  // "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
  // block the thread for a long time.
  private val eventLoopThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")
  /**
   * Wrapper over ScheduledThreadPoolExecutor.
   */
  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
    val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
    // By default, a cancelled task is not automatically removed from the work queue until its delay
    // elapses. We have to enable it manually.
    executor.setRemoveOnCancelPolicy(true)
    executor
  }
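
To make the suspicion concrete, here is a toy sketch (not Spark code) of how a single-threaded scheduled executor behaves: one slow task holds the only worker thread, so every task queued behind it, including heartbeat replies, is delayed past its timeout.

import java.util.concurrent.Executors

// Toy illustration: a single-threaded executor runs submitted tasks strictly
// in order, so one long-running task starves everything queued after it.
object SingleThreadStarvationSketch {
  def main(args: Array[String]): Unit = {
    val eventLoop = Executors.newSingleThreadScheduledExecutor()

    // Stand-in for a handler that blocks (e.g. on a lock or a slow DNS lookup)
    // for 30 seconds while holding the only worker thread.
    eventLoop.submit(new Runnable {
      override def run(): Unit = {
        println("slow task started")
        Thread.sleep(30000)
        println("slow task finished")
      }
    })

    // Ten "heartbeat" tasks submitted right afterwards; none of them can start
    // until the slow task finishes, so each reply would already be late.
    (1 to 10).foreach { i =>
      eventLoop.submit(new Runnable {
        override def run(): Unit =
          println(s"heartbeat $i handled at ${System.currentTimeMillis()} ms")
      })
    }

    eventLoop.shutdown()
  }
}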

JDK version:

java -version
openjdk version "1.8.0_121"
OpenJDK Runtime Environment (IcedTea 3.3.0) (Alpine 8.121.13-r0)
OpenJDK 64-Bit Server VM (build 25.121-b13, mixed mode)

tangzhankun commented 7 years ago

Perhaps we need to find a simple workload to reproduce this. Will it help if we just let the executor sleep there?
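
For example, a minimal sleep-only workload along those lines might look like this (the app name, partition count, and sleep duration are arbitrary assumptions, not taken from this thread):

import org.apache.spark.sql.SparkSession

// Hypothetical reproduction workload: 100 long-sleeping tasks, roughly one per
// executor, intended only to keep executors idle-but-alive for several hours
// so the heartbeat behaviour can be observed.
object SleepWorkload {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("heartbeat-repro-sleep").getOrCreate()
    val sc = spark.sparkContext
    sc.parallelize(1 to 100, 100).foreach { _ =>
      Thread.sleep(4L * 60 * 60 * 1000) // sleep ~4 hours per task
    }
    spark.stop()
  }
}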

hustcat commented 7 years ago

"heartbeat-receiver-event-loop-thread" #41 daemon prio=5 os_prio=0 tid=0x00007f5b17b3a800 nid=0x2d waiting for monitor entry [0x00007f4f9621c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:406)
    - waiting to lock <0x00007f5003f2f1b0> (a org.apache.spark.scheduler.cluster.kubernetes.KubernetesTaskSchedulerImpl)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:131)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

We found that the driver heartbeat thread is blocked, so it cannot handle heartbeat messages from the executors.

  override def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean = {
    // (taskId, stageId, stageAttemptId, accumUpdates)
    val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
      accumUpdates.flatMap { case (id, updates) =>
        val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
          (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
        }
      }
    }
    dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
  }

However, if only one thread calls this code, why is the thread blocked?
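
For illustration, the kind of contention visible in the jstack output above can be sketched as a toy model (not Spark code): executorHeartbeatReceived synchronizes on the scheduler instance, so the single heartbeat-handler thread blocks whenever some other thread is already holding the scheduler's monitor inside a slow operation.

// Toy model of the monitor contention: the heartbeat handler blocks on the
// scheduler's lock while another thread holds it for a long time.
object SchedulerLockContentionSketch {
  class Scheduler {
    // Stand-in for a scheduling call (e.g. resourceOffers) doing slow work,
    // such as a DNS lookup, while holding the scheduler's monitor.
    def offerResources(): Unit = synchronized {
      Thread.sleep(30000)
    }

    // Stand-in for TaskSchedulerImpl.executorHeartbeatReceived, which also
    // synchronizes on the same scheduler instance.
    def executorHeartbeatReceived(execId: String): Boolean = synchronized {
      println(s"heartbeat from executor $execId processed")
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new Scheduler

    // Thread A grabs the scheduler's monitor and keeps it for 30 seconds.
    new Thread(new Runnable {
      override def run(): Unit = scheduler.offerResources()
    }, "scheduling-thread").start()

    Thread.sleep(1000) // let the scheduling thread acquire the monitor first

    // Thread B, playing the heartbeat event loop, now shows up as BLOCKED,
    // just like "heartbeat-receiver-event-loop-thread" in the jstack above.
    new Thread(new Runnable {
      override def run(): Unit = scheduler.executorHeartbeatReceived("1")
    }, "heartbeat-receiver-event-loop-thread").start()
  }
}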

hustcat commented 7 years ago

@apache-spark-on-k8s/contributors Can anyone offer any suggestions?

tangzhankun commented 7 years ago

@hustcat Could you please share the reproduction steps so I can try it on my cluster?

hustcat commented 7 years ago

We have found the reason, and we will submit a PR to resolve this problem later. :)

duyanghao commented 7 years ago

@kimoonkim The problem is your commit. I dropped the code from your commit, and the job now runs without heartbeat timeouts. However, I can't figure out the purpose of your commit. Could you explain in detail why you added the following code:

...
            val clusterNodeFullName = inetAddressUtil.getFullHostName(clusterNodeIP)
            val pendingTasksClusterNodeFullName = super.getPendingTasksForHost(clusterNodeFullName)
            if (pendingTasksClusterNodeFullName.nonEmpty) {
              logDebug(s"Got preferred task list $pendingTasksClusterNodeFullName " +
                s"for executor host $executorIP using cluster node full name $clusterNodeFullName")
            }
            pendingTasksClusterNodeFullName
...

kimoonkim commented 7 years ago

@duyanghao Thanks for finding this as a potential root cause. We can think about why this commit could cause the issue. Maybe it is making the driver slow to respond to heartbeats and other executor RPCs, thus making timeouts more likely? The InetAddressUtil call involves a DNS lookup, which can be slow if there are a lot of executors. Maybe we should find a way to avoid hitting this code path in the common case. Maybe we can flag-guard it.

This is part of HDFS locality support. The routine getPendingTasksForHost looks up tasks to send to a given executor host. When HDFS is used, we want to send tasks that have their input HDFS data on the executor. To do so, we get the cluster node that the executor is running on and compare the hostname of the node with the datanode hostname.

Normally, this works well without a DNS lookup because both the cluster node name, which we get from the pod data structure, and the datanode name are fully qualified host names. But in some K8s setups, notably GKE, the cluster node name is a short host name. The change specifically addresses that case by getting the full host name from DNS.
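
A possible shape for that flag guard, purely as an illustrative sketch (the class name, the flag, and the caching are assumptions on my part, not the actual change proposed below):

import java.net.InetAddress
import scala.collection.concurrent.TrieMap

// Illustrative sketch only: gate the reverse DNS lookup behind a flag and cache
// the result so each cluster node is resolved at most once per driver.
class HostnameResolver(dnsLookupEnabled: Boolean) {
  private val cache = TrieMap.empty[String, String]

  def fullHostName(clusterNodeIP: String): Option[String] =
    if (!dnsLookupEnabled) {
      None // skip DNS entirely when locality-by-full-hostname is disabled
    } else {
      Some(cache.getOrElseUpdate(
        clusterNodeIP,
        InetAddress.getByName(clusterNodeIP).getCanonicalHostName))
    }
}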

kimoonkim commented 7 years ago

I'll write a PR shortly, flag-guarding the DNS lookup. @hustcat, @duyanghao if you guys can try out the fix, that would be great. Thanks.

kimoonkim commented 7 years ago

PR #412 is posted. @hustcat @duyanghao PTAL.

kimoonkim commented 7 years ago

@hustcat @duyanghao Were you using HDFS as input or output of your Spark jobs? Do you use HDFS in general? If not, I think automatically disabling HDFS locality support, per this thread, is also a possible fix. Please let us know.

hustcat commented 7 years ago

Yeah, it's the DNS lookup that causes the heartbeat timeout. We run HDFS as a separate cluster outside of k8s now, so disabling HDFS locality support is OK for us.

However, I think a DNS lookup is an expensive operation, and it can also fail. I don't know how YARN resolves this. Anyway, we will test your PR. Thanks!

kimoonkim commented 7 years ago

@hustcat Thanks for the answer.

Yes, I agree that the DNS lookup is expensive. The need for it is unique to K8s; YARN does not have this issue. I think I will have to redesign the DNS lookup approach, while disabling this code path for now.