combust / mleap

MLeap: Deploy ML Pipelines to Production
https://combust.github.io/mleap-docs/
Apache License 2.0

NoSuchElementException: key not found: org.apache.spark.ml.feature.OneHotEncoder #685

Closed by Gaurav7296 4 years ago

Gaurav7296 commented 4 years ago

import org.apache.spark.ml.feature.OneHotEncoderEstimator

Dependency: "ml.combust.mleap" %% "mleap-spark" % "0.15.0"

@hollinwilkins I tried your suggestion from a similar issue, but I'm still getting the same error. Spark version: 2.4.5
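
For context, the exception presumably comes from MLeap's bundle registry, which looks up a serialization op by the transformer's class name; the deprecated org.apache.spark.ml.feature.OneHotEncoder has no registered op in 0.15.0. A minimal sketch of a pipeline that would trigger this lookup failure (hypothetical; the stages and column names are illustrative, not Gaurav7296's actual code):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}

// The deprecated single-column transformer (Spark 2.3+ deprecates it
// in favor of OneHotEncoderEstimator). Serializing a pipeline that
// contains it with MLeap 0.15.0 fails with the "key not found" error.
val indexer = new StringIndexer().setInputCol("name").setOutputCol("index")
val encoder = new OneHotEncoder().setInputCol("index").setOutputCol("index_oh")
val failingPipeline = new Pipeline().setStages(Array(indexer, encoder))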

ancasarb commented 4 years ago

Hey @Gaurav7296, do you have a small example that reproduces this issue?

I tried the small pipeline below with MLeap 0.15.0, and it serialized without any issues. Do you notice any differences between this and your pipeline?

import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.sql.types.{StringType, StructType}
import ml.combust.bundle.BundleFile
import ml.combust.bundle.serializer.SerializationFormat
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoderEstimator}
import ml.combust.mleap.spark.SparkSupport._
import resource._
import org.apache.spark.ml.bundle.SparkBundleContext

val names = Seq("Hello, MLeap!", "Another row", "test")
val rows = spark.sparkContext.parallelize(Seq.tabulate(3) { i => Row(names(i)) })
val schema = new StructType().add("name", StringType, nullable = false)

val dataset: DataFrame = spark.createDataFrame(rows, schema)

val stringIndexer = new StringIndexer()
  .setInputCol("name")
  .setOutputCol("index")

val oneHotEncoder = new OneHotEncoderEstimator()
  .setInputCols(Array("index"))
  .setOutputCols(Array("index_oh"))

val pipelineEstimator = new Pipeline().setStages(Array(stringIndexer, oneHotEncoder)).fit(dataset)

implicit val sbc = SparkBundleContext().withDataset(pipelineEstimator.transform(dataset))

val bundleName = "test-model.zip"
val pathSave = "/tmp/" + bundleName

dbutils.fs.rm("file:" + pathSave)

(for(file <- managed(BundleFile("jar:file:" + pathSave))) yield {
  pipelineEstimator.writeBundle.name("bundle")
    .format(SerializationFormat.Json)
    .save(file).get
}).tried.get
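
As a quick round-trip check, the bundle written above can be loaded back into a Spark transformer. A minimal sketch using the same imports and pathSave as above (loadSparkBundle is provided by ml.combust.mleap.spark.SparkSupport):

val loadedPipeline = (for (file <- managed(BundleFile("jar:file:" + pathSave))) yield {
  // deserialize the bundle back into a Spark Transformer
  file.loadSparkBundle().get
}).tried.get.root

// applying the loaded pipeline should reproduce the original output
loadedPipeline.transform(dataset).show()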

We also have a parity test, https://github.com/combust/mleap/blob/master/mleap-spark/src/test/scala/org/apache/spark/ml/parity/feature/OneHotEncoderParitySpec.scala, that runs fine as well.

Let me know how you get on.

Thank you, Anca

Gaurav7296 commented 4 years ago

Thanks @ancasarb. The issue was actually with Spark's deprecated OneHotEncoder transformer, which has been replaced by OneHotEncoderEstimator; switching to the estimator resolved the error.
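
For anyone who lands here with the same stack trace: in Spark 2.3+ the single-column OneHotEncoder transformer is deprecated in favor of OneHotEncoderEstimator, and the "key not found" failure above suggests MLeap 0.15.0 only registers a serialization op for the latter. A before/after sketch of the fix (column names are illustrative):

import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderEstimator}

// Before: deprecated single-column transformer, which MLeap 0.15.0
// fails to serialize
val oldEncoder = new OneHotEncoder()
  .setInputCol("index")
  .setOutputCol("index_oh")

// After: the estimator takes arrays of input/output columns and
// serializes without error
val newEncoder = new OneHotEncoderEstimator()
  .setInputCols(Array("index"))
  .setOutputCols(Array("index_oh"))

Note that Spark 3.0 renamed OneHotEncoderEstimator back to OneHotEncoder, so this distinction only applies to Spark 2.x.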