maxpumperla / elephas

Distributed Deep learning with Keras & Spark
http://maxpumperla.com/elephas/
MIT License

Example with pre-trained model #53

Closed: mrelich closed this issue 7 years ago

mrelich commented 7 years ago

Apologies if this is hidden somewhere in the examples, but I am simply trying to use a pre-trained model in Spark. From what I have read, it looks like I should be able to construct a transformer object from a Keras model like this:

transformer = ElephasTransformer(labelCol='features',
                                 outputCol='preds',
                                 keras_model_config=model.get_config(),
                                 weights=model.get_weights())

Then I should be able to call res = transformer._transform(df). I realize calling the private method isn't by design, so I'm simply hacking this together... Does this seem plausible / am I on the right path?
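
A minimal sketch of what I mean, assuming the transformer above constructs cleanly: since _transform is private, the supported entry point should be the public transform() method, which delegates to _transform() under the hood.

res = transformer.transform(df)   # public API; delegates to _transform()
res.select('preds').show(5)       # 'preds' is the outputCol set above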

My input data are length-50 feature vectors that I insert into the DataFrame further upstream as a plain Python list. The error I get might be related to that choice of data type:

17/07/24 23:28:31 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/mattr/spark_test2/spark_env/lib/python2.7/site-packages/elephas/ml_model.py", line 106, in <lambda>
    features = np.asarray(rdd.map(lambda x: from_vector(x.features)).collect())
  File "/home/mattr/spark_test2/spark_env/lib/python2.7/site-packages/elephas/mllib/adapter.py", line 23, in from_vector
    return vector.array
AttributeError: 'list' object has no attribute 'array'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
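
For reference, the adapter that fails in the traceback boils down to the following (a standalone sketch reconstructed from the stack trace, not the actual module): it assumes a pyspark.mllib Vector, whose .array attribute holds the underlying numpy array, so a plain Python list raises exactly the error above.

from pyspark.mllib.linalg import Vectors

# Sketch of elephas/mllib/adapter.py's from_vector, per the traceback above.
def from_vector(vector):
    return vector.array

from_vector(Vectors.dense([0.1, 0.2]))  # OK: returns the underlying numpy array
from_vector([0.1, 0.2])                 # AttributeError: 'list' object has no attribute 'array'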

Any help would be much appreciated!

mrelich commented 7 years ago

I was making this much more complicated than necessary. The main issue was the data types, and fixing them doesn't really require the full infrastructure of this package. Sorry for the noise!
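
In case anyone hits the same thing, here is a minimal sketch of the data-type fix (the column name 'raw_features' is hypothetical): convert the list-typed column into proper DenseVectors before handing the DataFrame to the transformer.

from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors, VectorUDT

# Rewrap the plain-list column as a Spark DenseVector column so that
# from_vector's vector.array access succeeds.
to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())
df = df.withColumn('features', to_vector(df['raw_features']))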