yahoo / TensorFlowOnSpark

TensorFlowOnSpark brings TensorFlow programs to Apache Spark clusters.
Apache License 2.0
3.87k stars 938 forks source link

The mnist distributed train example raise Exception: Timeout while feeding partition #456

Closed linjiaqin closed 4 years ago

linjiaqin commented 4 years ago

Environment:

Describe the bug: when I run the mnist train example in spark standalone, it seized up and stop. It occurs

Logs: 2019-09-24 13:31:11,285 INFO cluster.StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 args: Namespace(batch_size=100, cluster_size=2, epochs=1, format='csv', images='examples/mnist/csv/train/images', labels='examples/mnist/csv/train/labels', mode='train', model='mnist_model', output='predictions', rdma=False, readers=1, steps=1000, tensorboard=False) 2019-09-24T13:31:11.331185 ===== Start 2019-09-24 13:31:11,560 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 242.1 KB, free 2.5 GB) 2019-09-24 13:31:11,634 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.5 KB, free 2.5 GB) 2019-09-24 13:31:11,636 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on cu11:38440 (size: 23.5 KB, free: 2.5 GB) 2019-09-24 13:31:11,642 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:0 2019-09-24 13:31:11,710 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 242.2 KB, free 2.5 GB) 2019-09-24 13:31:11,723 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 23.5 KB, free 2.5 GB) 2019-09-24 13:31:11,726 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on cu11:38440 (size: 23.5 KB, free: 2.5 GB) 2019-09-24 13:31:11,727 INFO spark.SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:0 zipping images and labels 2019-09-24 13:31:11,770 INFO mapred.FileInputFormat: Total input paths to process : 10 2019-09-24 13:31:11,788 INFO mapred.FileInputFormat: Total input paths to process : 10 2019-09-24 13:31:11,820 INFO (MainThread-15780) Reserving TFSparkNodes 2019-09-24 13:31:11,821 INFO (MainThread-15780) cluster_template: {'ps': [0], 'worker': [1]} 2019-09-24 13:31:11,822 INFO (MainThread-15780) listening for reservations at ('192.168.0.11', 46274) 2019-09-24 13:31:11,823 INFO (MainThread-15780) Starting TensorFlow on executors 2019-09-24 13:31:11,833 INFO (MainThread-15780) Waiting for TFSparkNodes to start 2019-09-24 13:31:11,833 INFO (MainThread-15780) waiting for 2 reservations 2019-09-24 13:31:11,903 INFO spark.SparkContext: Starting job: foreachPartition at /software/hadoop3/anaconda3/envs/zoo/lib/python3.6/site-packages/tensorflowonspark/TFCluster.py:321 2019-09-24 13:31:11,922 INFO scheduler.DAGScheduler: Got job 0 (foreachPartition at /software/hadoop3/anaconda3/envs/zoo/lib/python3.6/site-packages/tensorflowonspark/TFCluster.py:321) with 2 output partitions 2019-09-24 13:31:11,923 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (foreachPartition at /software/hadoop3/anaconda3/envs/zoo/lib/python3.6/site-packages/tensorflowonspark/TFCluster.py:321) 2019-09-24 13:31:11,923 INFO scheduler.DAGScheduler: Parents of final stage: List() 2019-09-24 13:31:11,925 INFO scheduler.DAGScheduler: Missing parents: List() 2019-09-24 13:31:11,930 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (PythonRDD[8] at foreachPartition at /software/hadoop3/anaconda3/envs/zoo/lib/python3.6/site-packages/tensorflowonspark/TFCluster.py:321), which has no missing parents 2019-09-24 13:31:11,953 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.3 KB, free 2.5 GB) 2019-09-24 13:31:11,956 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 11.5 KB, free 2.5 GB) 2019-09-24 13:31:11,957 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on cu11:38440 (size: 11.5 KB, free: 2.5 GB) 2019-09-24 13:31:11,958 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1161 2019-09-24 13:31:11,971 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (PythonRDD[8] at foreachPartition at /software/hadoop3/anaconda3/envs/zoo/lib/python3.6/site-packages/tensorflowonspark/TFCluster.py:321) (first 15 tasks are for partitions Vector(0, 1)) 2019-09-24 13:31:11,972 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks 2019-09-24 13:31:12,834 INFO (MainThread-15780) waiting for 2 reservations 2019-09-24 13:31:12,869 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.0.8:45566) with ID 0 2019-09-24 13:31:12,891 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.0.8, executor 0, partition 0, PROCESS_LOCAL, 7856 bytes) 2019-09-24 13:31:13,042 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.0.7:42286) with ID 1 2019-09-24 13:31:13,045 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.0.7, executor 1, partition 1, PROCESS_LOCAL, 7856 bytes) 2019-09-24 13:31:13,056 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.0.8:35705 with 366.3 MB RAM, BlockManagerId(0, 192.168.0.8, 35705, None) 2019-09-24 13:31:13,197 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.0.7:45057 with 366.3 MB RAM, BlockManagerId(1, 192.168.0.7, 45057, None) 2019-09-24 13:31:13,341 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.0.8:35705 (size: 11.5 KB, free: 366.3 MB) 2019-09-24 13:31:13,475 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.0.7:45057 (size: 11.5 KB, free: 366.3 MB) 2019-09-24 13:31:13,836 INFO (MainThread-15780) waiting for 2 reservations 2019-09-24 13:31:14,837 INFO (MainThread-15780) waiting for 2 reservations 2019-09-24 13:31:15,838 INFO (MainThread-15780) waiting for 2 reservations 2019-09-24 13:31:16,840 INFO (MainThread-15780) waiting for 2 reservations 2019-09-24 13:31:17,841 INFO (MainThread-15780) all reservations completed 2019-09-24 13:31:17,841 INFO (MainThread-15780) All TFSparkNodes started 2019-09-24 13:31:17,841 INFO (MainThread-15780) {'executor_id': 0, 'host': '192.168.0.8', 'job_name': 'ps', 'task_index': 0, 'port': 44715, 'tb_pid': 0, 'tb_port': 0, 'addr': ('192.168.0.8', 46408), 'authkey': b'o\x1d\xd6\xbeT\xa6O\x15\x82\x8f\x87\x0f\x8e\x91\xce\x96'} 2019-09-24 13:31:17,841 INFO (MainThread-15780) {'executor_id': 1, 'host': '192.168.0.7', 'job_name': 'worker', 'task_index': 0, 'port': 33719, 'tb_pid': 0, 'tb_port': 0, 'addr': '/tmp/pymp-9xkwcw7q/listener-suknqjyv', 'authkey': b'\xc0r@\xd7Y\x9fHT\x89 \xc6~N\xdeNj'} 2019-09-24 13:31:17,841 INFO (MainThread-15780) Feeding training data 2019-09-24 13:31:17,902 INFO spark.SparkContext: Starting job: collect at PythonRDD.scala:166 2019-09-24 13:31:17,904 INFO scheduler.DAGScheduler: Got job 1 (collect at PythonRDD.scala:166) with 10 output partitions 2019-09-24 13:31:17,904 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (collect at PythonRDD.scala:166) 2019-09-24 13:31:17,904 INFO scheduler.DAGScheduler: Parents of final stage: List() 2019-09-24 13:31:17,904 INFO scheduler.DAGScheduler: Missing parents: List() 2019-09-24 13:31:17,905 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (PythonRDD[10] at RDD at PythonRDD.scala:53), which has no missing parents 2019-09-24 13:31:17,917 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 16.6 KB, free 2.5 GB) 2019-09-24 13:31:17,920 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 9.0 KB, free 2.5 GB) 2019-09-24 13:31:17,921 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on cu11:38440 (size: 9.0 KB, free: 2.5 GB) 2019-09-24 13:31:17,922 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1161 2019-09-24 13:31:17,923 INFO scheduler.DAGScheduler: Submitting 10 missing tasks from ResultStage 1 (PythonRDD[10] at RDD at PythonRDD.scala:53) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) 2019-09-24 13:31:17,923 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 10 tasks 2019-09-24 13:31:18,240 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 192.168.0.7, executor 1, partition 0, ANY, 8535 bytes) 2019-09-24 13:31:18,245 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 5198 ms on 192.168.0.7 (executor 1) (1/2) 2019-09-24 13:31:18,253 INFO python.PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 54429 2019-09-24 13:31:18,321 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.0.7:45057 (size: 9.0 KB, free: 366.3 MB) 2019-09-24 13:31:18,392 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.7:45057 (size: 23.5 KB, free: 366.3 MB) 2019-09-24 13:31:19,452 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.0.7:45057 (size: 23.5 KB, free: 366.2 MB) 2019-09-24 13:41:21,586 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 192.168.0.7, executor 1, partition 1, ANY, 8535 bytes) 2019-09-24 13:41:21,606 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.0.7, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/usr/local/spark-2.4.3/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main process() File "/usr/local/spark-2.4.3/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/usr/local/spark-2.4.3/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func File "/usr/local/spark-2.4.3/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func File "/usr/local/spark-2.4.3/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func File "/usr/local/spark-2.4.3/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func File "/usr/local/spark-2.4.3/python/lib/pyspark.zip/pyspark/rdd.py", line 801, in func File "/software/hadoop3/anaconda3/envs/zoo/lib/python3.6/site-packages/tensorflowonspark/TFSparkNode.py", line 420, in _train raise Exception("Timeout while feeding partition") Exception: Timeout while feeding partition

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Spark Submit Command Line: ${SPARK_HOME}/bin/spark-submit \ --master spark://cu11:7077 \ --py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \ --conf spark.cores.max=${TOTAL_CORES} \ --conf spark.task.cpus=${CORES_PER_WORKER} \ --conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \ ${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \ --cluster_size ${SPARK_WORKER_INSTANCES} \ --images examples/mnist/csv/train/images \ --labels examples/mnist/csv/train/labels \ --format csv \ --mode train \ --model mnist_model

