h2oai / sparkling-water

Sparkling Water provides H2O functionality inside Spark cluster
https://docs.h2o.ai/sparkling-water/3.3/latest-stable/doc/index.html
Apache License 2.0

Not able to add target Encoder estimator in Spark Pipeline #2348

Closed preet3loq closed 4 years ago

preet3loq commented 4 years ago

Hello, I am trying to create an RF classification model with target encoding and add the encoder to my Spark Pipeline, but I am facing some issues.

    val targetEncoder = new H2OTargetEncoder()
      .setInputCols(Array("gender", "recommendation_id"))
      .setLabelCol("label")
      .setOutputCols(Array("gendervec", "recommendation_idvec"))
      .setNoise(0.03)

    val continuousFeatureAssembler = new VectorAssembler()
      .setInputCols(features.toArray)
      .setOutputCol("features")
      .setHandleInvalid("skip")

    val rf = new RandomForestClassifier()
      .setFeaturesCol("features")
      .setLabelCol("label")

    val steps = Array(targetEncoder, continuousFeatureAssembler) ++ Array(rf)
    val pipeline = new Pipeline().setStages(steps)

Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4$$anonfun$6.apply(CrossValidator.scala:166)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4$$anonfun$6.apply(CrossValidator.scala:166)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4.apply(CrossValidator.scala:166)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4.apply(CrossValidator.scala:146)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:146)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:122)
    at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:185)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:185)
    at org.apache.spark.ml.tuning.CrossValidator.fit(CrossValidator.scala:122)
    at MLModel.MlClassificationModel$.runClassifier(MlClassificationModel.scala:47)
    at BancoPan.Model.ai.RunModel$.model(RunModel.scala:172)
    at BancoPan.Model.ai.createModel.preparePipeline(createModel.scala:61)
    at BancoPan.Model.Controller$.main(Controller.scala:150)
    at BancoPan.Model.Controller$.delayedEndpoint$BancoPan$Model$Controller$1(Controller.scala:65)
    at BancoPan.Model.Controller$delayedInit$body.apply(Controller.scala:15)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at BancoPan.Model.Controller$.main(Controller.scala:15)
    at BancoPan.Model.Controller.main(Controller.scala)
Caused by: java.lang.NoSuchMethodException: ai.h2o.sparkling.ml.models.H2OTargetEncoderModel.<init>(java.lang.String)
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.getConstructor(Class.java:1825)
    at org.apache.spark.ml.param.Params$class.defaultCopy(params.scala:846)
    at org.apache.spark.ml.PipelineStage.defaultCopy(Pipeline.scala:42)
    at ai.h2o.sparkling.ml.models.H2OTargetEncoderModel.copy(H2OTargetEncoderModel.scala:96)
    at ai.h2o.sparkling.ml.models.H2OTargetEncoderModel.copy(H2OTargetEncoderModel.scala:33)
    at org.apache.spark.ml.PipelineModel$$anonfun$copy$1.apply(Pipeline.scala:316)
    at org.apache.spark.ml.PipelineModel$$anonfun$copy$1.apply(Pipeline.scala:316)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.ml.PipelineModel.copy(Pipeline.scala:316)
    at org.apache.spark.ml.PipelineModel.copy(Pipeline.scala:293)
    at org.apache.spark.ml.Transformer.transform(Transformer.scala:64)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4$$anonfun$5$$anonfun$apply$1.apply$mcD$sp(CrossValidator.scala:159)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4$$anonfun$5$$anonfun$apply$1.apply(CrossValidator.scala:153)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4$$anonfun$5$$anonfun$apply$1.apply(CrossValidator.scala:153)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
    at scala.concurrent.impl.Future$.apply(Future.scala:31)
    at scala.concurrent.Future$.apply(Future.scala:494)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4$$anonfun$5.apply(CrossValidator.scala:162)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4$$anonfun$5.apply(CrossValidator.scala:152)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1$$anonfun$4.apply(CrossValidator.scala:152)
    ... 28 more

Process finished with exit code 1

This is the error message that I get.

  1. Is this because I need to use only H2O models? The H2O documentation says that we can include the target encoder estimator in Spark pipelines. @jakubhava, please help with this issue.

mn-mikke commented 4 years ago

Hi @preet3loq, what is the Sparkling Water version that you use?

preet3loq commented 4 years ago

Hi Marek (@mn-mikke), here is my build configuration:

scalaVersion := "2.11.11"

val sparkVersion = "2.4.3"

libraryDependencies ++= Seq(
  "com.typesafe" % "config" % "1.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.3",
  "org.apache.spark" %% "spark-core" % "2.4.3",
  "org.apache.spark" %% "spark-mllib" % "2.4.3",
  "joda-time" % "joda-time" % "2.10.4",
  //"com.databricks" % "spark-avro" % "4.0.0",
  "com.github.nscala-time" %% "nscala-time" % "2.18.0",
  //"ml.dmlc" % "xgboost4j-spark" % "0.90",
  "com.github.scopt" %% "scopt" % "3.7.0",
  "ml.combust.mleap" %% "mleap-spark-extension" % "0.16.0",
  "com.esotericsoftware.reflectasm" % "reflectasm" % "1.07",
  "org.scalafx" %% "scalafx" % "8.0.144-R12",
  "org.scala-lang" % "scala-compiler" % "2.11.11",
  "org.apache.spark" %% "spark-repl" % "2.4.4",
  "ai.h2o" %% "sparkling-water-package" % "3.30.1.1-1-2.4",
  "org.jpmml" % "jpmml-sparkml" % "1.6.1"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

mn-mikke commented 4 years ago

I think the problem is in the following line of your build configuration: case PathList("META-INF", xs@_*) => MergeStrategy.discard

You can't simply discard the META-INF folder. H2O-3 and SW store important information there. For example, META-INF/services/water.api.RestApiExtensions and META-INF/services/water.api.Schema are crucial for the target encoder, but there will be more.
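
One way to keep those service files while still dropping other META-INF entries is a more selective merge strategy. The following is only a sketch, assuming sbt-assembly's standard `MergeStrategy.concat` and the same `assemblyMergeStrategy in assembly` syntax used in the build above. It covers `META-INF/services` only, and H2O-3 and SW may rely on further META-INF entries, so treat it as a starting point rather than a verified fix.

```scala
// build.sbt (sketch): keep service registrations, discard the rest of META-INF.
assemblyMergeStrategy in assembly := {
  // Concatenate service files so registrations from all jars survive the merge.
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  // Assumption: the remaining META-INF entries are safe to drop; this may not hold for H2O-3/SW.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
```

The order of the cases matters: the `services` case has to come before the general `META-INF` case, otherwise the discard rule matches first.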

preet3loq commented 4 years ago

Hi Marek, that did not help. I tried commenting out that line of the build configuration.

  1. Also, could you check whether my Spark version is compatible with H2O Sparkling Water? [ Caused by: java.lang.NoSuchMethodException: ai.h2o.sparkling.ml.models.H2OTargetEncoderModel.<init>(java.lang.String) ]

  2. Could you help me with a code snippet for H2O Sparkling Water's target encoder used in a Spark pipeline?
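
For context on the exception quoted in question 1: the stack trace shows Spark's `defaultCopy` calling `Class.getConstructor`, and the `NoSuchMethodException` names a constructor taking a single `java.lang.String`. The sketch below reproduces that kind of reflective lookup in plain Scala. Both classes and the `copyByUid` helper are hypothetical stand-ins, not Spark or Sparkling Water code, and the sketch does not explain why the constructor could not be found in this particular setup.

```scala
// Hypothetical stand-ins for pipeline stages; these are NOT Spark or Sparkling Water classes.
class StageWithUidConstructor(val uid: String)
class StageWithoutUidConstructor(val uid: String, val extra: Int)

object ReflectiveCopyDemo {
  // Looks up a public constructor that takes exactly one String and invokes it.
  // Throws NoSuchMethodException when the class has no such constructor.
  def copyByUid[T <: AnyRef](stage: T, uid: String): T =
    stage.getClass.getConstructor(classOf[String]).newInstance(uid).asInstanceOf[T]

  def main(args: Array[String]): Unit = {
    val copied = copyByUid(new StageWithUidConstructor("a"), "a")
    println(s"copied stage with uid ${copied.uid}")

    // This class only has a (String, Int) constructor, so the lookup fails.
    try copyByUid(new StageWithoutUidConstructor("b", 1), "b")
    catch {
      case e: NoSuchMethodException => println(s"lookup failed: ${e.getMessage}")
    }
  }
}
```

In other words, the exception says that a single-`String` constructor could not be found on the loaded `H2OTargetEncoderModel` class at the moment the pipeline model was copied.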

mn-mikke commented 4 years ago

Hi @preet3loq, the Spark version should be fine (except that you use a different version for spark-repl). The only unsupported version is 2.4.2. I think there is still a problem in your assembly jar. Can you try excluding sparkling-water-package from the assembly jar and putting it on the classpath when you submit your application? https://docs.h2o.ai/sparkling-water/2.4/latest-stable/doc/tutorials/use_as_spark_package.html
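
A minimal sketch of that suggestion, assuming sbt-assembly's default behaviour of leaving `provided` dependencies out of the assembly jar and the standard `--packages` option of `spark-submit`. The Maven coordinates are derived from the dependency line in the build above (Scala 2.11); the main class and jar name are placeholders.

```scala
// build.sbt (sketch): compile against Sparkling Water but keep it out of the assembly jar.
libraryDependencies += "ai.h2o" %% "sparkling-water-package" % "3.30.1.1-1-2.4" % "provided"

// Then supply the package at submit time, for example:
//   spark-submit \
//     --packages ai.h2o:sparkling-water-package_2.11:3.30.1.1-1-2.4 \
//     --class <your.main.Class> \
//     <your-assembly>.jar
```

With this setup the Sparkling Water jars reach the classpath unmodified, so the assembly merge strategy never touches their META-INF contents.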

Could you help me with a code snippet for H2O sparkling water's target encoder used in a spark pipeline?

https://docs.h2o.ai/sparkling-water/2.4/latest-stable/doc/ml/sw_target_encoder.html#target-encoder-in-ml-pipeline

preet3loq commented 4 years ago

Hi Marek, I tried the code snippet; the code works fine and I can see the predictions. But I have a question. When I set the output of a transformer as follows:

    val targetEncoder = new H2OTargetEncoder()
      .setInputCols(Array("gender", "home_state"))
      .setLabelCol("label")

    val continuousFeatureAssembler = new VectorAssembler()
      .setInputCols(targetEncoder.getOutputCols())
      .setOutputCol("features")
      .setHandleInvalid("skip")

    val rf = new RandomForestClassifier()
      .setFeaturesCol(config.modelParametersC.modelFeatureC)
      .setLabelCol(config.modelParametersR.label_typeR)
      .setSubsamplingRate(0.5)
      .setFeatureSubsetStrategy("auto")
      .setSeed(42)

Does it train on all the features? How do I verify that it trains on "gender_te" and not "gender"?

mn-mikke commented 4 years ago

Hi @preet3loq, I'm a bit confused by the question. Have you identified any problem in H2OTargetEncoder or not?

Is config.modelParametersC.modelFeatureC the same as continuousFeatureAssembler.getOutputCol?

If you have a trained pipeline model, you can get the relevant algorithm stage, cast it to RandomForestClassificationModel, and call getFeaturesCol to see which column was used for model training.
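
A minimal sketch of that check, assuming spark-mllib 2.4.x on the classpath. The `InspectPipeline` object and its method names are hypothetical helpers, not part of Spark or Sparkling Water. It reports the assembler's input columns as well as the random forest's features column, since the features column is only the name of the assembled vector.

```scala
import org.apache.spark.ml.{PipelineModel, Transformer}
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.feature.VectorAssembler

// Hypothetical helper, not part of Spark or Sparkling Water.
object InspectPipeline {
  // Describes which columns the assembler and random forest stages refer to;
  // all other stage types are skipped.
  def describeStages(stages: Seq[Transformer]): Seq[String] =
    stages.collect {
      case va: VectorAssembler =>
        s"assembler: ${va.getInputCols.mkString(", ")} -> ${va.getOutputCol}"
      case rf: RandomForestClassificationModel =>
        s"random forest trained on column: ${rf.getFeaturesCol}"
    }

  // Convenience wrapper for a fitted pipeline.
  def describe(model: PipelineModel): Seq[String] = describeStages(model.stages)
}
```

Calling `InspectPipeline.describe(pipelineModel).foreach(println)` on the result of `pipeline.fit(...)` lists both sides. If the assembler inputs are the encoded columns (for example "gender_te") and the random forest's features column equals the assembler's output column, the forest was trained on the encoded values rather than on the raw "gender" column.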

mn-mikke commented 4 years ago

@preet3loq Do you need more help on this ticket or can we close it?