Wangchanghao12 closed this issue 4 years ago.
I don't see anything obvious in your logs. That said, those errors may be downstream of the actual root cause. Please look for the earliest error across all of your executors.
Also, does this occur with the mnist example? (It looks like you're running a TestTFOnSpark.py file.)
Thank you for your reply. I haven't run the mnist example yet; I modified the code based on the mnist_spark example.
The executor ERROR log is:
2020-08-10 19:00:24.596094: E tensorflow/core/common_runtime/ring_alg.cc:274] Aborting RingReduce with Cancelled: RPC Request was cancelled
2020-08-10 19:00:24.596204: W tensorflow/core/framework/op_kernel.cc:1767] OP_REQUIRES failed at collective_ops.cc:257 : Cancelled: RPC Request was cancelled
20/08/10 19:00:52 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 26)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/hdp/3.1.4.0-315/spark2/python/pyspark/worker.py", line 253, in main
process()
File "/usr/hdp/3.1.4.0-315/spark2/python/pyspark/worker.py", line 248, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
[Previous line repeated 1 more time]
File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 350, in func
File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 799, in func
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 630, in _shutdown
raise Exception("Exception in worker:\n" + e_str)
Exception: Exception in worker:
Traceback (most recent call last):
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 407, in wrapper_fn_background
wrapper_fn(args, context)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 401, in wrapper_fn
fn(args, context)
File "./recom.zip/recom/pyspark/etl/adb/TestTFOnSpark.py", line 76, in train
multi_worker_model.save(export_dir, save_format='tf')
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1979, in save
signatures, options)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/keras/saving/save.py", line 134, in save_model
signatures, options)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/keras/saving/saved_model/save.py", line 80, in save
save_lib.save(model, filepath, signatures, options)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/saved_model/save.py", line 985, in save
options=ckpt_options)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/tracking/util.py", line 1200, in save
file_prefix_tensor, object_graph_tensor, options)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/tracking/util.py", line 1145, in _save_cached_when_graph_building
save_op = saver.save(file_prefix, options=options)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 295, in save
return save_fn()
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 269, in save_fn
sharded_saves.append(saver.save(shard_prefix, options))
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 74, in save
tensors.append(spec.tensor)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/saveable_object.py", line 55, in tensor
return self._tensor() if callable(self._tensor) else self._tensor
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/values.py", line 968, in tensor
return strategy.extended.read_var(sync_on_read_variable)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 674, in read_var
return replica_local_var._get_cross_replica() # pylint: disable=protected-access
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/values.py", line 1071, in _get_cross_replica
axis=None)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 1262, in reduce
return self._extended._reduce(reduce_op, value) # pylint: disable=protected-access
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 2166, in _reduce
return self._local_results(self.reduce_to(reduce_op, value, dst))[0]
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 2194, in reduce_to
return self._reduce_to(reduce_op, value, destinations, experimental_hints)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 590, in _reduce_to
experimental_hints=experimental_hints)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 270, in reduce
destinations, experimental_hints)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 984, in reduce_implementation
experimental_hints)[0]
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 1037, in _batch_all_reduce
experimental_hints)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 1111, in _do_batch_all_reduce_dense
executors=self._executors))
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_utils.py", line 386, in build_collective_reduce
subdiv_offsets, communication_hint)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/collective_ops.py", line 71, in all_reduce
timeout_seconds=timeout)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py", line 362, in collective_reduce
timeout_seconds=timeout_seconds, name=name, ctx=_ctx)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py", line 440, in collective_reduce_eager_fallback
_attr_T, (input,) = _execute.args_to_matching_eager([input], ctx)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 263, in args_to_matching_eager
t, dtype, preferred_dtype=default_dtype, ctx=ctx))
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/framework/ops.py", line 1499, in convert_to_tensor
ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 1909, in _dense_var_to_tensor
return var._dense_var_to_tensor(dtype=dtype, name=name, as_ref=as_ref) # pylint: disable=protected-access
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 1326, in _dense_var_to_tensor
return self.value()
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 555, in value
return self._read_variable_op()
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 657, in _read_variable_op
result = read_and_set_handle()
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 648, in read_and_set_handle
self._dtype)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/gen_resource_variable_ops.py", line 480, in read_variable_op
_ops.raise_from_not_ok_status(e, name)
File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
six.raise_from(core._status_to_exception(e.code, message), None)
File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.CancelledError: RPC Request was cancelled
Encountered when executing an operation using EagerExecutor. This error cancels all future operations and poisons their output tensors. [Op:ReadVariableOp]
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2079)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2079)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I still don't see anything obvious in your logs, so here are a couple of things to try:
Thank you. I can run the MNIST example successfully. I will check whether there is a problem with my own code.
Closing due to inactivity. Feel free to re-open if this is still an issue.
Environment:
Describe the bug: With a single executor, the final model is saved to HDFS successfully. With more than one executor, the checkpoint weights are still written to HDFS normally, but model.save() produces only an empty model folder containing an empty variables folder.
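For context: with tf.distribute.MultiWorkerMirroredStrategy, every worker must call model.save() (saving runs collective ops across workers), but only the chief should write to the real export path; the other workers should write to a throwaway directory. Skipping save() on non-chief workers, or having all workers write to the same path, can produce exactly this kind of empty/partial SavedModel. Below is a minimal sketch of that pattern; the function name save_model_multi_worker and the model / export_dir arguments are illustrative, not TensorFlowOnSpark API:

```python
import json
import os
import tempfile


def is_chief(task_type, task_id):
    # With MultiWorkerMirroredStrategy, the chief is worker 0
    # (or an explicit "chief" task, if one is configured).
    return (task_type is None or task_type == "chief"
            or (task_type == "worker" and task_id == 0))


def save_model_multi_worker(model, export_dir):
    # TF_CONFIG describes this process's role in the cluster.
    tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
    task = tf_config.get("task", {})
    if is_chief(task.get("type"), task.get("index", 0)):
        write_dir = export_dir
    else:
        # Non-chief workers still call save() so the collective ops
        # complete, but write to a local temp dir that is discarded.
        write_dir = tempfile.mkdtemp()
    model.save(write_dir, save_format="tf")
```

This mirrors the chief-only-save pattern described in TensorFlow's multi-worker Keras training guide; whether it applies here depends on what TestTFOnSpark.py's train function actually does.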
Logs:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 33, yidui-ydstats-15, executor 2): org.apache.spark.api.python.PythonException:
(Python worker traceback identical to the executor traceback above, ending in:)
tensorflow.python.framework.errors_impl.CancelledError: RPC Request was cancelled
Encountered when executing an operation using EagerExecutor. This error cancels all future operations and poisons their output tensors. [Op:ReadVariableOp]
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2039)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2079)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2104)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException:
(Python worker traceback identical to the executor traceback above, ending in:)
tensorflow.python.framework.errors_impl.CancelledError: RPC Request was cancelled
Encountered when executing an operation using EagerExecutor. This error cancels all future operations and poisons their output tensors. [Op:ReadVariableOp]
traceback.format_exc():
Traceback (most recent call last):
  File "/opt/app/ulog/pyspark/recom/pyspark/job/base.py", line 99, in jobCore
    result = self.sparkCore(spark, date, repartition, args)
  File "/opt/app/ulog/pyspark/recom/pyspark/etl/adb/TestTFOnSpark.py", line 136, in sparkCore
    cluster.shutdown(grace_secs=30)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFCluster.py", line 176, in shutdown
    workerRDD.foreachPartition(TFSparkNode.shutdown(self.cluster_info, grace_secs, self.queues))
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 804, in foreachPartition
    self.mapPartitions(func).count()  # Force evaluation
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1053, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1044, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 915, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 814, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 33, yidui-ydstats-15, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/3.1.4.0-315/spark2/python/pyspark/worker.py", line 253, in main
    process()
  File "/usr/hdp/3.1.4.0-315/spark2/python/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  [Previous line repeated 1 more time]
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 350, in func
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 799, in func
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 630, in _shutdown
    raise Exception("Exception in worker:\n" + e_str)
Exception: Exception in worker:
Traceback (most recent call last):
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 407, in wrapper_fn_background
    wrapper_fn(args, context)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflowonspark/TFSparkNode.py", line 401, in wrapper_fn
    fn(args, context)
  File "./recom.zip/recom/pyspark/etl/adb/TestTFOnSpark.py", line 76, in train
    multi_worker_model.save(export_dir, save_format='tf')
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1979, in save
    signatures, options)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/keras/saving/save.py", line 134, in save_model
    signatures, options)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/keras/saving/saved_model/save.py", line 80, in save
    save_lib.save(model, filepath, signatures, options)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/saved_model/save.py", line 985, in save
    options=ckpt_options)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/tracking/util.py", line 1200, in save
    file_prefix_tensor, object_graph_tensor, options)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/tracking/util.py", line 1145, in _save_cached_when_graph_building
    save_op = saver.save(file_prefix, options=options)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 295, in save
    return save_fn()
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 269, in save_fn
    sharded_saves.append(saver.save(shard_prefix, options))
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/functional_saver.py", line 74, in save
    tensors.append(spec.tensor)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/training/saving/saveable_object.py", line 55, in tensor
    return self._tensor() if callable(self._tensor) else self._tensor
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/values.py", line 968, in tensor
    return strategy.extended.read_var(sync_on_read_variable)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 674, in read_var
    return replica_local_var._get_cross_replica()  # pylint: disable=protected-access
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/values.py", line 1071, in _get_cross_replica
    axis=None)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 1262, in reduce
    return self._extended._reduce(reduce_op, value)  # pylint: disable=protected-access
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 2166, in _reduce
    return self._local_results(self.reduce_to(reduce_op, value, dst))[0]
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 2194, in reduce_to
    return self._reduce_to(reduce_op, value, destinations, experimental_hints)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/collective_all_reduce_strategy.py", line 590, in _reduce_to
    experimental_hints=experimental_hints)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 270, in reduce
    destinations, experimental_hints)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 984, in reduce_implementation
    experimental_hints)[0]
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 1037, in _batch_all_reduce
    experimental_hints)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_ops.py", line 1111, in _do_batch_all_reduce_dense
    executors=self._executors))
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/distribute/cross_device_utils.py", line 386, in build_collective_reduce
    subdiv_offsets, communication_hint)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/collective_ops.py", line 71, in all_reduce
    timeout_seconds=timeout)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py", line 362, in collective_reduce
    timeout_seconds=timeout_seconds, name=name, ctx=_ctx)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/gen_collective_ops.py", line 440, in collective_reduce_eager_fallback
    _attr_T, (input,) = _execute.args_to_matching_eager([input], ctx)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 263, in args_to_matching_eager
    t, dtype, preferred_dtype=default_dtype, ctx=ctx))
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/framework/ops.py", line 1499, in convert_to_tensor
    ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 1909, in _dense_var_to_tensor
    return var._dense_var_to_tensor(dtype=dtype, name=name, as_ref=as_ref)  # pylint: disable=protected-access
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 1326, in _dense_var_to_tensor
    return self.value()
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 555, in value
    return self._read_variable_op()
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 657, in _read_variable_op
    result = read_and_set_handle()
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/resource_variable_ops.py", line 648, in read_and_set_handle
    self._dtype)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/gen_resource_variable_ops.py", line 480, in read_variable_op
    _ops.raise_from_not_ok_status(e, name)
  File "/opt/tools/anaconda3/lib/python3.7/site-packages/tensorflow/python/framework/ops.py", line 6843, in raise_from_not_ok_status
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.CancelledError: RPC Request was cancelled
Encountered when executing an operation using EagerExecutor. This error cancels all future operations and poisons their output tensors. [Op:ReadVariableOp]
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2039)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2060)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2079)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2104)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/3.1.4.0-315/spark2/python/pyspark/worker.py", line 253, in main
    process()
  File "/usr/hdp/3.1.4.0-315/spark2/python/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2440, in pipeline_func
  [Previous line repeated 1 more time]
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 350, in func
  File "/usr/hdp/3.1.4.0-315/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 799, in func
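One observation on the trace: the `CancelledError: RPC Request was cancelled` fires inside a collective all-reduce (`_get_cross_replica` → `all_reduce`) while `multi_worker_model.save()` is reading variables. With `MultiWorkerMirroredStrategy`, saving is itself a collective operation, so every worker must call `save()`; if any worker exits first (for example after the shutdown grace period), the remaining workers' collectives are cancelled. A commonly recommended pattern from the TF2 multi-worker Keras tutorial is to have all workers save, but only keep the chief's copy. The sketch below shows only the path-selection helpers; the names (`is_chief`, `write_model_path`) are illustrative, not TensorFlowOnSpark API:

```python
# Sketch, assuming MultiWorkerMirroredStrategy and a TF_CONFIG cluster
# without an explicit "chief" role (worker 0 then acts as chief).
import os
import tempfile

def is_chief(task_type, task_id):
    # Single-worker (task_type is None) or worker 0 acts as chief.
    return task_type is None or (task_type == "worker" and task_id == 0)

def write_model_path(export_dir, task_type, task_id):
    """Every worker calls model.save() so the collective completes,
    but non-chief workers write to a throwaway local temp dir."""
    if is_chief(task_type, task_id):
        return export_dir
    return os.path.join(tempfile.mkdtemp(), "workertemp_{}".format(task_id))
```

In the training function, each worker would then call `multi_worker_model.save(write_model_path(export_dir, task_type, task_id), save_format='tf')` before returning, and non-chief workers may delete their temp directory afterwards. Keeping all workers inside the collective save should also avoid the shutdown race that surfaces here as a cancelled RPC.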