leewyang commented 4 years ago

This is likely related to #454.

leewyang commented 4 years ago

Closing due to inactivity. Feel free to re-open if still an issue.

vaibhavsingh007 commented 4 years ago

Hi, I am facing the exact same issue when running the mnist keras example using Spark Import. PySpark 2.4.5 Python 3 TF 2.1.0 TFoS 2.2.1

CMD

${SPARK_HOME}/bin/spark-submit --master ${MASTER} --conf spark.cores.max=${TOTAL_CORES} --conf spark.task.cpus=${CORES_PER_WORKER} ${TFoS_HOME}/examples/mnist/keras/mnist_spark.py --cluster_size ${SPARK_WORKER_INSTANCES} --images_labels ${TFoS_HOME}/data/mnist/csv/train --model_dir ${TFoS_HOME}/mnist_model --export_dir ${TFoS_HOME}/mnist_export

MASTER UI

image

GETS STUCK AT

image

After a long wait, throws ERROR:

20/04/30 16:20:54 INFO DAGScheduler: Job 0 finished: foreachPartition at /home/vsing20/.local/lib/python3.6/site-packages/tensorflowonspark/TFCluster.py:325, took 17.865089 s

20/04/30 16:31:09 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 6, 10.130.175.85, executor 0, partition 3, PROCESS_LOCAL, 8064 bytes) 20/04/30 16:31:09 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 4, 10.130.175.85, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/rh/rh-python36/root/usr/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main process() File "/opt/rh/rh-python36/root/usr/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/code/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func File "/code/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func File "/code/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 2499, in pipeline_func File "/code/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 352, in func File "/code/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/rdd.py", line 801, in func File "/home/vsing20/.local/lib/python3.6/site-packages/tensorflowonspark/TFSparkNode.py", line 495, in _train raise Exception("Timeout while feeding partition") Exception: Timeout while feeding partition

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.a

