jpmml / jpmml-sparkml

Java library and command-line application for converting Apache Spark ML pipelines to PMML
GNU Affero General Public License v3.0

ConverterUtil cannot transform models trained on sparse data #2

Closed: ZunwenYou closed this issue 7 years ago

ZunwenYou commented 8 years ago

As we discussed over email last week, I hope that this project can convert a model to PMML in the second way, as shown in the code below. Thanks.

Hello,

Sometimes my training data is sparse, in LibSVM format, so the DataFrame is better formatted as follows, rather than built with RFormula in MLlib:

root
 |-- features: vector (nullable = false)
 |-- label: double (nullable = false)
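For example (a sketch; the input path is hypothetical), Spark's built-in LibSVM data source produces a DataFrame with exactly these two columns:

    val sqlContext = new SQLContext(sc)
    // "libsvm" is a built-in data source since Spark 1.6
    val df = sqlContext.read.format("libsvm").load("data/sample_libsvm_data.txt")
    df.printSchema()  // label: double, features: vector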

This kind of "data layout" contains very little feature information. Sure, it could be converted to PMML, but in that case the "features" column would be expanded into n double columns "x1", "x2", .., "x_n".

You could open a feature request in the JPMML-SparkML issue tracker (https://github.com/jpmml/jpmml-sparkml/issues), and I will take care of it then. Please also include reproducible sample code.

VR

import javax.xml.transform.stream.StreamResult

import org.apache.spark.SparkContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.{VectorUDT, Vectors}  // Spark 2.x: org.apache.spark.ml.linalg
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.jpmml.model.JAXBUtil
import org.jpmml.sparkml.ConverterUtil

def testPMML(sc: SparkContext): Unit = {
  // Build a tiny DataFrame with a "label" column and a sparse "features" vector column
  val rdd = sc.makeRDD(Seq((1.0, 2.0, 3.0, 0.0), (0.0, 2.0, 0.0, 3.0), (1.0, 0.0, 0.0, 2.0)))
    .map(a => Row(a._1, Vectors.dense(Array(a._2, a._3, a._4)).toSparse))
  val schema = StructType(List(StructField("label", DoubleType), StructField("features", new VectorUDT)))
  val sqlContext = new SQLContext(sc)
  val irisData = sqlContext.createDataFrame(rdd, schema)

  val classifier = new LogisticRegression()
    .setLabelCol("label")
    .setFeaturesCol("features")

  // The first way: wrap the model in a (single-stage) Pipeline
  val pipeline = new Pipeline()
    .setStages(Array(classifier))
  val pipelineModel = pipeline.fit(irisData)
  var pmml = ConverterUtil.toPMML(schema, pipelineModel)
  JAXBUtil.marshalPMML(pmml, new StreamResult(System.out))

  // The second way (requested): convert a bare model, without a Pipeline wrapper
  val lrModel = classifier.fit(irisData)
  pmml = ConverterUtil.toPMML(schema, lrModel)
  JAXBUtil.marshalPMML(pmml, new StreamResult(System.out))
}
kennyzli commented 8 years ago

Any ETA for this issue? I tried to train several models from the Spark 2.0 examples, but some of them failed because the converter does not accept the Vector data type. Is there an alternative solution if I want to support multiple features in my models? The Vector type throws exceptions somehow.

import javax.xml.transform.stream.StreamResult

import org.apache.log4j.LogManager
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.jpmml.model.JAXBUtil
import org.jpmml.sparkml.ConverterUtil

def main(args: Array[String]): Unit = {
  val logManager = LogManager.getLogger("CerebroPOC")
  val session = SparkSession.builder.appName("Cerebro").getOrCreate()
  val data = session.read.option("header", "false").option("inferSchema", "true")
    .csv("~/iris.data")
    .toDF("sepal length", "sepal width", "petal length", "petal width", "species")

  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

  logManager.warn("***************Hello warning before GBT Classifier*****************")

  val gbt = new GBTClassifier()
  val schema = data.schema

  val labelIndexer = new StringIndexer()
    .setInputCol("species")
    .setOutputCol("speciesLabel")
    .fit(data)

  // The "features" vector is assembled outside the Pipeline here;
  // this is what makes the conversion below fail (see the reply that follows)
  val vectorAssembler = new VectorAssembler()
  vectorAssembler.setInputCols(Array("sepal length", "sepal width", "petal length", "petal width"))
  vectorAssembler.setOutputCol("features")
  val vectorData = vectorAssembler.transform(data)

  gbt.setLabelCol("speciesLabel")
    .setFeaturesCol("features")

  val pipelineModel = new Pipeline().setStages(Array(labelIndexer, gbt)).fit(vectorData)
  val resultData = vectorData.select("sepal length", "sepal width", "petal length", "petal width", "species", "features")
  val pmml = ConverterUtil.toPMML(resultData.schema, pipelineModel)
  logManager.warn("***************Hello warning PMML generated*****************")
  JAXBUtil.marshalPMML(pmml, new StreamResult(System.out))
}
vruusmann commented 8 years ago

It is the intended behaviour to refuse to deal with VectorUDT columns whose "origin" is unknown. One can have VectorUDT columns, but they must be constructed within the Pipeline, so that JPMML-SparkML can figure out the name, data type, operational type, etc. of each individual VectorUDT element.

@kennyzli It's possible to rearrange your script so that it meets the above requirement. The idea is to make the VectorAssembler transformation part of the pipeline, and to pass data.schema as the first argument to the ConverterUtil#toPMML(StructType, PipelineModel) method.

Something like this:

val pipelineModel = new Pipeline().setStages(Array(labelIndexer, vectorAssembler, gbt)).fit(data)
val pmml = ConverterUtil.toPMML(data.schema, pipelineModel)
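Spelled out a little further (a sketch, reusing the variable names from the code above; output goes to stdout as in the earlier examples):

    // StringIndexer and VectorAssembler are now pipeline stages, so JPMML-SparkML
    // can trace every element of the "features" column back to a named input column
    val pipelineModel = new Pipeline()
      .setStages(Array(labelIndexer, vectorAssembler, gbt))
      .fit(data)

    // Pass the schema of the raw data, i.e. before any vector columns exist
    val pmml = ConverterUtil.toPMML(data.schema, pipelineModel)
    JAXBUtil.marshalPMML(pmml, new StreamResult(System.out))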
kennyzli commented 8 years ago

@vruusmann Perfect, I just realized your approach works perfectly for me. Thanks a lot for the help!

msjbear commented 7 years ago

@vruusmann @kennyzli When I follow the advice above to save a GBT model, the following exception is thrown. What step am I missing?

spark: 1.6.1
jpmml-sparkml: 1.0.8
Exception in thread "main" java.lang.IllegalArgumentException: Expected -1 features, got 9 features
    at org.jpmml.sparkml.FeatureMapper.createSchema(FeatureMapper.java:169)
    at org.jpmml.sparkml.ConverterUtil.toPMML(ConverterUtil.java:123)
    at com.jd.risk.dm.spark.ml.fraud.GBTPmmlTask$.train(GBTPmmlTask.scala:239)
    at com.jd.risk.dm.spark.ml.fraud.GBTPmmlTask$.main(GBTPmmlTask.scala:159)
    at com.jd.risk.dm.spark.ml.fraud.GBTPmmlTask.main(GBTPmmlTask.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Here is my code:

    val Array(training, test) = data.toDF("label", "degree", "tcNum", "pageRank", "commVertexNum", "normQ", "gtRate", "eqRate", "ltRate", "level")
      .randomSplit(Array(1.0 - fracTest, fracTest), 1234)

    // Set up Pipeline
    val stages = new mutable.ArrayBuffer[PipelineStage]()

    // (1) For classification, re-index classes.
    val labelColName = if (algo == "classification") "indexedLabel" else "label"
    if (algo == "classification") {
      val labelIndexer = new StringIndexer()
        .setInputCol("label")
        .setOutputCol(labelColName)
      stages += labelIndexer
    }

    // (2) Assemble the numeric input columns into a single "features" vector.
    val vectorAssembler = new VectorAssembler()
    vectorAssembler.setInputCols(Array("degree", "tcNum", "pageRank", "commVertexNum", "normQ", "gtRate", "eqRate", "ltRate", "level"))
    vectorAssembler.setOutputCol("features")
    stages += vectorAssembler

    // (3) Learn GBT.
    val dt = algo match {
      case "classification" =>
        new GBTClassifier()
          .setLabelCol(labelColName)
          .setFeaturesCol("features")
          .setMaxDepth(params.maxDepth)
          .setMaxBins(params.maxBins)
          .setMinInstancesPerNode(params.minInstancesPerNode)
          .setMinInfoGain(params.minInfoGain)
          .setCacheNodeIds(params.cacheNodeIds)
          .setCheckpointInterval(params.checkpointInterval)
          .setMaxIter(params.maxIter)
      case "regression" =>
        new GBTRegressor()
          .setFeaturesCol("features")
          .setLabelCol(labelColName)
          .setMaxDepth(params.maxDepth)
          .setMaxBins(params.maxBins)
          .setMinInstancesPerNode(params.minInstancesPerNode)
          .setMinInfoGain(params.minInfoGain)
          .setCacheNodeIds(params.cacheNodeIds)
          .setCheckpointInterval(params.checkpointInterval)
          .setMaxIter(params.maxIter)
      case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.")
    }
    stages += dt
    val pipeline = new Pipeline().setStages(stages.toArray)

    // Fit the Pipeline.
    val startTime = System.nanoTime()
    val pipelineModel = pipeline.fit(training)
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println(s"Training time: $elapsedTime seconds")

    /**
      * Write the model in PMML format to HDFS.
      */
    val modelPmmlPath = "sjmei/pmmlmodel"
    val pmml = ConverterUtil.toPMML(training.schema, pipelineModel)
//    val conf = new Configuration()
//    HadoopFileUtil.deleteFile(modelPmmlPath)
//    val path = new Path(modelPmmlPath)
//    val fs = path.getFileSystem(conf)
//    val out = fs.create(path)
    MetroJAXBUtil.marshalPMML(pmml, new FileOutputStream(modelPmmlPath))
vruusmann commented 7 years ago

java.lang.IllegalArgumentException: Expected -1 features, got 9 features

The method PredictionModel#numFeatures() returns -1, which means "unknown". This typically happens when the prediction model object has not been properly initialized, i.e. the estimator's fit(...) method has not been called yet.

You could always disable this sanity check by commenting it out (per the stack trace, it lives in FeatureMapper#createSchema).
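As a quick diagnostic (a sketch; it assumes the fitted GBT model is the last stage of the pipelineModel from the code above):

    import org.apache.spark.ml.classification.GBTClassificationModel

    // Inspect the fitted model before attempting the PMML conversion
    val gbtModel = pipelineModel.stages.last.asInstanceOf[GBTClassificationModel]
    println(gbtModel.numFeatures)  // -1 means "unknown"; the converter will reject it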

msjbear commented 7 years ago

@vruusmann Thanks. After upgrading Spark to 2.x and JPMML-SparkML to 1.1.5, the program runs OK.
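For reference, a minimal sketch of the matching dependency upgrade (sbt syntax; the exact Spark 2.0.x patch version is an assumption):

    // build.sbt (sketch)
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-mllib" % "2.0.2" % "provided",  // Spark 2.0.x line
      "org.jpmml" % "jpmml-sparkml" % "1.1.5"
    )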