apache-spark-on-k8s / spark

Apache Spark enhanced with native Kubernetes scheduler back-end: NOTE this repository is being ARCHIVED as all new development for the kubernetes scheduler back-end is now on https://github.com/apache/spark/
https://spark.apache.org/
Apache License 2.0

set RestartPolicy=Never for executor #367

Closed honkiko closed 7 years ago

honkiko commented 7 years ago

In the current implementation the RestartPolicy of the executor pod is not set, so the default value "OnFailure" takes effect. This causes a problem.

If an executor terminates unexpectedly, for example by exiting with java.lang.OutOfMemoryError, it is restarted by Kubernetes with the same executor ID. When the new executor tries to fetch a block held by the previous executor, ShuffleBlockFetcherIterator.splitLocalRemoteBlocks() thinks it is a local block and tries to read it from its local dir. But the executor's local dir has changed, because a randomly generated ID is part of the local dir path. A FetchFailedException is raised and the stage fails (see the simplified sketch after the stack trace below).

The recurring error message:

17/06/29 01:54:56 WARN KubernetesTaskSetManager: Lost task 0.1 in stage 2.0 (TID 7, 172.16.75.92, executor 1): FetchFailed(BlockManagerId(1, 172.16.75.92, 40539, None), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: /data2/spark/blockmgr-0e228d3c-8727-422e-aa97-2841a877c42a/32/shuffle_2_0_0.index (No such file or directory)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
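
For context, here is a simplified sketch in plain Scala of the local/remote split that trips up the restarted executor. This is not the actual Spark source; the types and names are reduced for illustration. A shuffle block is treated as local when its BlockManagerId carries the same executor ID as the fetching executor, which is why a restarted executor reusing the old ID goes looking for a file in a block-manager directory that no longer exists:

```scala
// Simplified illustration (not the real ShuffleBlockFetcherIterator code).
case class BlockManagerId(executorId: String, host: String, port: Int)

def splitLocalRemoteBlocks(
    myId: BlockManagerId,
    blocksByAddress: Seq[(BlockManagerId, Seq[String])]): (Seq[String], Seq[String]) = {
  val (local, remote) = blocksByAddress.partition { case (address, _) =>
    // Same executor ID => assumed readable from this executor's own block-manager dir.
    // After a pod restart the ID is reused but the blockmgr-<uuid> dir is new, so the read fails.
    address.executorId == myId.executorId
  }
  (local.flatMap(_._2), remote.flatMap(_._2))
}
```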

What changes were proposed in this pull request?

Set RestartPolicy=Never for the executor pod.
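
For illustration, a minimal sketch of what the change amounts to, assuming the fabric8 kubernetes-client builder API; the function and parameter names (buildExecutorPod, executorId, image) are hypothetical and this is not the actual diff:

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// Hypothetical sketch: build an executor pod with restartPolicy "Never", so a crashed
// executor container is not brought back by Kubernetes under the same executor ID.
def buildExecutorPod(executorId: String, image: String): Pod =
  new PodBuilder()
    .withNewMetadata()
      .withName(s"spark-executor-$executorId")
    .endMetadata()
    .withNewSpec()
      .withRestartPolicy("Never") // previously unset, so the Kubernetes default policy applied
      .addNewContainer()
        .withName("spark-executor")
        .withImage(image)
      .endContainer()
    .endSpec()
    .build()
```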

How was this patch tested?

Tested on my local testbed; the executor RestartPolicy is now "Never". This should work together with https://github.com/apache-spark-on-k8s/spark/pull/244

Please review http://spark.apache.org/contributing.html before opening a pull request.

erikerlandson commented 7 years ago

This seems like it will work in the Dynamic Allocation scenario; if dynamic allocation isn't on, does this mean the cluster size remains diminished by one?

foxish commented 7 years ago

We added this for the driver in https://github.com/apache-spark-on-k8s/spark/pull/303, but the expectation there was that the executor would recover, and it seems that having it reconnect with the same ID doesn't really let us accomplish that. This is okay as long as executor recovery semantics handle this and respawn executors when they are lost anyway. @varunkatta can you confirm that we'll get the same behavior even if we disable executor restarts?
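
To make that expectation concrete, here is a hypothetical sketch of driver-side recovery under restartPolicy=Never: once a pod reaches a terminal phase, the driver requests a replacement under a fresh executor ID instead of relying on Kubernetes to restart the old pod. None of these class or method names come from the project; they are illustrative only:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical illustration: respawn lost executors from the driver side rather than
// letting Kubernetes restart the failed pod with its old executor ID.
class ExecutorRespawner(requestPod: String => Unit) {
  private val nextExecutorId = new AtomicLong(0L)

  def onPodPhaseChange(executorId: String, phase: String): Unit = phase match {
    case "Failed" | "Succeeded" =>
      // Terminal phase: the old ID and its local dirs are gone, so allocate a fresh ID.
      requestPod(s"exec-${nextExecutorId.incrementAndGet()}")
    case _ =>
      // Pending/Running: nothing to do.
  }
}
```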

foxish commented 7 years ago

/cc @kimoonkim

ash211 commented 7 years ago

Minor merge conflict, but +1 from me. We shouldn't be restarting executors with the same ID, so it's better to do the restart at the Spark driver level than in k8s.

foxish commented 7 years ago

Will merge on green

varunkatta commented 7 years ago

Executor recovery should work even with restartPolicy set to Never. This change is good to go, I think.