... ...

NOTE

I read somewhere I need to

set paths to libjvm.so, libhdfs.so, and libcuda*.so

but I am not using HDFS. Does TFoS only work with HDFS (Hadoop)?

Please help me resolve this since I do not have GPU support at the moment and would like to leverage Spark with TF2 to improve performance.

leewyang commented 4 years ago

@vaibhavsingh007 those libraries are only required for HDFS and/or GPU support. It looks like you're running in the Spark Standalone mode (without HDFS) and you aren't using GPUs, so none of this should apply. That said, it looks like you only have one worker in your Spark Standalone cluster. Please run the example exactly as shown here, i.e. export SPARK_WORKER_INSTANCES=3

vaibhavsingh007 commented 4 years ago

Hi, TL;DR; Your input about exact number of workers helped resolve the issue. I now have the model exported.

Expounding.. I was following the same Readme, but had to start my own master and worker since running the spark master cmd from there had this issue:

image

and then..

image

It works fine now after I manually spawned the master and 2 workers with

export SPARK_WORKER_INSTANCES=2

Was under the impression that that variable is for Executors since this was a standalone cluster. Nevertheless, thanks for the help!

vaibhavsingh007 commented 4 years ago

@leewyang I was able to run the example as is however, I am now facing Spark issues when using more than 'SPARK_WORKER_INSTANCES=3'. I tried 8, 12, 16 (max cores on my machine are 16), but none of those seem to work. Getting errors like:

