combust / mleap

MLeap: Deploy ML Pipelines to Production
https://combust.github.io/mleap-docs/
Apache License 2.0

MLeap Serialized Pipeline including an XGBoost Model does not predict the same values as Spark Pipeline #625

Closed irene3030 closed 4 years ago

irene3030 commented 4 years ago

Hello,

I am currently working on a project where a machine learning model has been created using Apache Spark & XGBoost4J. In order to deploy this model in a production environment, I've used MLeap and its extension for XGBoost to serialize my pipeline, which includes the following stages: StringIndexer, OneHotEncoderEstimator, VectorAssembler and an XGBoost regression model.

When reading the MLeap Bundle object, I find that the predictions obtained using the serialized XGBoost model included in this object are very different from the ones obtained using the XGBoost model directly with Spark & XGBoost4J-Spark.

Here is how I create my pipeline, train the model and save it as an MLeap bundle:

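import java.io.File

import ml.combust.bundle.BundleFile
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer, VectorAssembler}
import org.apache.spark.ml.mleap.SparkUtil
import org.apache.spark.sql.DataFrame
import resource._

  // Index and one-hot encode every categorical column, then assemble all features into one vector.
  def createFeaturePipeline(categoricalColumns: Array[String], numericalColumns: Array[String]): Pipeline = {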
    val encodedCategoricalFeatures = categoricalColumns.flatMap{
      feature =>
        val indexer = new StringIndexer()
          .setInputCol(feature)
          .setOutputCol(feature + "_INDEX")
          .setHandleInvalid("keep")

        val oneHotEncoder = new OneHotEncoderEstimator()
          .setInputCols(Array(feature + "_INDEX"))
          .setOutputCols(Array(feature + "_VEC"))
          .setDropLast(false)

      Array(indexer, oneHotEncoder)
    }
    val vecCategoricalFeatures = categoricalColumns.map(e => e.concat("_VEC"))
    val vectorAssembler = new VectorAssembler()
      .setInputCols(vecCategoricalFeatures ++ numericalColumns)
      .setOutputCol(PortatilesModelConstants.featuresName)
    val featurePipeline: Pipeline = new Pipeline().setStages(encodedCategoricalFeatures ++ Array(vectorAssembler))
    featurePipeline
  }

  def createModel(): XGBoostRegressor = {
    val xgbParam = Map(
      "eta" -> 0.1,
      "verbosity" -> 3,
      "missing" -> 0,
      "num_workers" -> 1,
      "num_round" -> 200,
      "nthread" -> 1,
      "alpha" -> 0.0,
      "gamma" -> 0.6,
      "lambda" -> 0.4,
      "maxDepth" -> 7,
      "minChildWeight" -> 5.0,
      "subsample" -> 1.0
    )

    val xgbRegressor: XGBoostRegressor = new XGBoostRegressor(xgbParam)
      .setFeaturesCol(PortatilesModelConstants.featuresName)
      .setLabelCol(PortatilesModelConstants.targetFeature)
    xgbRegressor
  }

  def buildEvaluator(metric: String = "rmse"): RegressionEvaluator = {
    new RegressionEvaluator()
      .setLabelCol(PortatilesModelConstants.targetFeature)
      .setPredictionCol("prediction")
      .setMetricName(metric)
  }

  def savePipelineToBundle(data: DataFrame, pipelineToSave: PipelineModel, path: String): Unit = {
    import ml.combust.mleap.spark.SparkSupport._
    implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(data)
    new File(path).delete()
    for (bf <- managed(BundleFile("jar:file:" + path))) {
      pipelineToSave.writeBundle.save(bf).get
    }
  }

  // Main flow
    val Array(split20, split80) = loadData(pathData).randomSplit(Array(0.20, 0.80))
    val testSet = split20.cache()
    val trainingSet = split80.cache()
    val categoricalColumns = PortatilesModelConstants.categoricalColumns
    val numericalColumns = PortatilesModelConstants.numericalColumns
    val featurePipelineFitted = createFeaturePipeline(categoricalColumns, numericalColumns).fit(trainingSet)
    val trainingSetTransformed = featurePipelineFitted.transform(trainingSet)
    val xgbModel = createModel().fit(trainingSetTransformed)
    val trainingSetPredicted = xgbModel.transform(trainingSetTransformed)
    val pipelineModel = SparkUtil.createPipelineModel(Array(featurePipelineFitted, xgbModel))

  // Test performance of the model
   val testSetTransformed = pipelineModel.transform(testSet)
   val mae = buildEvaluator("mae").evaluate(testSetTransformed)

  // Save pipeline
    savePipelineToBundle(pipelineModel.transform(trainingSet), pipelineModel, "/tmp/pipelineModel.zip")

(In case it is not clear, PortatilesModelConstants contains constants such as the names of the columns I am working with.) Below is how I read the MLeap bundle and test the pipeline on the test set. First I transform the test set through the serialized pipeline. Then I convert the result back to a Spark DataFrame and compute the MAE metric:

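import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import ml.combust.mleap.spark.SparkSupport._
import resource._

val bundle = (for(bf <- managed(BundleFile("jar:file:/tmp/pipelineModel.zip"))) yield {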
  bf.loadMleapBundle().get.root
}).tried.get

val rows = testSet.toSparkLeapFrame.dataset
val schema = testSet.toSparkLeapFrame.schema
val leapFrame = DefaultLeapFrame(schema, rows.collect().toSeq)
val leapFrameTransformed: DefaultLeapFrame = bundle.transform(leapFrame).get
val testSetTransformedMLeap: Seq[Row] = leapFrameTransformed.dataset
val columns = testSet.columns ++ Array("prediction")

// Converts the rows produced by MLeap back to a Spark DataFrame so that model metrics (R2, MAE) can be computed.
// Note: toDF needs the Spark session implicits in scope (import spark.implicits._).
def mleapToSparkDF(mleapFrame: Seq[ml.combust.mleap.runtime.frame.Row], columns: Array[String]): DataFrame = {
  mleapFrame.map{
    mleapRow => (
        mleapRow.getString(0), //estado
        mleapRow.getString(1), //so
        mleapRow.getString(2), //cargador
        mleapRow.getString(3), // proc
        mleapRow.getString(4), // marca
        mleapRow.getString(5), // modelo
        mleapRow.getString(6), // tipo tg
        mleapRow.getString(7), // modelo tg
        mleapRow.getString(8), // tactil
        mleapRow.getDouble(9), // ssd
        mleapRow.getInt(10), // pulgadas
        mleapRow.getDouble(11), // hdd
        mleapRow.getDouble(12), //memoria ram
        mleapRow.getDouble(13), // capacidad tg
        mleapRow.getDouble(14), // precio original
        mleapRow.getLong(15), // idx
      //mleapRow.getAs[Tensor[Double]](index=33),
        mleapRow.getDouble(35) // precio prediction
      )
  }.toDF(columns:_*)
}
val testSetTransformed2 = mleapToSparkDF(testSetTransformedMLeap, columns)
val mae = buildEvaluator("mae").evaluate(testSetTransformed2)

Both the metrics and the predicted values obtained with testSetTransformed (Spark pipeline) and testSetTransformed2 (MLeap bundle) are different:

scala> val mae = buildEvaluator("mae").evaluate(testSetTransformed)
mae: Double = 47.37160734311079 
scala> val mae = buildEvaluator("mae").evaluate(testSetTransformed2)
mae: Double = 538.1780686303168

Here you have a small sample of the test data, showing that the predictions are different:

+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+------------------+-----+
|ESTADO_PRODUCTO|SISTEMA_OPERATIVO|CARGADOR|   PROCESADOR|MARCA|     MODELO|TIPO_TARJETA_GRAFICA|MODELO_TARJETA_GRAFICA|PANTALLA_TACTIL|SSD_CAPACIDAD|PULGADAS|HDD_CAPACIDAD|MEMORIA_RAM|CAPACIDAD_TARJETA_GRAFICA|TOTAL_PRODUCTO_VENTA|        prediction|  idx|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+------------------+-----+
|              B|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        256.0|      13|          0.0|        8.0|                      0.0|              1099.0| 922.5587768554688| 4894|
|              B|       WINDOWS_10|      SI|INTEL_CORE_I5| ASUS|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|       1000.0|        8.0|                      0.0|               315.0|322.15277099609375| 2040|
|              C|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        128.0|      13|          0.0|        8.0|                      0.0|               765.0|   728.66357421875| 2927|
|              C|       WINDOWS_10|      SI|INTEL_CELERON| ACER|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|        500.0|        8.0|                      0.0|               215.0| 194.9210205078125|10422|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+------------------+-----+
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+-----------------+-----+
|ESTADO_PRODUCTO|SISTEMA_OPERATIVO|CARGADOR|   PROCESADOR|MARCA|     MODELO|TIPO_TARJETA_GRAFICA|MODELO_TARJETA_GRAFICA|PANTALLA_TACTIL|SSD_CAPACIDAD|PULGADAS|HDD_CAPACIDAD|MEMORIA_RAM|CAPACIDAD_TARJETA_GRAFICA|TOTAL_PRODUCTO_VENTA|       prediction|  idx|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+-----------------+-----+
|              B|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        256.0|      13|          0.0|        8.0|                      0.0|              1099.0|918.6906127929688| 4894|
|              B|       WINDOWS_10|      SI|INTEL_CORE_I5| ASUS|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|       1000.0|        8.0|                      0.0|               315.0|751.9229125976562| 2040|
|              C|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        128.0|      13|          0.0|        8.0|                      0.0|               765.0| 913.106201171875| 2927|
|              C|       WINDOWS_10|      SI|INTEL_CELERON| ACER|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|        500.0|        8.0|                      0.0|               215.0| 838.892822265625|10422|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+-----------------+-----+
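
A per-row comparison can make the gap easier to see than a single aggregate metric. The sketch below is plain Scala with no Spark or MLeap dependency. It copies the four sample rows above and assumes, based on the order in which the MAE values were reported, that the first table comes from the Spark pipeline and the second from the MLeap bundle. The object and method names are hypothetical and chosen only for illustration.

```scala
// Illustrative sketch only: compares the two sets of sample predictions row by row.
object SamplePredictionDiff {
  // (idx, TOTAL_PRODUCTO_VENTA, first-table prediction, second-table prediction).
  // Assumption: first table = Spark pipeline, second table = MLeap bundle.
  val rows: Seq[(Long, Double, Double, Double)] = Seq(
    (4894L, 1099.0, 922.5587768554688, 918.6906127929688),
    (2040L, 315.0, 322.15277099609375, 751.9229125976562),
    (2927L, 765.0, 728.66357421875, 913.106201171875),
    (10422L, 215.0, 194.9210205078125, 838.892822265625)
  )

  // Mean absolute error over (actual, predicted) pairs.
  def mae(pairs: Seq[(Double, Double)]): Double =
    pairs.map { case (actual, predicted) => math.abs(actual - predicted) }.sum / pairs.size

  val sparkMae: Double = mae(rows.map(r => (r._2, r._3)))
  val mleapMae: Double = mae(rows.map(r => (r._2, r._4)))

  // idx values whose two predictions differ by more than the given tolerance.
  def disagreements(tolerance: Double): Seq[Long] =
    rows.collect { case (idx, _, spark, mleap) if math.abs(spark - mleap) > tolerance => idx }
}
```

Listing the idx values returned by `SamplePredictionDiff.disagreements` shows which input rows to inspect first when looking for a pattern in the mismatches.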

Attached to this message you may find mleap_issue.zip.

I would very much appreciate any help you could give me.

Thanks a lot,
Irene

talalryz commented 4 years ago

@irene3030 There was an issue with the way MLeap handles sparse rows when predicting with XGBoost. The pull request fixing it was merged a couple of days ago (PR-205). Can you build master and check whether this is still an issue?
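
For readers wondering how sparse-row handling can change a tree model's output, here is a minimal, dependency-free Scala sketch. It is not the MLeap or XGBoost code and does not reproduce the actual fix. It only assumes the general behaviour that entries absent from a sparse row are treated as missing and follow a split's default branch, while an explicit zero in a dense row is compared against the threshold. All names and values are hypothetical.

```scala
// Hypothetical illustration only: not MLeap or XGBoost code.
object SparseMissingSketch {
  // A single split: go left if value < threshold, otherwise right.
  // Rows that carry no value for the feature follow the default branch.
  final case class Stump(feature: Int, threshold: Double, defaultLeft: Boolean,
                         leftValue: Double, rightValue: Double) {
    def predict(row: Map[Int, Double]): Double = row.get(feature) match {
      case Some(v) => if (v < threshold) leftValue else rightValue
      case None    => if (defaultLeft) leftValue else rightValue
    }
  }

  // Dense encoding: every index is present, zeros included.
  def dense(values: Array[Double]): Map[Int, Double] =
    values.zipWithIndex.map { case (v, i) => i -> v }.toMap

  // Sparse encoding: zeros are dropped, so they look missing to the stump.
  def sparse(values: Array[Double]): Map[Int, Double] =
    dense(values).filter { case (_, v) => v != 0.0 }

  val stump: Stump = Stump(feature = 0, threshold = 0.5, defaultLeft = false,
                           leftValue = 300.0, rightValue = 800.0)
  val row: Array[Double] = Array(0.0, 1.0)

  // The explicit zero is compared with the threshold and takes the left branch.
  val densePrediction: Double = stump.predict(dense(row))
  // The dropped zero counts as missing and takes the default (right) branch.
  val sparsePrediction: Double = stump.predict(sparse(row))
}
```

The same feature values give two different predictions depending only on the encoding, so a training side and a serving side that disagree on how sparse rows are interpreted can diverge even though the model itself is identical.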

irene3030 commented 4 years ago

Hello again,

First of all: thank you very much for your input and for fixing this issue.

I did exactly what you suggested: I downloaded the master branch (0.16.0-SNAPSHOT) and built the whole project. It worked like a charm! I no longer have the problem, and the predictions are the same as the ones obtained using Spark.

I did have one issue, FYI (just in case anyone bumps into this as well): I had to manually package two of the modules, mleap-xgboost-spark (sbt mleap-xgboost-spark/package) and mleap-xgboost-runtime (sbt mleap-xgboost-runtime/package), since packaging from the root directory did not include those modules.

My colleagues and I are very grateful :)

ancasarb commented 4 years ago

great, thanks @talalryz for all your help!

Is it OK to close this issue? The changes will be included in the next release.

lucagiovagnoli commented 4 years ago

This is great, thanks @talalryz !

talalryz commented 4 years ago

We, at Yelp, had been struggling with this bug ourselves so we're glad we could help others out along the way :)