combust / mleap

MLeap: Deploy ML Pipelines to Production
https://combust.github.io/mleap-docs/
Apache License 2.0

Exception in thread "main" java.util.NoSuchElementException: key not found: num_features #490

Closed · avinash-indix closed this issue 5 years ago

avinash-indix commented 5 years ago
package holmes.model.building

/**
  * Created by indix on 8/1/19.
  */
import java.io.File

import holmes.model.building.spark.Int2DoubleSpark
import ml.combust.bundle.serializer.SerializationFormat.Protobuf
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{PCA, RegexTokenizer, StringIndexer, Word2Vec}
import org.apache.spark.ml.mleap.classification.OneVsRest

import scala.io.Source

import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import resource.managed

object OneVsRestExample {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .appName(s"holmes.model.building.OneVsRestExample")
          .getOrCreate()

        // NOTE: despite its name, this UDF maps a label string to another string;
        // it is defined here but never used in the pipeline below.
        val toDouble = udf[String, String] {
          case "1.0" => "one"
          case "2.0" => "two"
          case _ => "three"
        }

        var inputData = spark.read
          .option("header", true)
          .option("delimiter", "\t")
          .option("mode", "DROPMALFORMED")
          .csv("/home/indix/indix_codebase/category_intent/data/test/medium_test.csv")
          .toDF("nodeId", "query", "bd_top_level", "bc")

        val tokenize = new RegexTokenizer()
          .setOutputCol("tokens")
          .setInputCol("query")
          .setPattern("\\W")

        // Learn a mapping from words to Vectors.
        val word2Vec = new Word2Vec()
          .setInputCol("tokens")
          .setOutputCol("w2v")

        val indexer = new StringIndexer().setInputCol("bd_top_level").setOutputCol("label").fit(inputData)
        inputData = indexer.transform(inputData)

        inputData.printSchema()
        inputData.show(10)

        println(inputData.columns.mkString("::"))

        // generate the train/test split.
        val Array(train, test) = inputData.randomSplit(Array(0.8, 0.2))

        // instantiate the base classifier

        val pca = new PCA()
          .setInputCol("w2v")
          .setOutputCol("pcaFeatures")
          .setK(3)

        import org.apache.spark.ml.Pipeline

        val classifier = new LogisticRegression()
              .setFeaturesCol("pcaFeatures")
              .setLabelCol("label")
              .setFitIntercept(true)
              .setPredictionCol("_prediction_")

        val ovr = new OneVsRest()
          .setClassifier(classifier)
          .setLabelCol(classifier.getLabelCol)
          .setFeaturesCol(classifier.getFeaturesCol)
          .setPredictionCol(classifier.getPredictionCol)

        val map = Map[Int, Double]()
        val i2dModel = new Int2DoubleModel(map)
        val i2dTransformer = new Int2DoubleSpark(i2dModel)
          .setInputCol(ovr.getPredictionCol)
          .setOutputCol("prediction")

        val w2vStages = Array(tokenize, word2Vec, pca, ovr, i2dTransformer)
        val pipeline = new Pipeline().setStages(w2vStages)

        val paramGrid = new ParamGridBuilder()
          .addGrid(pca.k, Array(2, 3, 4))
          .build()

        val evalMetric = new MulticlassClassificationEvaluator()
          .setMetricName("weightedPrecision")

        val cv = new CrossValidator()
          .setEstimator(pipeline)
          .setEvaluator(evalMetric)
          .setEstimatorParamMaps(paramGrid)
          .setNumFolds(4)
          .setParallelism(10)

            println("Before training")
            val model = cv.fit(train)
            println("After training")

            //writing to mleap bundle

            val topLevelModelBundleFileExt = "topLevelModel"
            val localPathOnDisk = "/home/indix/indix_codebase/data/ovr_cv_mleap_model"

            val transformedDataset = model.transform(train.limit(1))

            implicit val sbc = SparkBundleContext().withDataset(transformedDataset)

            (for(bf <- managed(BundleFile(s"file:$localPathOnDisk"))) yield {

              val tmp = model.writeBundle.format(Protobuf)
//              val tmp = BundleWriter(stages).format(Protobuf)

              tmp.save(bf).get
            }).tried.get
            spark.stop()

        // load mleap models and predict using them
        val modelDir = "/home/indix/indix_codebase/data/ovr_cv_mleap_model"
        val mleapModel = {
          val model = (for (bf <- managed(BundleFile(new File(modelDir)))) yield {
            bf.loadMleapBundle()
          }).tried.flatMap(identity).get.root

          println("loaded model from " + modelDir)
          model
        }

      }
  }
ancasarb commented 5 years ago

@avinash-indix would it be possible to get the csv file you were using, so that I can run the example you've referenced?

I've tried a simple out-of-the-box pipeline and was able to run it without issues. The dataset I used is here: https://github.com/combust/mleap/blob/master/mleap-databricks-runtime-testkit/src/main/resources/datasources/lending_club_sample.avro

import org.apache.spark.ml.{Pipeline, Transformer}
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.DataFrame
import com.databricks.spark.avro._

val dataset = spark.sqlContext.read.avro("/FileStore/tables/lending_club_sample.avro")

val pipeline = new Pipeline().setStages(Array(
  new StringIndexer()
    .setInputCol("fico_score_group_fnl")
    .setOutputCol("fico_index"),
  new VectorAssembler()
    .setInputCols(Array("fico_index", "dti"))
    .setOutputCol("features"),
  new OneVsRest()
    .setClassifier(new LogisticRegression())
    .setLabelCol("fico_index")
    .setFeaturesCol("features")
    .setPredictionCol("prediction")
)).fit(dataset)

import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._

import ml.combust.bundle.serializer.SerializationFormat

val sbc = SparkBundleContext().withDataset(pipeline.transform(dataset))
for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline1.zip"))) {
      pipeline.writeBundle.format(SerializationFormat.Protobuf).save(bf)(sbc).get
}

import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline1.zip"))) yield {
      bundleFile.loadMleapBundle().get
    }).opt.get

val mleapPipeline = bundle.root

println(mleapPipeline)
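
And for completeness, a minimal sketch of scoring with the loaded MLeap pipeline outside of Spark (the schema mirrors the two input columns used above; the sample row values are made up, so substitute real ones):

import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

// Assumed input schema: fico_score_group_fnl is a string, dti a double.
val schema = StructType(
  StructField("fico_score_group_fnl", ScalarType.String),
  StructField("dti", ScalarType.Double)
).get

// Hypothetical sample row; replace with real values from the dataset.
val frame = DefaultLeapFrame(schema, Seq(Row("700 - 800", 0.25)))

// transform returns a Try; select just the prediction column from the result.
val predictions = mleapPipeline.transform(frame)
  .flatMap(_.select("prediction"))
  .get

predictions.dataset.foreach(println)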
ancasarb commented 5 years ago

I can actually replicate your issue when using the mleap variant:

import org.apache.spark.ml.mleap.classification.OneVsRest

Currently looking into what the issue is.

ancasarb commented 5 years ago

I've raised https://github.com/combust/mleap/pull/492 to fix this issue. In the meantime, you can also try to use the default (as in my example above)

import org.apache.spark.ml.classification.OneVsRest

if you don't require the additional probability column and don't use any custom transformers from mleap-spark-extensions.

avinash-indix commented 5 years ago

Thanks a lot Anca. I am using a custom transformer to convert the int prediction column to double (MulticlassClassificationEvaluator throws an error otherwise). In my code the i2dTransformer does only this. I could not find an existing transformer that could do this, so I had to make a custom one. If you know of any, please do let me know.

ancasarb commented 5 years ago

See if you can use import org.apache.spark.ml.classification.OneVsRest in your pipeline for now.
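The stock OneVsRest's prediction column is already a DoubleType, so the Int2Double conversion step shouldn't be needed with it. If you do want an explicit conversion using only stock stages, SQLTransformer can express the cast; a minimal sketch (note: I haven't verified whether MLeap can serialize SQLTransformer, so treat this as untested):

import org.apache.spark.ml.feature.SQLTransformer

// Cast the integer prediction column to double inside the pipeline.
// __THIS__ is the placeholder that SQLTransformer replaces with its input dataset.
val i2d = new SQLTransformer()
  .setStatement("SELECT *, CAST(_prediction_ AS DOUBLE) AS prediction FROM __THIS__")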

ancasarb commented 5 years ago

Closing, as #492 has been merged and the fix will be released with the next mleap version.