image

Spark UI

image

Master UI

image

Can you please help?

leewyang commented 4 years ago

Can you post your spark-submit command line? Also, please search for the earliest occuring error in your logs, since the last one you see is usually just a downstream symptom of the root cause.

Also note that the MNIST dataset is a very small toy example so it is easy to understand the mechanics of modifying your code for distribution. That said, the example only creates a total of 10 partitions, so using 16 executors will be overkill here.

vaibhavsingh007 commented 4 years ago

I understand, but it fails for even 8 workers. My purpose is to instrument how TFoS compares with just TF. So I am trying to increase the epochs to 5, 10, 50 to check if TFoS really contributes to a performance gain as opposed to plain TF on CPU.

Here are all my commands:

export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=8
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
export TFoS_HOME=$HOME/TensorFlowOnSpark

${SPARK_HOME}/sbin/start-master.sh --webui-port 5000; ${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}

${SPARK_HOME}/bin/spark-submit --master ${MASTER} --conf spark.ui.port=8050  --conf spark.cores.max=${TOTAL_CORES} --conf spark.task.cpus=${CORES_PER_WORKER} ${TFoS_HOME}/examples/mnist/keras/mnist_spark.py --cluster_size ${SPARK_WORKER_INSTANCES} --images_labels ${TFoS_HOME}/data/mnist/csv/train --model_dir ${TFoS_HOME}/mnist_model --export_dir ${TFoS_HOME}/mnist_export --epochs 5

Here's the very first error:

image

Does the example work for you with >=8 workers?

vaibhavsingh007 commented 4 years ago

@leewyang Have verified it works fine up to 6 workers. I'm using 50 epochs (but that shouldn't matter right?). How can we scale it further?

leewyang commented 4 years ago

@vaibhavsingh007 I just verified that it works in my env at 8 workers on my 16GB laptop. Note that your logs above imply that you're running out of memory. You can try to tweak the Spark memory configurations (to reduce RDD storage) if you want.

However, before you go too much further trying to compare distributed MNIST against single-node MNIST, please see this FAQ. Basically, MNIST is a horrible example for performance testing the benefits of distributed training (since it's so small). Instead you should find a model that is either limited by compute or memory on a single-box or has a very large dataset, and therefore takes a long time to train.

vaibhavsingh007 commented 4 years ago

Hi @leewyang Hope you are doing well. Know it's been a while but I have stumbled on a new error when trying to test this example on an actual multi-pod Spark cluster (using Kubernetes) via Jupyter.

Running the following command

cd ${TFoS_HOME}
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--conf spark.driver.host=10.129.225.18 \
--conf spark.blockManager.port=44444 \
--conf spark.driver.port=55555 \
--conf job.local.dir=${TFoS_HOME} \
--jars ${TFoS_HOME}/tensorflow-hadoop-1.0-SNAPSHOT.jar \
${TFoS_HOME}/mnist_data_setup.py \
--output ${TFoS_HOME}/data/mnist

throws

20/06/10 18:07:44 ERROR SparkHadoopWriter: Aborting job job_20200610180741_0004. org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 0.0 failed 4 times, most recent failure: Lost task 6.3 in stage 0.0 (TID 28, 10.129.222.45, executor 2): java.io.IOException: Mkdirs failed to create file:/home/vsing20/TF2/data/mnist/csv/train/_temporary/0/_temporary/attempt_20200610180741_0004_m_000006_3 (exists=false, cwd=file:/opt/spark-2.4.4-bin-hadoop2.7/work/app-20200610180731-0013/2) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804) at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123) at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:230) at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:120) at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834)

