lifeomic / sparkflow

Easy to use library to bring Tensorflow on Apache Spark
MIT License
298 stars 46 forks source link

Loading a saved Pipeline including SparkAsyncDL makes the model lose its inputCol #31

Closed PowerToThePeople111 closed 5 years ago

PowerToThePeople111 commented 5 years ago

Hi guys,

I am using sparkflow 0.7.0 on a Spark 2.4 emr cluster and try to load a pipeline that was created by code that you had in one example script.

from sparkflow.graph_utils import build_graph
from sparkflow.tensorflow_async import SparkAsyncDL
import tensorflow as tf
from pyspark.ml.feature import VectorAssembler, OneHotEncoder
from pyspark.ml.pipeline import Pipeline
from pyspark.sql import SparkSession
import os

def small_model():
    x = tf.placeholder(tf.float32, shape=[None, 784], name='x')
    y = tf.placeholder(tf.float32, shape=[None, 10], name='y')
    layer1 = tf.layers.dense(x, 256, activation=tf.nn.relu)
    layer2 = tf.layers.dense(layer1, 256, activation=tf.nn.relu)
    out = tf.layers.dense(layer2, 10)
    z = tf.argmax(out, 1, name='out')
    loss = tf.losses.softmax_cross_entropy(y, out)
    return loss

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("examples") \
        .getOrCreate()

    df = spark.read.option("inferSchema", "true").csv('s3://abucket/mnist_train.csv')
    df_test = spark.read.option("inferSchema", "true").csv('s3://abucket/mnist_test.csv')
    mg = build_graph(small_model)
    #Assemble and one hot encode
    va = VectorAssembler(inputCols=df.columns[1:785], outputCol='features')
    encoded = OneHotEncoder(inputCol='_c0', outputCol='labels', dropLast=False)

    spark_model = SparkAsyncDL(
        inputCol='features',
        tensorflowGraph=mg,
        tfInput='x:0',
        tfLabel='y:0',
        tfOutput='out:0',
        tfLearningRate=.001,
        iters=20,
        predictionCol='predicted',
        labelCol='labels',
        verbose=1,
        partitions=100
    )

    p = Pipeline(stages=[va, encoded, spark_model]).fit(df)
    p.write().overwrite().save("s3://abucket/mnist_model")

After creating the model, I tried to load the pipeline again and transform the mnist_test.csv dataset.


from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel 

pipeline = PipelineModel.load("s3://abucket/mnist_model")
df_test = spark.read.option("inferSchema", "true").csv("s3://abucket/mnist_test.csv")

predictions = pipeline.transform(df_test)

leads to the stacktrace:

Py4JJavaError: An error occurred while calling o161.transform.
: java.util.NoSuchElementException: Failed to find a default value for inputCol
    at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:780)
    at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:780)
    at scala.Option.getOrElse(Option.scala:121)
...

When executing alternatively:

predictions = pipeline.stages[2].transform(
  pipeline.stages[1].transform(  
    pipeline.stages[0].transform(df_test) 
  )
) //the most outer part fails being the model

Maybe there is something wrong with serialisation and deserialsation?

Best,

PowerToThePeople111 commented 5 years ago

My bad,

it was documented, but I did not see it before: using the PysparkPipelineWrapper.unwrap method helps. ;)


from pyspark.ml import Pipeline
from sparkflow.pipeline_util import PysparkPipelineWrapper
from pyspark.ml import PipelineModel 

pipeline = PysparkPipelineWrapper.unwrap(PipelineModel.load("s3://abucket/mnist_model"))
dmmiller612 commented 5 years ago

Thanks for filing the issue, as it reminded me that I believe pyspark recently added the capability for custom Transformers to be saved/loaded. That is why this hack had to go in, which I have wanted it to be gone for awhile now.

PowerToThePeople111 commented 5 years ago

I am happy, if that helped in some way. :)