jpmml / jpmml-sparkml

Java library and command-line application for converting Apache Spark ML pipelines to PMML
GNU Affero General Public License v3.0

Feature datatype does not support array type? #88

Closed marvinxu-free closed 4 years ago

marvinxu-free commented 4 years ago

When I use PMML, I found that a model loaded from PMML does not support the array type.

vruusmann commented 4 years ago

I found that a model loaded from PMML does not support the array type.

What do you exactly mean by "array type"? Any example?

marvinxu-free commented 4 years ago

here is my code:

// Imports assumed by this snippet:
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature._
import org.apache.spark.sql.functions
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

test("testTrainModelPMML") {
    val data0 = spark.createDataFrame(Seq(
      ("1", 1.0, Map("a" -> 0.1, "b" -> 0.2, "c" -> 0.3), 1),
      ("2", 10.0, Map("d" -> 0.1, "e" -> 0.2, "c" -> 0.3), 0),
      ("3", 20.0, Map("x" -> 0.1, "a" -> 0.2, "b" -> 0.3), 0),
      ("4", 15.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0),
      ("5", 18.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0),
      ("6", 25.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 1),
      ("7", 30.0, Map("c" -> 0.1, "b" -> 0.2, "w" -> 0.3), 0))
    )
      .toDF("book_id", "pv", "myInputCol0", "label")

    val data = data0.withColumn("myInputCol", keyUdf(functions.col("myInputCol0")))
      .drop("myInputCol0")

    data.show(10)
    data.printSchema()

    val pipelineStage = new ArrayBuffer[PipelineStage]()

    val bookFiter = new StringIndexer()
      .setInputCol("book_id")
      .setOutputCol("book_id1")
    pipelineStage += bookFiter

    val bookOneHoter = new OneHotEncoder()
      .setInputCol("book_id1")
      .setOutputCol("book_id2")
      .setDropLast(false)
    pipelineStage += bookOneHoter

    val doubleDiscretizer = new QuantileDiscretizer()
      .setInputCol("pv")
      .setOutputCol("pv_bucket")
      .setNumBuckets(3)
    pipelineStage += doubleDiscretizer

    val myFiter = new CountVectorizer()
      .setInputCol("myInputCol")
      .setOutputCol("myInputCol1_vec")
      .setMinDF(1)
      .setMinTF(1)
      .setBinary(true)
    pipelineStage += myFiter

    val vectorAsCols = Array("pv_bucket", "book_id2", "myInputCol1_vec")

    val vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature")
    pipelineStage += vectorAssembler

    val scaler = new MinMaxScaler().setInputCol("vectorFeature").setOutputCol("scaledFeatures")
    pipelineStage += scaler

    val lr: LogisticRegression = new LogisticRegression()
      .setFitIntercept(true)
      .setMaxIter(1) //max iteration
      .setFeaturesCol("scaledFeatures")
      .setLabelCol("label")

    pipelineStage += lr

    //    val featurePipeline = new Pipeline().setStages(Array(myFiter, labelQuant, vectorAssembler, scaler))
    val featurePipeline = new Pipeline().setStages(pipelineStage.toArray)

    val fitor = featurePipeline.fit(data)
    val data1 = fitor.transform(data)

    val lrm = fitor.stages.last.asInstanceOf[LogisticRegressionModel]

    val vecModels = fitor
      .stages
      .map(x => Try(x.asInstanceOf[CountVectorizerModel]))
      .filter(x => x.isSuccess)

    //    val quantModels = fitor
    //      .stages
    //      .map(x => Try(x.asInstanceOf[Bucketizer]))
    //      .filter(x => x.isSuccess)

    val featureIndex = data1.schema.fieldIndex("vectorFeature")
    val vecMap = data1.schema.fields(featureIndex).metadata.getMetadata("ml_attr").getMetadata("attrs")
    val featureMapping = Array(
      Try(vecMap.getMetadataArray("numeric")),
      Try(vecMap.getMetadataArray("binary")),
      Try(vecMap.getMetadataArray("nominal"))
    )
      .filter(x => x.isSuccess)
      .flatMap(x=> x.get)
      .map(x=> (x.getLong("idx"), x.getString("name")))
    //    val featureMaps = vecMap.getMetadataArray("numeric") ++ vecMap.getMetadataArray("binary")
    //    val featureMapping = featureMaps.map(x => (x.getLong("idx"), x.getString("name"))).toMap
    val featureCof = featureMapping
      .map(x => (x._2, x._1))
      .map(x => (x._1, lrm.coefficients(x._2.toInt)))
      .toList
      .sortBy(x => x._2)
      .reverse
      .map(x => {
        if (x._1.contains("_vec")) {
          val featureOrgIndex = x._1.lastIndexOf("_").toInt
          val vecModelOutCol = x._1.substring(0, featureOrgIndex)
          val vecIndex = x._1.substring(featureOrgIndex + 1).toInt
          val vecModels = fitor
            .stages
            .map(x => Try(x.asInstanceOf[CountVectorizerModel]))
            .filter(x => x.isSuccess && x.get.getOutputCol == vecModelOutCol)
            .map(x => x.get)
          val feature = vecModels.head.vocabulary(vecIndex)
          (s"${vecModelOutCol}_${feature}", x._2)
        } else {
          (x._1, x._2)
        }
      })
      .map(x => s"${x._1}\t${x._2}")
      .mkString("\n")

    data1.show()
    println(data.schema)

    val jpmmlModelSerializer = new com.zenmen.wkread.pmmlModel.serving.jpmml.serialization.ModelSerializer()
    jpmmlModelSerializer.serializeModel(fitor, s"${dataOutPath}/lr_base/lr.model.jpmml.xml", data1.schema)
  }

and the error message is:


Expected string, integral, double or boolean type, got array type
java.lang.IllegalArgumentException: Expected string, integral, double or boolean type, got array type
    at org.jpmml.sparkml.SparkMLEncoder.createDataField(SparkMLEncoder.java:169)
    at org.jpmml.sparkml.SparkMLEncoder.getFeatures(SparkMLEncoder.java:76)
    at org.jpmml.sparkml.SparkMLEncoder.getOnlyFeature(SparkMLEncoder.java:63)
    at org.jpmml.sparkml.feature.CountVectorizerModelConverter.encodeFeatures(CountVectorizerModelConverter.java:57)
    at org.jpmml.sparkml.FeatureConverter.registerFeatures(FeatureConverter.java:47)
    at org.jpmml.sparkml.PMMLBuilder.build(PMMLBuilder.java:110)
    at com.zenmen.wkread.pmmlModel.serving.jpmml.serialization.ModelSerializer.serializeModel(ModelSerializer.scala:16)
    at com.zenmen.wkread.pmmlModel.ctrModel.ChapterLrModelTest$$anonfun$5.apply$mcV$sp(ChapterLrModelTest.scala:405)
    at com.zenmen.wkread.pmmlModel.ctrModel.ChapterLrModelTest$$anonfun$5.apply(ChapterLrModelTest.scala:284)
    at com.zenmen.wkread.pmmlModel.ctrModel.ChapterLrModelTest$$anonfun$5.apply(ChapterLrModelTest.scala:284)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
    at org.scalatest.FunSuite.runTest(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
    at com.zenmen.wkread.pmmlModel.ctrModel.ChapterLrModelTest.org$scalatest$BeforeAndAfterAll$$super$run(ChapterLrModelTest.scala:16)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
    at com.zenmen.wkread.pmmlModel.ctrModel.ChapterLrModelTest.run(ChapterLrModelTest.scala:16)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.run(Runner.scala:883)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

marvinxu-free commented 4 years ago

When I use CountVectorizer in my pipeline model, it cannot be saved as a JPMML model.

vruusmann commented 4 years ago

This issue has got nothing to do with the JPMML-Transpiler library. Moving it to the JPMML-SparkML project.

vruusmann commented 4 years ago

Exact duplicate of https://github.com/jpmml/pyspark2pmml/issues/26

The "expected scalar type, got array type" error is conceptually very close to the "expected scalar type, got vector type" error, which is documented in https://github.com/jpmml/jpmml-sparkml/issues/21, https://github.com/jpmml/jpmml-sparkml/issues/26 etc.

The resolution is still the same - you cannot use structured column types (such as maps, vectors etc) in your data frame. They need to be unpacked (aka flattened, exploded etc) to plain scalar column types. See the instructions in the linked issues.
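As a rough illustration of the unpacking idea (plain Scala, no Spark; the rows and keys are made up to mirror the map column in the example above), a map column can be flattened into one scalar column per observed key:

```scala
// Hypothetical rows: each row holds a map column, as in the repro snippet.
val rows: Seq[Map[String, Double]] = Seq(
  Map("a" -> 0.1, "b" -> 0.2, "c" -> 0.3),
  Map("d" -> 0.1, "e" -> 0.2, "c" -> 0.3)
)

// Collect the full key space, then emit one scalar column per key.
val keys: Seq[String] = rows.flatMap(_.keys).distinct.sorted

// Each row becomes a plain row of scalar values (0.0 when the key is absent),
// which is the shape that can be encoded as individual PMML data fields.
val flattened: Seq[Seq[Double]] = rows.map(row => keys.map(k => row.getOrElse(k, 0.0)))
```

In Spark itself, the same effect is achieved by projecting each map key into its own DataFrame column before fitting the pipeline.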

When I use CountVectorizer in my pipeline model, it cannot be saved as a JPMML model.

The CountVectorizer transformation type is supported, but you need to apply it to a string column, not a map/vector/array column.
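For intuition, a binary count-vectorization over a plain string column behaves roughly like this (plain Scala sketch, no Spark; the vocabulary and document are made up):

```scala
// Hypothetical vocabulary, as learned from a string column's tokens.
val vocabulary: Array[String] = Array("a", "b", "c", "w")

// A single document held as a plain string (a scalar column type),
// tokenized on whitespace.
val document: String = "c b w"
val tokens: Set[String] = document.split("\\s+").toSet

// Binary encoding: 1.0 if the vocabulary term occurs in the document.
val encoded: Array[Double] = vocabulary.map(term => if (tokens.contains(term)) 1.0 else 0.0)
```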

marvinxu-free commented 4 years ago

Using StringIndexer and OneHotEncoder stages makes the Spark program run very slowly and even fail, while CountVectorizer with VectorAssembler is the fastest but cannot be saved with JPMML.

MLeap supports array-type columns, which means this is not an unresolvable problem. However, the prediction results are not the same when the saved model is loaded in a local environment with MLeap; maybe I missed something...

marvinxu-free commented 4 years ago

MLeap has a bug; not using MinMaxScaler resolves this problem. Thus, I suggest that anyone who sees this problem use MLeap in their service.