Have you seen this before?

leewyang commented 4 years ago

@vaibhavsingh007 Do you have HDFS setup in your K8s cluster? If not, you'll need some kind of distributed file system that the spark executors can write to, i.e. try a simple spark job that just writes an RDD or DataFrame to disk in your environment.

vaibhavsingh007 commented 4 years ago

@leewyang so the spark executors in our env can only write to S3. I tested it successfully writing csv and parquet however, the following line of code from mnist_data

train_rdd.map(to_tfr).saveAsNewAPIHadoopFile(args.output + "/tfr/train",
                                               "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                               keyClass="org.apache.hadoop.io.BytesWritable",
                                               valueClass="org.apache.hadoop.io.NullWritable")

throws:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.nio.file.AccessDeniedException: s3a://../home/vsing20/dev/tfr/train: getFileStatus on s3a://../home/vsing20/dev/tfr/train: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 4DFCD55D153C922D; S3 Extended Request ID: ..), S3 Extended Request ID: ..:403 Forbidden
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2184)

Is it okay to change the output to parquet instead?

leewyang commented 4 years ago

I'm not sure if that is your problem... it seems to be security/access related, not file format related. Anyhow, you should be able to change the output to parquet, after all, it's just another format supported by Spark. Only other thing I'd try is to fully specify the S3 bucket (unless you've edited this path: s3a://../home/vsing20/dev/tfr/train).

vaibhavsingh007 commented 4 years ago

Yes, I took out env specific part from URI. Like I said, CSV and Parquet write just fine so not sure why this specifically is failing. Just one other thing, how will the following mnist read code have to be modified when I change data to parquet? Also, I may have to change RDDs to DFs before writing to S3 in that case.

tf_feed = TFNode.DataFeed(ctx.mgr, False)

  def rdd_generator():
    while not tf_feed.should_stop():
      batch = tf_feed.next_batch(1)
      if len(batch) > 0:
        example = batch[0]
        image = np.array(example[0]).astype(np.float32) / 255.0
        image = np.reshape(image, (28, 28, 1))
        label = np.array(example[1]).astype(np.float32)
        label = np.reshape(label, (1,))
        yield (image, label)
      else:
        return

  ds = tf.data.Dataset.from_generator(rdd_generator, (tf.float32, tf.float32), (tf.TensorShape([28, 28, 1]), tf.TensorShape([1])))
leewyang commented 4 years ago

That shouldn't need any changing, since reading Parquet w/ Spark will return a DF, you'll just need to convert it to an RDD before feeding. And similarly, any output RDDs from TFoS will need to be converted to DF before writing.

vaibhavsingh007 commented 4 years ago

I'm a bit confused about the usage of "/tfr/train". I see the mnist_spark.py only uses data from args.images_labels. Am I missing something or /tfr/train is actually superfluous?

Also, the mnist_spark execution keeps waiting for reservations. I tested the cluster using PI example and it works fine. Here's the cmd. Can you please take a look?

Env:

