combust / mleap

MLeap: Deploy ML Pipelines to Production
https://combust.github.io/mleap-docs/
Apache License 2.0

MLeap conversion of Spark 2.1 model is failing using MLeap 0.10.2 #427

Closed vegoutha closed 3 years ago

vegoutha commented 5 years ago

I have updated the MLeap runtime from 0.7.0 to 0.10.2. The MLeap conversion of a Spark 2.1 model is failing with this updated library.

MLeap runtime: 0.10.2

Here is the code snippet that creates the Spark model:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{ChiSqSelector, CountVectorizer, HashingTF, IDF, StopWordsRemover, Tokenizer, VectorAssembler, VectorSlicer, Word2Vec}

    val sentenceData = spark.createDataFrame(Seq(
      (0.0, "Hi I heard about Spark"),
      (0.0, "I wish Java could use case classes"),
      (1.0, "Logistic regression models are neat"),
      (1.0, "Words 2 Vector regression is used"),
      (1.0, "Count Vectorizer regression is used"),
      (0.0, "ChiSqSelector regression is used"),
      (1.0, "Vector Slicer regression is used"),
      (1.0, "Consolidated regression is used")
    )).toDF("label", "sentence")

    val Array(trainingData, testData) = sentenceData.randomSplit(Array(0.7, 0.3))

    val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")

    val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filteredWords")

    val hashingTF = new HashingTF().setInputCol("filteredWords").setOutputCol("rawFeatures").setNumFeatures(20)
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("idfFeatures")

    // Learn a mapping from words to Vectors.
    val word2Vec = new Word2Vec().setInputCol("filteredWords").setOutputCol("resultword2Vec").setVectorSize(3).setMinCount(0)

    // fit a CountVectorizerModel from the corpus
    val cv: CountVectorizer = new CountVectorizer().setInputCol("filteredWords").setOutputCol("resultcv").setVocabSize(3).setMinDF(2)

    val vectorAss = new VectorAssembler().setInputCols(Array("idfFeatures", "resultword2Vec", "resultcv")).setOutputCol("features")

    val selector = new ChiSqSelector().setNumTopFeatures(5).setFeaturesCol("features").setOutputCol("selectedFeatures")

    val slicer = new VectorSlicer().setInputCol("selectedFeatures").setOutputCol("slicedFeatures").setIndices(Array(2))

    val lr = new LogisticRegression().setFeaturesCol("slicedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)

    // Chain all feature transformers and the logistic regression in a Pipeline.
    val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, word2Vec, cv, vectorAss, selector, slicer, lr))

    val model = pipeline.fit(trainingData)

    // Make predictions.
    val predictions = model.transform(testData)
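
The MLeap export step itself is not included above. For reference, on the 0.10.x line the conversion typically goes through SparkSupport with a SparkBundleContext, roughly as in this sketch (the bundle path is an assumption):

    import ml.combust.bundle.BundleFile
    import ml.combust.mleap.spark.SparkSupport._
    import org.apache.spark.ml.bundle.SparkBundleContext
    import resource._

    // Build the bundle context from the transformed dataset so that
    // MLeap can pick up the output schema of every pipeline stage.
    implicit val sbc = SparkBundleContext().withDataset(predictions)

    // Serialize the fitted PipelineModel to an MLeap bundle zip
    // (the path /tmp/spark-pipeline.zip is an assumption).
    for (bundle <- managed(BundleFile("jar:file:/tmp/spark-pipeline.zip"))) {
      model.writeBundle.save(bundle).get
    }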

Below is the error message observed during the MLeap conversion of the Spark 2.1 model:

Spark Model to MLeap conversion failed with error: Job aborted due to stage failure: Task 6 in stage 34.0 failed 1 times, most recent failure: Lost task 6.0 in stage 34.0 (TID 62, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.collection.immutable.$colon$colon is not a valid external type for schema of double
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, label), DoubleType) AS label#58
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, label), DoubleType)
   +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, label)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
         +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, sentence), StringType), true) AS sentence#59
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, sentence), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 1
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, sentence), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, sentence), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, sentence)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:293)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: scala.collection.immutable.$colon$colon is not a valid external type for schema of double
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
    ... 20 more
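
For context, the root cause line ("scala.collection.immutable.$colon$colon is not a valid external type for schema of double") is Spark's generic complaint when a row supplies a Scala List where the schema declares a Double. A minimal sketch that reproduces the same class of error independently of MLeap (it assumes an active spark session):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

    val schema = StructType(Seq(StructField("label", DoubleType)))
    // The row carries a List where the schema expects a Double; encoding
    // it throws the same RuntimeException seen in the stack trace above.
    val rows = spark.sparkContext.parallelize(Seq(Row(List(0.0))))
    spark.createDataFrame(rows, schema).show()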

The same model was working fine with MLeap 0.7.0. Please have a look at this issue.

vegoutha commented 5 years ago

@hollinwilkins Could you please check the above MLeap conversion of the Spark 2.1 model using the MLeap 0.10.2 library?

ancasarb commented 3 years ago

closing, please re-open if it's still an issue.