As the error message shows, it may be because the TensorFlow session in the master is not properly broadcast to the Spark executors. The likely cause, which sometimes occurred during implementation, is that in the save_session_hdfs() method in core/spark_session.py, the locally serialized session files are deleted before they are actually written to HDFS. Although the statement that deletes the local session files is placed after the hdfs.put() call, hdfs.put() appears to behave asynchronously. As a temporary fix, save_session_hdfs() in the latest commit no longer deletes the local session file immediately; users can delete these files manually whenever they want. A better solution for this problem is expected soon, hopefully in the next major commit.
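A minimal sketch of that temporary workaround, assuming a pydoop-style hdfs.put(); the actual HDFS client and helper names in core/spark_session.py may differ:

import os
import pydoop.hdfs as hdfs  # assumption: a pydoop-style HDFS client

def save_session_hdfs(local_session_path, hdfs_session_dir):
    # Upload the serialized session file and deliberately keep the local copy,
    # since hdfs.put() may return before the data is fully written.
    hdfs_path = hdfs_session_dir.rstrip('/') + '/' + os.path.basename(local_session_path)
    hdfs.put(local_session_path, hdfs_path)
    # No os.remove(local_session_path) here; the user can clean up the
    # local session files manually once the upload is verified.
    return hdfs_path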
I hope this answer solves your problem. If it fails again, could you please provide more information about your environment, e.g., OS and Spark versions, configuration values, etc.?
Thanks for your support for the project.
Thank you for the reply. The error still exists. My environment:
spark 2.0
tensorflow (0.10.0)
Spark runs on a standalone cluster with only one machine,
and the shell runs on the same machine:
pyspark --master spark://localhost:7077 --deploy-mode client
1. When the parameter num_epoch=1, there are no problems, but the accuracy result is '0.098', while the mnist.py result is '0.4075' (same epoch).
2. When num_epoch>1, there is another problem:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/opt/workspace/tensoronspark/tensorspark/core/param_server.py", line 33, in run
    IOLoop.current().start()
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 752, in start
    raise RuntimeError("IOLoop is already running")
RuntimeError: IOLoop is already running
The cause of the problem should be the ParameterServer.stop() method:
if self._http_server is not None:
    self._http_server.stop()
    self._http_server = None
and I tried adding the following code to solve this problem:
IOLoop.current().stop()
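For reference, a sketch of where that call would go in stop() (illustrative only; whether stopping the loop from another thread is safe depends on the tornado version):

from tornado.ioloop import IOLoop

def stop(self):
    if self._http_server is not None:
        self._http_server.stop()
        self._http_server = None
    # Added line: stop the tornado IOLoop that run() started, so a later
    # run() can call IOLoop.current().start() again without raising
    # "RuntimeError: IOLoop is already running".
    IOLoop.current().stop()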
The second problem is not actually about whether epoch>1; it happens because the parameter server is not properly shut down after the first execution. If you try epoch>1 before epoch=1, the same problem may happen with epoch=1. Thanks for pointing out the IOLoop.current().stop() call to terminate the tornado IOLoop. In the occasional case where the parameter server of a previous run is not stopped, one can manually find the server process by looking up which process occupies the server port (10080 by default) and terminate it from the terminal.
As to the first problem, spark_mnist.py and mnist.py should produce nearly the same result, about 91%, especially in the case of only one Spark executor in standalone mode, where the input data are processed sequentially exactly like the single-machine algorithm. Here is an output of the spark_mnist run on my testbed:
Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz
0.9158
<tensorflow.python.client.session.Session object at 0x11753c450>
But in your case, not only does spark_mnist.py achieve a very low accuracy, the accuracy of mnist.py is also only half of the expected value. A possible reason for the low accuracy is corruption of the training or test data. When using spark_mnist.py, the files in HDFS should look like this when you "ls" them:
server:workspace liangfengsid$ hadoop fs -ls /data/mnist
Found 4 items
-rw-r--r-- 1 liangfengsid supergroup 7840016 2016-07-05 17:15 /data/mnist/t10k-images-idx1-ubyte
-rw-r--r-- 1 liangfengsid supergroup 10008 2016-07-05 17:15 /data/mnist/t10k-labels-idx1-ubyte
-rw-r--r-- 1 liangfengsid supergroup 47040016 2016-07-05 17:15 /data/mnist/train-images-idx1-ubyte
-rw-r--r-- 1 liangfengsid supergroup 60008 2016-07-05 17:15 /data/mnist/train-labels-idx1-ubyte
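If corruption is suspected, one quick sanity check is to verify the MNIST magic number and record count in each file header. A sketch, run against locally extracted, uncompressed copies of the files (the paths are examples):

import struct

def check_mnist_images(path, expected_images):
    # The idx image files start with a 16-byte big-endian header:
    # magic (2051), number of images, rows, cols.
    with open(path, 'rb') as f:
        magic, num_images, rows, cols = struct.unpack('>IIII', f.read(16))
    assert magic == 2051, 'bad magic %d (expected 2051)' % magic
    assert num_images == expected_images, 'unexpected image count %d' % num_images
    print('%s: %d images of %dx%d' % (path, num_images, rows, cols))

# Expected: 60000 training images and 10000 test images, all 28x28.
check_mnist_images('MNIST_data/train-images-idx3-ubyte', 60000)
check_mnist_images('MNIST_data/t10k-images-idx3-ubyte', 10000)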
If the problem still exists or any new problem arises, please let me know.
Now everything is OK. The cause of the problem was the training data extraction method:
def _extract_single_image_file(iterator):
    .....
    fileindex = filename.split('-')[2]

def extract_labels(sc, filepath, num_class, one_hot=False):
    .........
    fileindex = filename.split('-')[2]
My HDFS files:
root@hawq-dev:/tmp/spark# hadoop fs -ls /data/mnist
Found 4 items
-rw-r--r-- 1 root supergroup 7840016 2016-09-23 11:58 /data/mnist/t10k-images-idx1-ubyte
-rw-r--r-- 1 root supergroup 10008 2016-09-23 11:58 /data/mnist/t10k-labels-idx1-ubyte
-rw-r--r-- 1 root supergroup 47040016 2016-09-23 11:58 /data/mnist/train-images-idx1-ubyte
-rw-r--r-- 1 root supergroup 60008 2016-09-23 11:58 /data/mnist/train-labels-idx1-ubyte
The _extract_single_image_file execution result is fileindex='images', rather than fileindex='idx1', so I modified the method as follows:
fileindex = filename.split('-')[3]
Now, when num_epoch=2, the spark_mnist result is:
Extracting /notebooks/MNIST_data/train-images-idx3-ubyte.gz
Extracting /notebooks/MNIST_data/train-labels-idx1-ubyte.gz
Extracting /notebooks/MNIST_data/t10k-images-idx3-ubyte.gz
Extracting /notebooks/MNIST_data/t10k-labels-idx1-ubyte.gz
test session [140148325176784]
0.9229
In addition, I also modified the way the ParameterServer is started. I added two methods to SparkSession.py:
startParameterServer()
stopParameterServer()
The code of startParameterServer() is mostly copied from the original run method, and the run method is modified as:
def run(self, fetches, feed_rdd=None, feed_dict=None, options=None, run_metadata=None):
    if feed_rdd is None:
        if self._is_session_updated():
            return self._session.run(fetches, feed_dict, options, run_metadata)
    param_bc = self._param_bc

    def _spark_run_fn(splitIndex, partition):
        worker = SessionWorker(index=splitIndex, param_bc=param_bc)
        # params = param_bc.value
        # _run_fn(splitIndex, partition, params)
        worker.run(splitIndex=splitIndex, partition=partition)
        worker.close()
        return [1]

    feed_rdd.mapPartitionsWithIndex(_spark_run_fn).count()
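As a side note, _spark_run_fn above follows the standard PySpark per-partition worker pattern. A self-contained toy version of that pattern, where PerPartitionWorker is only a stand-in for SessionWorker:

from pyspark import SparkContext

class PerPartitionWorker(object):
    def __init__(self, index, params):
        self.index = index
        self.params = params

    def run(self, partition):
        # Consume the partition iterator; a real worker would feed it to TensorFlow.
        return sum(1 for _ in partition)

    def close(self):
        pass

if __name__ == '__main__':
    sc = SparkContext(appName='partition-worker-demo')
    param_bc = sc.broadcast({'learning_rate': 0.01})  # plays the role of self._param_bc

    def _spark_run_fn(split_index, partition):
        worker = PerPartitionWorker(split_index, param_bc.value)
        count = worker.run(partition)
        worker.close()
        return [count]

    rdd = sc.parallelize(range(100), 4)
    print(rdd.mapPartitionsWithIndex(_spark_run_fn).collect())  # e.g. [25, 25, 25, 25]
    sc.stop()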
The on_message(self, msg) method in ParameterServer.py is modified as:
elif op == 'end':
    self.server._ended_worker.add(worker_id)
    if len(self.server._ended_worker) == self.server._num_worker:
        # self.server.stop()  # removed
        self.server._ended_worker = sets.Set()
and the main code of spark_mnist.py:
spark_sess = sps.SparkSession(sc, sess, user=user, name=name, server_host=server_host, server_port=server_port, sync_interval=sync_interval, batch_size=batch_size)
partitioner = par.RandomPartitioner(num_partition)
combiner = comb.DeltaWeightCombiner()
spark_sess.startParameterServer(train_step, num_worker=num_partition, feed_name_list=feed_name_list, param_list=param_list, weight_combiner=combiner, shuffle_within_partition=True)
for i in range(num_epoch):
    spark_sess.run(train_step, feed_rdd=image_label_rdd)
    if i != num_epoch-1:
        temp_image_label_rdd = image_label_rdd.partitionBy(num_partition, partitioner).cache()
        image_label_rdd.unpersist()
        image_label_rdd = temp_image_label_rdd
spark_sess.stopParameterServer()
This solves the problem of "RuntimeError: IOLoop is already running". What do you think of this approach?
The reason for the problem has finally been found; it is still an issue in the _extract_single_image_file() method. My hostname is hawq-dev, so filename=hdfs://hawq-dev:8020/data/mnist/t10k-images-idx1-ubyte (expected fileindex=idx1, data len=7840016, num_images=10000). Because of the hyphen in the hostname, filename.split('-')[2] returns 'images' rather than 'idx1'.
Thanks for pointing this out. I think filename.split('-')[-2] will do. We will handle that.
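For illustration, here is the difference on the path from this thread (a quick check anyone can run):

import os

filename = 'hdfs://hawq-dev:8020/data/mnist/t10k-images-idx1-ubyte'
print(filename.split('-')[2])    # 'images' -- the hyphen in the hostname shifts the index
print(filename.split('-')[-2])   # 'idx1'   -- counting from the end is robust to extra hyphens
# An alternative that ignores the directory part entirely:
print(os.path.basename(filename).split('-')[2])   # 'idx1'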
As to the startParameterServer() / stopParameterServer() issue, we prefer a simple programming interface in which TensorFlow users need less control over the parameter server. Still, you point out a real problem: the server starts and stops every time the SparkSession runs a training. A reusable parameter server that starts and stops only once across the training of different epochs would be better. We are working on an update for this.
Instead of requiring users to start or stop the parameter server explicitly, the new commit supports a reusable parameter server, so that the web server does not need to start and stop between different training epochs. The user only needs to indicate this with the boolean argument server_reusable (False by default) in SparkSession.run(). If the user wants to make use of the reusable parameter server and turns the option on, he is required to stop the server manually by calling SparkSession.stop_param_server when he no longer needs to reuse it, e.g., when all the training epochs are finished.
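A hedged usage sketch of that option, reusing the variable names from the spark_mnist.py snippet earlier in this thread:

# Keep one parameter server alive across all epochs instead of starting
# and stopping it inside every run() call.
for i in range(num_epoch):
    spark_sess.run(train_step, feed_rdd=image_label_rdd, server_reusable=True)

# With server_reusable=True the user must stop the server explicitly
# once all training epochs are finished.
spark_sess.stop_param_server()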
If the problem is solved for you, may I close this issue? A new issue can be opened if a new problem occurs.
I created a project, tensorflow-spark-docker, to make it easy to run TensorFlow on Spark via Docker.
That's great! I read your project and find it interesting. I think I may ask one of my colleagues to try it out soon.
When I ran the MNIST example, I encountered an error:
How can I solve it? Thanks.