databricks / spark-deep-learning

Deep Learning Pipelines for Apache Spark
https://databricks.github.io/spark-deep-learning
Apache License 2.0

Save a pipelined model #142

Closed · noiseux1523 closed this issue 6 years ago

noiseux1523 commented 6 years ago

I followed the Deep Learning Pipelines tutorial, and my question concerns the Transfer Learning section. Everything works fine, and now I want to save the trained pipeline model. The following code works fine.

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from sparkdl import DeepImageFeaturizer

# Use a featurizer to extract features from a pre-trained model
featurizer = DeepImageFeaturizer(inputCol = "image", outputCol = "features", modelName = "InceptionV3")

# Logistic regression for the classification step
lr = LogisticRegression(maxIter = 20, regParam = 0.05, elasticNetParam = 0.3, labelCol = "label")

# Chain both stages into a pipeline
p = Pipeline(stages=[featurizer, lr])

# Fit it to our training set
p_model = p.fit(train_df)

It also works fine when I evaluate.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

tested_df = p_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label"))))

What I am trying to do now is save this pipeline model and reload it to test some new data. I am doing it simply like this.

from pyspark.ml import PipelineModel

p_model.save('./test')
test = PipelineModel.load('./test')

But I get this error.

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-69-d35bfc1ba232> in <module>()
----> 1 test = PipelineModel.load('./test')

~/miniconda3/envs/DL-Pipelines/lib/python3.6/site-packages/pyspark/ml/util.py in load(cls, path)
    309     def load(cls, path):
    310         """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
--> 311         return cls.read().load(path)
    312 
    313 

~/miniconda3/envs/DL-Pipelines/lib/python3.6/site-packages/pyspark/ml/pipeline.py in load(self, path)
    240         metadata = DefaultParamsReader.loadMetadata(path, self.sc)
    241         if 'language' not in metadata['paramMap'] or metadata['paramMap']['language'] != 'Python':
--> 242             return JavaMLReader(self.cls).load(path)
    243         else:
    244             uid, stages = PipelineSharedReadWrite.load(metadata, self.sc, path)

~/miniconda3/envs/DL-Pipelines/lib/python3.6/site-packages/pyspark/ml/util.py in load(self, path)
    251             raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"
    252                                       % self._clazz)
--> 253         return self._clazz._from_java(java_obj)
    254 
    255     def context(self, sqlContext):

~/miniconda3/envs/DL-Pipelines/lib/python3.6/site-packages/pyspark/ml/pipeline.py in _from_java(cls, java_stage)
    297         """
    298         # Load information from java_stage to the instance.
--> 299         py_stages = [JavaParams._from_java(s) for s in java_stage.stages()]
    300         # Create a new instance of this stage.
    301         py_stage = cls(py_stages)

~/miniconda3/envs/DL-Pipelines/lib/python3.6/site-packages/pyspark/ml/pipeline.py in <listcomp>(.0)
    297         """
    298         # Load information from java_stage to the instance.
--> 299         py_stages = [JavaParams._from_java(s) for s in java_stage.stages()]
    300         # Create a new instance of this stage.
    301         py_stage = cls(py_stages)

~/miniconda3/envs/DL-Pipelines/lib/python3.6/site-packages/pyspark/ml/wrapper.py in _from_java(java_stage)
    218         stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark")
    219         # Generate a default new instance from the stage_name class.
--> 220         py_type = __get_class(stage_name)
    221         if issubclass(py_type, JavaParams):
    222             # Load information from java_stage to the instance.

~/miniconda3/envs/DL-Pipelines/lib/python3.6/site-packages/pyspark/ml/wrapper.py in __get_class(clazz)
    214             m = __import__(module)
    215             for comp in parts[1:]:
--> 216                 m = getattr(m, comp)
    217             return m
    218         stage_name = java_stage.getClass().getName().replace("org.apache.spark", "pyspark")

AttributeError: module 'com.databricks.sparkdl' has no attribute 'DeepImageFeaturizer'

Why do I get this problem when reloading the model, when everything works fine before saving? I am still a beginner with Spark, sorry if this is obvious.

Thank you

dimagoldin commented 6 years ago

Any news on this? I am having the same problem.

noiseux1523 commented 6 years ago

I found a workaround: save only the last stage (the logistic regression) and rebuild the pipeline afterwards.

Save the last stage:

p_model.stages[1].write().overwrite().save('lr')

Reload it and rebuild the pipeline:

from pyspark.ml import PipelineModel
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sparkdl import DeepImageFeaturizer

lr_test = LogisticRegressionModel.load('./lr')

# Use a featurizer to extract features from a pre-trained model
featurizer_test = DeepImageFeaturizer(inputCol = "image", outputCol = "features", modelName = "InceptionV3")

# Chain both stages into a pipeline
p_test = PipelineModel(stages=[featurizer_test, lr_test])

# Test and evaluate
tested_df_test = p_test.transform(test_df)
evaluator_test = MulticlassClassificationEvaluator(metricName = "accuracy")
print("Test set accuracy = " + str(evaluator_test.evaluate(tested_df_test.select("prediction", "label"))))

tested_df_test.select("label", "probability", "prediction").show(20, False)

And everything should work fine!

dimagoldin commented 6 years ago

Thanks @noiseux1523, this works!

I was able to load the saved pipeline correctly (not just the last stage) in Scala:

val model = PipelineModel.load("/path/to/model")

As long as I had the "spark-deep-learning" dependency added in my pom/sbt:

<dependency>
    <groupId>databricks</groupId>
    <artifactId>spark-deep-learning</artifactId>
    <version>1.1.0-spark2.3-s_2.11</version>
</dependency>

But in Python, only your workaround worked.
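
For context, the Python side of the workaround still needs the JVM half of spark-deep-learning on the session classpath, otherwise DeepImageFeaturizer cannot even be constructed. A minimal sketch of one way to start such a session; the spark-packages coordinate here is an assumption that mirrors the pom version above, and this does not change the PipelineModel.load behaviour:

from pyspark.sql import SparkSession

# Pull the spark-deep-learning jars in via spark-packages when the session starts.
# Adjust the coordinate for your Spark/Scala build if needed.
spark = (SparkSession.builder
         .appName("dl-pipelines")
         .config("spark.jars.packages", "databricks:spark-deep-learning:1.1.0-spark2.3-s_2.11")
         .getOrCreate())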

I am still unable to make DeepImageFeaturizer transform images in a streaming job:

p_model.transform(imageStream).select("probability", "prediction").writeStream.format("console").start().awaitTermination()

I am getting the following error:

pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;

I see there is an open issue on this subject (#136); I haven't found a workaround yet.

jkbradley commented 6 years ago

Thanks for reporting this & the workaround! In order to avoid confusing more users, we decided to remove this functionality in the next release: https://github.com/databricks/spark-deep-learning/pull/161. It's going to require a bit of reworking within Spark itself to provide this kind of support for ML persistence in Spark Packages.

However, the workaround will still work, since it creates a new DeepImageFeaturizer instance when rebuilding the pipeline instead of asking PipelineModel.load to map the JVM class com.databricks.sparkdl.DeepImageFeaturizer back to a Python class (which is what fails in the traceback above).
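
To keep the recommended pattern in one place, here is a minimal sketch of the workaround above (assuming the same train_df and test_df DataFrames used earlier in the thread):

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from sparkdl import DeepImageFeaturizer

# Train exactly as in the Transfer Learning example at the top of the thread.
featurizer = DeepImageFeaturizer(inputCol = "image", outputCol = "features", modelName = "InceptionV3")
lr = LogisticRegression(maxIter = 20, regParam = 0.05, elasticNetParam = 0.3, labelCol = "label")
p_model = Pipeline(stages=[featurizer, lr]).fit(train_df)

# Persist only the fitted logistic regression stage; it is a standard Spark model
# and round-trips through ML persistence without issue.
p_model.stages[1].write().overwrite().save('./lr')

# Later (even in a fresh session): rebuild the featurizer from its params and
# reassemble the PipelineModel around the reloaded logistic regression model.
restored = PipelineModel(stages=[
    DeepImageFeaturizer(inputCol = "image", outputCol = "features", modelName = "InceptionV3"),
    LogisticRegressionModel.load('./lr'),
])
predictions = restored.transform(test_df)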

I'll close this issue for now, but I'll leave the notes on the workaround here, since I'm sure they will be helpful for some users. Thanks all!