export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=4
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
export TFoS_HOME=$HOME/TF2
export SPARK_HOME=/opt/spark
export SPARK_DIST_CLASSPATH=code/hadoop-3.1.2/etc/hadoop/*:/code/hadoop-3.1.2/share/hadoop/common/lib/*:/code/hadoop-3.1.2/share/hadoop/common/*:/code/hadoop-3.1.2/share/hadoop/hdfs/*:/code/hadoop-3.1.2/share/hadoop/hdfs/lib/*:/code/hadoop-3.1.2/share/hadoop/hdfs/*:/code/hadoop-3.1.2/share/hadoop/yarn/lib/*:/code/hadoop-3.1.2/share/hadoop/yarn/*:/code/hadoop-3.1.2/share/hadoop/mapreduce/lib/*:/code/hadoop-3.1.2/share/hadoop/mapreduce/*:/code/hadoop-3.1.2/share/hadoop/tools/lib/*

CMD:

spark-submit --conf spark.kubernetes.container.image=my-image-uri --conf spark.driver.host=my-k8-host --conf spark.cores.max=${TOTAL_CORES} --conf spark.task.cpus=${CORES_PER_WORKER} ${TFoS_HOME}/mnist_spark.py --cluster_size ${SPARK_WORKER_INSTANCES} --model_dir ${S3_BASE}/mnist_model --export_dir ${S3_BASE}/mnist_export

o/p image

Working PI eg cmd:

spark-submit --class org.apache.spark.examples.SparkPi --conf spark.driver.host=my-k8-host --conf spark.kubernetes.container.image=my-image-uri /opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar 10

PI o/p:

image

spark-defaults Conf file:

sh-4.2$ cat /opt/spark/conf/spark-defaults.conf
spark.master k8s://kubernetes.default.svc.cluster.local
spark.kubernetes.namespace my-jpr
#spark.driver.host "$deployment".spark-service.svc.cluster.local
spark.driver.port 8003
spark.blockManager.port 38000
spark.broadcast.port 38001
spark.executor.port 38002
spark.driver.memory 16g
spark.driver.maxResultSize 0
spark.executor.memory 24g
spark.kubernetes.executor.request.cores 1
spark.kubernetes.executor.limit.cores 32
spark.kubernetes.node.selector.workload my-jpr
spark.kubernetes.container.image  <my-image-uri>
sh-4.2$ 
leewyang commented 4 years ago

The tfr/train was just the subfolders generated by the mnist_data_setup. If you're testing, you can just replace this with your arg and fully-specify that path in your arg.

In your Spark Pi job, do you see additional K8s nodes being used as executors? Or is it just running locally on a single K8s node? i.e. can you make the spark-submit command line look more like the TFoS one (or vice versa)?

vaibhavsingh007 commented 4 years ago

I mean I don't see the tfr/ content used anywhere. Yes, the PI job uses multiple K8 nodes per the K8 Instana UI. Now i'm able to run the mnist example with SPARK_WORKER_INSTANCES=2. Anything more than that is not getting allocated - waiting for x reservations. Investigating..

vaibhavsingh007 commented 4 years ago

Figured. Spark.executor.instances had to be passed explicitly to K8 spark cluster. TFoS couldn't spawn them on its own.

vaibhavsingh007 commented 4 years ago

Now back to the S3 write. How do I modify mnist_spark.py to use S3 instead? I see both tf and tfos apis being used to read/write model/checkpoints, viz.

tf.io.gfile.makedirs(args.model_dir)
filepath = args.model_dir + "/weights-{epoch:04d}"
callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath=filepath, verbose=1, save_weights_only=True)]

&

export_dir = export_lib.get_timestamped_export_dir(args.export_dir)
print('----- Exporting model -----')
compat.export_saved_model(multi_worker_model, export_dir, ctx.job_name == 'chief')

Getting this error:

---------------------------------------------------------------------------
UnimplementedError                        Traceback (most recent call last)
<ipython-input-9-4717564621a2> in <module>
----> 1 tf.io.gfile.makedirs(s3_base_path+'/mnist_model')

~/.local/lib/python3.6/site-packages/tensorflow_core/python/lib/io/file_io.py in recursive_create_dir_v2(path)
    453     errors.OpError: If the operation fails.
    454   """
--> 455   pywrap_tensorflow.RecursivelyCreateDir(compat.as_bytes(path))
    456 
    457 

UnimplementedError: File system scheme 's3a' not implemented (file: 's3a://<..>/home/vsing20/dev/mnist_model')

I can write csvs and parquet using Spark to S3 but had to configure several auth options using:

spark._jsc.hadoopConfiguration().set(...)

leewyang commented 4 years ago

Haven't tried s3 myself, but did find this reference which might be useful.

vaibhavsingh007 commented 4 years ago

Thanks for sharing, but couldn't get it working: https://github.com/tensorflow/tensorflow/issues/40302#issuecomment-658361847

Investigating..