cerndb / dist-keras

Distributed Deep Learning, with a focus on distributed training, using Keras and Apache Spark.
http://joerihermans.com/work/distributed-keras/
GNU General Public License v3.0

how to use directly the numpy data loaded to rdd #16

Closed kirk86 closed 7 years ago

kirk86 commented 7 years ago

Hi, thanks for this work. It would also be nice to have some examples with MNIST and CIFAR where we manipulate numpy arrays: since these are actually small datasets, we can load them into memory, bring them into the desired format, transform them into an RDD, and use them with the optimizers you've provided. Right now the trainer method only takes a DataFrame-like structure with column labels. I know we could easily transform the data into that format, but it would be nice to see an example of how to use the RDD directly.

For instance:

from keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# Flatten each 28x28 image to a 784-vector and scale pixels to [0, 1]
X_train = X_train.reshape(-1, 784).astype('float32') / 255.
# ...
# here make them RDD
# construct keras model
# train by distributing the model
# train by distributing the data

Cheers.

JoeriHermans commented 7 years ago

That's actually a good suggestion, thanks! I'll try to think of some examples. However, for large datasets this is not really possible (as you will know).

I have an example for larger datasets in another repository (https://github.com/JoeriHermans/hep-track-reconstruction-ml/blob/master/notebooks/data_preprocessing.ipynb) which basically takes raw numpy files, and converts them to a format I will use later.

For smaller (also larger) datasets, it boils down to the fact that no matter what numpy shape you have, in Spark it needs to be stored as a list because in the other case you will encounter some issues regarding types. This is especially annoying in the case of convolutional networks. So before you call sc.parallelize on a list of numpy matrices / vectors, just convert every element of that list to a plain Python list using ndarray.tolist(), and everything will work as expected.
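The conversion described above can be sketched as follows. This is a minimal illustration, not code from dist-keras: the helper names `arrays_to_lists` and `parallelize_arrays` are hypothetical, and `sc` is assumed to be an already created SparkContext.

```python
import numpy as np


def arrays_to_lists(arrays):
    """Convert every numpy array in a list to a (nested) plain Python list."""
    return [np.asarray(a).tolist() for a in arrays]


def parallelize_arrays(sc, arrays):
    """Build an RDD of plain lists.

    `sc` is assumed to be an existing SparkContext; this helper is
    hypothetical and only wraps the conversion around sc.parallelize.
    """
    return sc.parallelize(arrays_to_lists(arrays))


# Two tiny 2x2 "images" standing in for real data.
images = [np.zeros((2, 2), dtype="float32"), np.ones((2, 2), dtype="float32")]
as_lists = arrays_to_lists(images)
```

After the conversion, every element is a nested list of Python floats rather than an ndarray of numpy scalars, which is the form the comment above recommends handing to sc.parallelize.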

I hope this helps.

Joeri

kirk86 commented 7 years ago

For smaller (also larger) datasets, it boils down to the fact that no matter what numpy shape you have, in Spark it needs to be stored as a list because in the other case you will encounter some issues regarding types.

Thanks, that makes sense. Let me ask you a couple more questions if I may, since I just started with Spark.

1.) In your MNIST examples you have something along the lines of os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

This causes an error every time I create the Spark context object with sc = SparkContext(conf=conf):

17/04/11 17:25:25 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor).  This may indicate an error, since only one SparkContext may be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:422)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:236)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
17/04/11 17:25:25 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Invalid master URL: spark://143.239.81.131
        at org.apache.spark.util.Utils$.extractHostPortFromSparkUrl(Utils.scala:2358)
        at org.apache.spark.rpc.RpcAddress$.fromSparkURL(RpcAddress.scala:47)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.deploy.client.StandaloneAppClient.<init>(StandaloneAppClient.scala:52)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.start(StandaloneSchedulerBackend.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:236)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 ERROR Utils: Uncaught exception in thread Thread-2
java.lang.NullPointerException
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:214)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:467)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1588)
        at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1826)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1825)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:587)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:236)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
17/04/11 17:25:25 WARN MetricsSystem: Stopping a MetricsSystem that is not running
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-70-6b825dbb354c> in <module>()
----> 1 sc = SparkContext(conf=conf)

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    116         try:
    117             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
--> 118                           conf, jsc, profiler_cls)
    119         except:
    120             # If an error occurs, clean up in order to allow future SparkContext creation:

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)
    177
    178         # Create the Java SparkContext through Py4J
--> 179         self._jsc = jsc or self._initialize_context(self._conf._jconf)
    180         # Reset the SparkConf to the one actually used by the SparkContext in JVM.
    181         self._conf = SparkConf(_jconf=self._jsc.sc().conf())

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/pyspark/context.pyc in _initialize_context(self, jconf)
    244         Initialize SparkContext in function to allow subclass specific initialization
    245         """
--> 246         return self._jvm.JavaSparkContext(jconf)
    247
    248     @classmethod

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1399         answer = self._gateway_client.send_command(command)
   1400         return_value = get_return_value(
-> 1401             answer, self._gateway_client, None, self._fqn)
   1402
   1403         for temp_arg in temp_args:

/miniconda2/envs/dist_keras/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Invalid master URL: spark://143.239.81.131
        at org.apache.spark.util.Utils$.extractHostPortFromSparkUrl(Utils.scala:2358)
        at org.apache.spark.rpc.RpcAddress$.fromSparkURL(RpcAddress.scala:47)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at org.apache.spark.deploy.client.StandaloneAppClient$$anonfun$1.apply(StandaloneAppClient.scala:52)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.deploy.client.StandaloneAppClient.<init>(StandaloneAppClient.scala:52)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.start(StandaloneSchedulerBackend.scala:108)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:156)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:236)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)

I did some googling and it turns out I need to set the master, e.g. os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[45] pyspark-shell', but it only works with master = "local[*]" and not with master = "yarn-client".
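A note on the trace above: the reported failure is `Invalid master URL: spark://143.239.81.131`, a standalone master URL without a port. Spark standalone master URLs have the form spark://HOST:PORT, with 7077 as the default port. The sketch below checks for that before creating the context. The function names and the application name are hypothetical, Python 3 is assumed (the environment in the trace is Python 2.7), and the port to use depends on how the cluster was actually started.

```python
from urllib.parse import urlparse


def standalone_master_has_port(master_url):
    """Return True if a spark:// master URL carries an explicit port."""
    parsed = urlparse(master_url)
    return parsed.scheme == "spark" and parsed.port is not None


def make_context(master_url, app_name="dist-keras-example"):
    """Create a SparkContext, rejecting spark:// URLs that lack a port.

    Hypothetical helper; `app_name` is an illustrative default.
    """
    if master_url.startswith("spark://") and not standalone_master_has_port(master_url):
        raise ValueError(
            "standalone master URL needs a port, e.g. spark://HOST:7077: %r" % master_url
        )
    # Imported lazily so the URL check above works without pyspark installed.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName(app_name).setMaster(master_url)
    return SparkContext(conf=conf)
```

Values such as "local[*]" pass through the check unchanged, so the same helper can be used for local runs.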

2.) Let's assume a hypothetical scenario of a cluster with 5 nodes sharing the same storage. node1 is the entry node and node2,...,node5 are worker nodes.

What is the proper way to set node1 as the master and all the rest as worker nodes on which the computations take place?

2.a) If possible, provide an example where we parallelize the model across nodes node2,...,node5.

2.b) Also, how do we achieve the same thing, but this time distributing the data across the same nodes rather than the model?

Thanks!

JoeriHermans commented 7 years ago

Hi,

Sorry for the late answer. I prepared an example of how to deal with your initial problem of reading the numpy files into an RDD / DataFrame.

https://github.com/cerndb/dist-keras/blob/master/examples/distributed_numpy_parsing.ipynb

Yes, if you change the notebook flag to local=True, then it would have done it automatically :)

rayjang commented 7 years ago

Can I ask one more question related to this issue?

(X_train, y_train), (X_test, y_test) = mnist.load_data()

In the above case, the X ndarray and the Y ndarray are separate, but in your MNIST example code they are combined, meaning that features and labels are in the same ndarray. My data is also separated into x (features) and y (labels); my x and y ndarrays have the same layout as the return value of mnist.load_data(). In this case, how can I transform x_train and y_train into the right format for your dist-keras code?
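One possible way to pair separate feature and label arrays is sketched below. It is an assumption-laden illustration rather than the dist-keras method: the helper names, the column names "features" and "label", and the use of integer class labels are all hypothetical choices, and `spark` is assumed to be an existing SparkSession. Whether dist-keras needs the resulting columns transformed further is not settled in this thread.

```python
import numpy as np


def pair_features_and_labels(x, y):
    """Zip separate feature and label arrays into (features, label) rows.

    Each sample is flattened to a 1-D list of Python floats; labels are
    assumed to be integer class ids, as returned by mnist.load_data().
    """
    x = np.asarray(x)
    y = np.asarray(y)
    if len(x) != len(y):
        raise ValueError("x and y must contain the same number of samples")
    flat = x.reshape(len(x), -1).astype("float32")
    return [(row.tolist(), int(label)) for row, label in zip(flat, y)]


def rows_to_dataframe(spark, rows):
    """Build a DataFrame from the rows; column names are illustrative."""
    return spark.createDataFrame(rows, ["features", "label"])


# Two 2x2 samples with labels 3 and 7, standing in for x_train / y_train.
rows = pair_features_and_labels(np.arange(8).reshape(2, 2, 2), np.array([3, 7]))
```

Because every row already consists of plain Python lists and ints, the same rows could also be passed to sc.parallelize to obtain an RDD instead of a DataFrame.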