databricks / spark-deep-learning

Deep Learning Pipelines for Apache Spark
https://databricks.github.io/spark-deep-learning
Apache License 2.0
1.99k stars 494 forks source link

Save a pyspark ml pipeline model #191

Open Liangmp opened 5 years ago

Liangmp commented 5 years ago

Hi,

I am trying to save a model for future use, belowed are the code:

from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import *
img_dir = "hdfs:///personalities"
jobs_df = ImageSchema.readImages(img_dir + "/jobs").withColumn("label", lit(1))
zuckerberg_df = ImageSchema.readImages(img_dir + "/zuckerberg").withColumn("label", lit(0))
jobs_train, jobs_test = jobs_df.randomSplit([0.6, 0.4]) #0.6 for training, 0.4 for testing
zuckerberg_train, zuckerberg_test = zuckerberg_df.randomSplit([0.6, 0.4])
train_df = jobs_train.unionAll(zuckerberg_train)
test_df = jobs_test.unionAll(zuckerberg_test)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])
p_model = p.fit(train_df)
p_model.save("hdfs:///ml_model")

However, I get the following errer:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/util.py", line 244, in save
    self.write().save(path)
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/util.py", line 136, in save
    self.saveImpl(path)
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py", line 225, in saveImpl
    PipelineSharedReadWrite.validateStages(stages)
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/pyspark/ml/pipeline.py", line 348, in validateStages
    stage.uid, type(stage))
ValueError: ('Pipeline write will fail on this pipeline because stage %s of type %s is not MLWritable', 'DeepImageFeaturizer_bbeb5c2d479e', <class 'sparkdl.transformers.named_image.DeepImageFeaturizer'>)

I find a similar issue here but we have a different error. Can any one give me some help?

Liangmp commented 5 years ago

It seems that stages[1] LogisticRegressionModel can be saved, while stages[0] DeepImageFeaturizer can not be saved. Pipeline model p_model contains both of these two stages, therefore, it can not be saved.

>>> print(type(p_model.stages[0]))
<class 'sparkdl.transformers.named_image.DeepImageFeaturizer'>
>>> print(type(p_model.stages[1]))
<class 'pyspark.ml.classification.LogisticRegressionModel'>

According to Source code for pyspark.ml.pipeline, when read/write is performed, function validateStages will be called to check whether every stage inside the pipeline model is instance of MLWritable.

    @staticmethod
    def validateStages(stages):
        """
        Check that all stages are Writable
        """
        for stage in stages:
            if not isinstance(stage, MLWritable):
                raise ValueError("Pipeline write will fail on this pipeline " +
                                 "because stage %s of type %s is not MLWritable",
                                 stage.uid, type(stage))