cerndb / dist-keras

Distributed Deep Learning, with a focus on distributed training, using Keras and Apache Spark.
http://joerihermans.com/work/distributed-keras/
GNU General Public License v3.0

Cannot make dist-keras work on cloudera cluster #13

Closed CarreauClement closed 7 years ago

CarreauClement commented 7 years ago

Hello,

I've been struggling for days with dist-keras and I just can't seem to make it work on our cloudera cluster.

I've been running the ADAG (convolutional network) MNIST example (https://github.com/cerndb/dist-keras/blob/master/examples/mnist.ipynb) on my local VM and it just works!
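
For reference, the training call in that notebook looks roughly like this (quoted from memory of the dist-keras examples, so the exact argument names and values may differ slightly):

from distkeras.trainers import ADAG

# Model, optimizer, loss and the prepared DataFrame come from earlier cells in the notebook.
trainer = ADAG(keras_model=convolutional_network, worker_optimizer=optimizer, loss=loss,
               num_workers=num_workers, batch_size=4, communication_window=5, num_epoch=1,
               features_col="features_normalized", label_col="label_encoded")
trained_model = trainer.train(training_set)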

However, when I try to run this example on our cluster, using several workers, I constantly get the following error:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task 0.3 in stage 11.0 (TID 222, datalab-prj-cloudera-slave-3, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.0 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

At first, I tried to increase the memory, but even 10g isn't enough. I profiled the MNIST example on my VM and it uses close to nothing, so I don't understand why it needs so much memory on the cluster.
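
For reference, a minimal way to set these values when building the context looks like this (the values are only illustrative of what I tried):

from pyspark import SparkConf, SparkContext

# Hypothetical values; even 10g of executor memory wasn't enough in my runs.
conf = SparkConf() \
    .set("spark.executor.memory", "10g") \
    .set("spark.yarn.executor.memoryOverhead", "2048") \
    .set("spark.executor.cores", "2")
sc = SparkContext(conf=conf)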

Any idea ?

Thank you !

JoeriHermans commented 7 years ago

You need to lower your executor memory configuration (currently 5 GB) to fit within your YARN executor memory limit (4.5 GB). If you lower this to 4 GB, it should work fine.

Let me know if this worked.

Joeri

JoeriHermans commented 7 years ago

Sorry, I misread your issue. This is a strange problem indeed. Which Spark version / Cloudera distribution are you running? We are running a Cloudera distribution as well, and it works fine there. I'll update the notebook tomorrow morning, since the one you are using (the one currently on the master branch) is outdated.

Joeri

CarreauClement commented 7 years ago

Hello,

Spark version is 2.0 (I also tried 1.6). Cloudera version is 5.10.0.

JoeriHermans commented 7 years ago

Did you install the framework on all machines? There could be an underlying issue here. Can you obtain the logs of the failed executor, i.e., not the driver log?

For the sake of completeness, did you try to increase spark.yarn.executor.memoryOverhead?

CarreauClement commented 7 years ago

The framework is correctly installed on all nodes. Here's what I have in the executor logs:

17/04/03 11:51:58 INFO executor.CoarseGrainedExecutorBackend: Started daemon with process name: 23378@datalab-prj-cloudera-slave-2
17/04/03 11:51:58 INFO util.SignalUtils: Registered signal handler for TERM
17/04/03 11:51:58 INFO util.SignalUtils: Registered signal handler for HUP
17/04/03 11:51:58 INFO util.SignalUtils: Registered signal handler for INT
17/04/03 11:51:59 INFO spark.SecurityManager: Changing view acls to: root
17/04/03 11:51:59 INFO spark.SecurityManager: Changing modify acls to: root
17/04/03 11:51:59 INFO spark.SecurityManager: Changing view acls groups to:
17/04/03 11:51:59 INFO spark.SecurityManager: Changing modify acls groups to:
17/04/03 11:51:59 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/04/03 11:51:59 INFO client.TransportClientFactory: Successfully created connection to /172.30.2.19:41299 after 64 ms (0 ms spent in bootstraps)
17/04/03 11:51:59 INFO spark.SecurityManager: Changing view acls to: root
17/04/03 11:51:59 INFO spark.SecurityManager: Changing modify acls to: root
17/04/03 11:51:59 INFO spark.SecurityManager: Changing view acls groups to:
17/04/03 11:51:59 INFO spark.SecurityManager: Changing modify acls groups to:
17/04/03 11:51:59 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
17/04/03 11:51:59 INFO client.TransportClientFactory: Successfully created connection to /172.30.2.19:41299 after 1 ms (0 ms spent in bootstraps)
17/04/03 11:51:59 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-7a51518b-1bc0-447d-9eea-4effc3b14850/executor-edd884f9-588a-40cc-9adf-03ed549e9f92/blockmgr-868d0832-143b-4f95-bc49-9b5e33e4b86c
17/04/03 11:51:59 INFO memory.MemoryStore: MemoryStore started with capacity 2004.6 MB
17/04/03 11:52:00 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@172.30.2.19:41299
17/04/03 11:52:00 INFO worker.WorkerWatcher: Connecting to worker spark://Worker@172.30.2.21:37493
17/04/03 11:52:00 INFO client.TransportClientFactory: Successfully created connection to /172.30.2.21:37493 after 1 ms (0 ms spent in bootstraps)
17/04/03 11:52:00 INFO worker.WorkerWatcher: Successfully connected to spark://Worker@172.30.2.21:37493
17/04/03 11:52:00 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver
17/04/03 11:52:00 INFO executor.Executor: Starting executor ID 5 on host 172.30.2.21
17/04/03 11:52:00 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44919.
17/04/03 11:52:00 INFO netty.NettyBlockTransferService: Server created on 172.30.2.21:44919
17/04/03 11:52:00 INFO storage.BlockManager: external shuffle service port = 7337
17/04/03 11:52:00 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(5, 172.30.2.21, 44919)
17/04/03 11:52:00 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(5, 172.30.2.21, 44919)
17/04/03 11:52:00 INFO storage.BlockManager: Registering executor with local external shuffle service.
17/04/03 11:52:00 INFO client.TransportClientFactory: Successfully created connection to /172.30.2.21:7337 after 1 ms (0 ms spent in bootstraps)
17/04/03 11:53:00 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

I've only seen an actual error once, which is:

17/04/03 11:49:01 ERROR util.Utils: Uncaught exception in thread stdout writer for /opt/anaconda2/bin/python
java.lang.AssertionError: assertion failed: Block rdd_11_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:633)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:437)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1957)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/04/03 11:49:01 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/anaconda2/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_11_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:294)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:633)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:437)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:120)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1957)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

And yes, I tried increasing spark.yarn.executor.memoryOverhead, with no results.

EDIT :

I tried running it on Spark Standalone, and even though it hasn't crashed so far, I think it's stuck, since it's doing nothing (according to the logs, which haven't been updated for 30 minutes).

Here are the errors I have when running on Standalone :

17/04/03 11:49:01 WARN executor.Executor: 1 block locks were not released by TID = 6: [rdd_11_0]
17/04/03 11:49:01 INFO executor.Executor: Finished task 0.0 in stage 5.0 (TID 6). 5366 bytes result sent to driver
17/04/03 11:49:01 ERROR util.Utils: Uncaught exception in thread stdout writer for /opt/anaconda2/bin/python java.lang.AssertionError: assertion failed: Block rdd_11_0 is not locked for reading

JoeriHermans commented 7 years ago

Is it possible that Dynamic Resource Allocation is enabled by default on your cluster? This would explain " ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM".
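
If it is, you could try disabling it explicitly when building the context, for example (a minimal sketch; the same keys can also go in spark-defaults.conf):

from pyspark import SparkConf

# Disable dynamic allocation and pin a fixed number of executors instead.
conf = SparkConf() \
    .set("spark.dynamicAllocation.enabled", "false") \
    .set("spark.executor.instances", "4")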

CarreauClement commented 7 years ago

Yep, it was enabled but we already tried setting it to false. I'll retry and get back to you.

JoeriHermans commented 7 years ago

Thanks!

CarreauClement commented 7 years ago

Once again, I think it's stuck. There were logs on stderr every second, and now nothing.

Moreover, from the current stage metrics, I think nothing is being processed.

(screenshot: dist_keras_spark)

JoeriHermans commented 7 years ago

Hmmm, strange. I added logging to the executors. Could you pull the most recent version from the master branch? If the workers raise any exceptions during data prefetching or the optimization process, the errors should now show up in the executor logs.

JoeriHermans commented 7 years ago

I'm observing similar behaviour on the master branch as well. I'll keep you up-to-date.

JoeriHermans commented 7 years ago

Hi Carreau,

In my case, the issue occurred when the following configuration was specified:

conf.set("spark.executor.heartbeatInterval", "3600s")
conf.set("spark.akka.timeout", "200s")
conf.set("spark.akka.heartbeat.interval", "60s")

I don't know which one actually triggered the issue, but I guess it is the marked one. Could you check whether you have a large value for this configuration? This could be the issue.
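
You can check the value that is actually in effect from the running context, for example:

# Prints the effective setting, or the shown default if the key was never set.
print(sc.getConf().get("spark.executor.heartbeatInterval", "10s"))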

JoeriHermans commented 7 years ago

Hi Carreau,

This should fix your issue. The problem originated from a loaded cluster: the datanode takes too long to send the data to the executor, so the executor is too late sending its heartbeat within the driver's interval. However, increasing the heartbeat interval by itself didn't solve the issue. To resolve it, one should also set spark.network.timeout to an even higher value, in my case 5x.
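
Concretely, something along these lines (the values are illustrative; tune them to your cluster load, keeping spark.network.timeout well above the heartbeat interval):

from pyspark import SparkConf

# Sketch: raise the heartbeat interval and, more importantly, the network timeout (~5x here).
conf = SparkConf() \
    .set("spark.executor.heartbeatInterval", "600s") \
    .set("spark.network.timeout", "3000s")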

I will close this issue for now, since I believe this is the answer. If you still have the same problem, feel free to reopen it.

Joeri

linrio commented 7 years ago

I got the same error: the worker received SIGNAL TERM.

How can I fix it?

JoeriHermans commented 7 years ago

@linrio

Could you provide the full Spark Executor logs?

Joeri