jpmml / jpmml-evaluator-spark

PMML evaluator library for the Apache Spark cluster computing system (http://spark.apache.org/)
GNU Affero General Public License v3.0

When will Spark 2.1.x be supported? #11

Closed MichaelXucf closed 6 years ago

MichaelXucf commented 6 years ago

Hi, vruusmann. I ran into a problem when using jpmml-evaluator-spark with Spark 2.1.1.

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.CreateStruct.<init>(Lscala/collection/Seq;)V
    at org.jpmml.evaluator.spark.PMMLTransformer.transform(PMMLTransformer.java:151)
    at com.michaelxu.spark.TestPipeLine.testJPMML(TestPipeLine.scala:312)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

The version of jpmml-evaluator-spark I used is 1.1-SNAPSHOT, which supports Spark 2.0.0 to 2.0.2. In Spark 2.0.2, CreateStruct is defined as a case class:

/**
 * Returns a Row containing the evaluation of all children expressions.
 */
@ExpressionDescription(
  usage = "_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values.")
case class CreateStruct(children: Seq[Expression]) extends Expression {

  override def foldable: Boolean = children.forall(_.foldable)

  override lazy val dataType: StructType = {
    val fields = children.zipWithIndex.map { case (child, idx) =>
      child match {
        case ne: NamedExpression =>
          StructField(ne.name, ne.dataType, ne.nullable, ne.metadata)
        case _ =>
          StructField(s"col${idx + 1}", child.dataType, child.nullable, Metadata.empty)
      }
    }
    StructType(fields)
  }
......

But in Spark 2.1.1, CreateStruct is defined as an object:

/**
 * Returns a Row containing the evaluation of all children expressions.
 */
object CreateStruct extends FunctionBuilder {
  def apply(children: Seq[Expression]): CreateNamedStruct = {
    CreateNamedStruct(children.zipWithIndex.flatMap {
      case (e: NamedExpression, _) if e.resolved => Seq(Literal(e.name), e)
      case (e: NamedExpression, _) => Seq(NamePlaceholder, e)
      case (e, index) => Seq(Literal(s"col${index + 1}"), e)
    })
  }

  /**
   * Entry to use in the function registry.
   */
  val registryEntry: (String, (ExpressionInfo, FunctionBuilder)) = {
    val info: ExpressionInfo = new ExpressionInfo(
      "org.apache.spark.sql.catalyst.expressions.NamedStruct",
      null,
      "struct",
      "_FUNC_(col1, col2, col3, ...) - Creates a struct with the given field values.",
      "")
    ("struct", (info, this))
  }
}

I'm not sure how to change the following code in PMMLTransformer. Can you give me some suggestions? Thank you.

Expression evaluateExpression = new ScalaUDF(
        evaluatorFunction,
        getOutputSchema(),
        ScalaUtil.<Expression>singletonSeq(new CreateStruct(ScalaUtil.<Expression>toSeq(activeExpressions))),
        ScalaUtil.<DataType>emptySeq()
        );

vruusmann commented 6 years ago

For the moment, see @sidfeiner's adaptation: https://github.com/sidfeiner/jpmml-spark/commit/cf897ed6efe585aad976357ece86081b94f17f75

The removal of the CreateStruct function is a breaking change between the Apache Spark 2.0 and 2.1 APIs. It would be wasteful to start maintaining two JPMML-Evaluator-Spark development branches (e.g. 1.1-SNAPSHOT and 1.2-SNAPSHOT) only because of that.

The solution would be to introduce my own CreateStruct function, which works with all Apache Spark 2.X versions (including the latest 2.2.0 version).

sidfeiner commented 6 years ago

Hey, I've pushed another commit that fixes this issue. CreateStruct might not be a class anymore, but it still has an apply function, so that's what I use.

MichaelXucf commented 6 years ago

@vruusmann @sidfeiner thank you, I have resolved my problem by using the approach from sidfeiner/jpmml-spark. By the way, there is still a small problem when jpmml-spark is used with Spark 2.1.1: the constructor of ScalaUDF is slightly different in Spark 2.1.0 and 2.1.1.

In Spark 2.1.0

Expression evaluateExpression = new ScalaUDF(
        evaluatorFunction,
        getOutputSchema(),
        ScalaUtil.<Expression>singletonSeq(CreateStruct.apply(ScalaUtil.<Expression>toSeq(activeExpressions))),
        ScalaUtil.<DataType>emptySeq()
        );

In Spark 2.1.1, ScalaUDF needs an additional Option parameter.

Expression evaluateExpression = new ScalaUDF(
        evaluatorFunction,
        getOutputSchema(),
        ScalaUtil.<Expression>singletonSeq(CreateStruct.apply(ScalaUtil.<Expression>toSeq(activeExpressions))),
        ScalaUtil.<DataType>emptySeq(),
        scala.Option.<String>empty()
        );

vruusmann commented 6 years ago

It would be really bad news if Apache Spark APIs break like this even between minor versions.

As mentioned earlier, JPMML-Evaluator-Spark should be source and binary compatible with the widest range of Apache Spark 2.X versions (ideally all 2.0.X, 2.1.X and 2.2.X release versions). I hope it can be achieved without having to write my own ScalaUDF function (in addition to the CreateStruct function).

If my IDE autocomplete is not mistaken, then the CreateStruct#apply(..) method is not available in Apache Spark version 2.0.0?

MichaelXucf commented 6 years ago

I get an exception when I use sidfeiner/jpmml-spark's branch with Spark 2.1.1. sidfeiner/jpmml-spark is compiled against Spark 2.1.0, and my project depends on Spark 2.1.1.

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(Ljava/lang/Object;Lorg/apache/spark/sql/types/DataType;Lscala/collection/Seq;Lscala/collection/Seq;)V

    at org.jpmml.spark.PMMLTransformer.transform(PMMLTransformer.java:154)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
    at org.apache.spark.ml.PipelineModel$$anonfun$transform$1.apply(Pipeline.scala:305)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.PipelineModel.transform(Pipeline.scala:305)
    at com.michaelxu.spark.TestPipeLine.testJPMML(TestPipeLine.scala:306)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    ......

In Spark 2.1.0 ScalaUDF is defined as

case class ScalaUDF(
    function: AnyRef,
    dataType: DataType,
    children: Seq[Expression],
    inputTypes: Seq[DataType] = Nil)

In Spark 2.1.1 ScalaUDF is defined as

case class ScalaUDF(
    function: AnyRef,
    dataType: DataType,
    children: Seq[Expression],
    inputTypes: Seq[DataType] = Nil,
    udfName: Option[String] = None)

When ScalaUDF is used from Java with Spark 2.1.1, its constructor must receive all 5 parameters. This may be the problem.

The CreateStruct#apply(..) method is not available in Apache Spark 2.0.2; there is only a CreateStruct#unapply(..) method.

sidfeiner commented 6 years ago

Adding new params to the ScalaUDF constructor shouldn't break callers written in Scala, because the new params have default values. Our problem is that the evaluator is written in Java, which doesn't let us use the default params. We could fill them in ourselves, or we could rewrite PMMLTransformer in Scala, as that may avoid future cases of Spark adding params with default values to their APIs.

And CreateStruct was a case class in Spark 2.0, and all case classes have an apply function. Does your IDE/compiler throw errors if you try to call its apply function?
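
One way to sidestep the `new CreateStruct(..)` versus `CreateStruct.apply(..)` question from Java is to resolve the factory reflectively at runtime. A compiled call to `apply` embeds the method's return type in its descriptor, and per the definitions quoted above the object's `apply` returns `CreateNamedStruct` rather than `CreateStruct`, so a direct call compiled against one version may not link against the other. A reflective lookup matches on name and parameter types only. The sketch below is a minimal illustration of that idea, not code from either project: `StructFactory`, `OldStruct`, `NewStruct` and `NamedStruct` are hypothetical stand-ins, and `java.util.List` stands in for `scala.collection.Seq`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.List;

final class StructFactory {

    // Hypothetical stand-in for an "old" API: a class with a public constructor only.
    static final class OldStruct {
        final List<String> children;
        public OldStruct(List<String> children) { this.children = children; }
    }

    // Hypothetical stand-in for the result type produced by the "new" factory.
    static final class NamedStruct {
        final List<String> children;
        NamedStruct(List<String> children) { this.children = children; }
    }

    // Hypothetical stand-in for a "new" API: no usable constructor, only a static
    // factory whose return type differs from the declaring class.
    static final class NewStruct {
        private NewStruct() { }
        public static NamedStruct apply(List<String> children) {
            return new NamedStruct(children);
        }
    }

    // Looks up a public static apply(List), ignoring its return type.
    private static Method findStaticApply(Class<?> clazz) {
        try {
            Method apply = clazz.getMethod("apply", List.class);
            return Modifier.isStatic(apply.getModifiers()) ? apply : null;
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    // Prefers the static factory when present, otherwise falls back to the constructor.
    static Object create(Class<?> clazz, List<String> children) {
        try {
            Method apply = findStaticApply(clazz);
            if (apply != null) {
                return apply.invoke(null, children);
            }
            return clazz.getConstructor(List.class).newInstance(children);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot build " + clazz.getName(), e);
        }
    }
}
```

Calling `StructFactory.create(..)` with either stand-in class yields an object without the caller naming a constructor or a return type at compile time. The cost is that a signature mismatch only shows up at runtime.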

vruusmann commented 6 years ago

That's a good point that selected Java classes (such as org.jpmml.evaluator.spark.PMMLTransformer) should be translated to Scala.

I haven't set up any Scala projects so far. But based on what I've seen elsewhere, it seems pretty difficult to make a mixed Java/Scala project compile and build nicely. I dread the idea that I would need to switch from an Apache Maven build to an SBT build etc.

sidfeiner commented 6 years ago

I have had a few mixed projects; if you want, I can help you with it. You won't even have to switch to SBT, because there's a Scala plugin for Maven.

samhendley commented 6 years ago

Sorry to resurrect an old thread, but I believe that translating classes into Scala won't fix the default argument issues. The Scala compiler copies the default values into the calling code during compilation. If you swap in a different version of the compiled library you would still get a failure, and it would look like a NoSuchMethodError. This is my biggest gripe with Scala: it is much more difficult to reason about what is/isn't a breaking binary change, and the "Scala way" promotes binary incompatibility by preferring defaults over explicit overloading.
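
To make the point above concrete: a call that omits a defaulted argument is compiled into a call to the full-arity constructor, with the default value fetched by the caller, so the caller's bytecode names the exact parameter list it was compiled against. The following sketch imitates that in plain Java, using a reflective lookup as a stand-in for JVM linking. `DefaultArgsDemo`, `UdfV1` and `UdfV2` are hypothetical names, and `java.util.Optional` stands in for `scala.Option`.

```java
import java.util.List;
import java.util.Optional;

final class DefaultArgsDemo {

    // Hypothetical library class as shipped in "version 1": four parameters.
    static final class UdfV1 {
        public UdfV1(Object function, String dataType,
                     List<String> children, List<String> inputTypes) { }
    }

    // The same class in "version 2": a fifth parameter was appended.
    // In Scala source it would carry a default value, but the compiled
    // class still exposes only this five-parameter constructor.
    static final class UdfV2 {
        public UdfV2(Object function, String dataType,
                     List<String> children, List<String> inputTypes,
                     Optional<String> udfName) { }
    }

    // A caller compiled against version 1 refers to exactly this parameter list.
    // Resolving it reflectively mimics what the JVM does when linking the call.
    static boolean linksAgainst(Class<?> libraryClass) {
        try {
            libraryClass.getConstructor(Object.class, String.class, List.class, List.class);
            return true;
        } catch (NoSuchMethodException e) {
            // With a directly compiled call this surfaces as NoSuchMethodError.
            return false;
        }
    }
}
```

Here `linksAgainst(UdfV1.class)` succeeds and `linksAgainst(UdfV2.class)` does not, regardless of whether the original caller was written in Java or Scala. Only recompiling the caller against the new version picks up the new default.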

vruusmann commented 6 years ago

Thanks @samhendley - great insight into the operations of the Scala compiler. So, the conclusion is that it's virtually impossible to write a unitary Java/Scala codebase that could be compiled, packaged and then run on a variety of different Apache Spark versions (in the current case, all 2.0.X, 2.1.X and 2.2.X versions). There must be some variance in the Java/Scala codebase that detects the running Apache Spark version, and then chooses the appropriate parameterization for the ScalaUDF function?

Anyway, the starting point would be to extract the "variable" part of the Java code from the TransformerBuilder#transform(Dataset) method into a separate Scala utility method (that the Scala compiler could see and understand, and apply its workarounds).
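
A minimal sketch of the "detect and parameterize" idea, assuming it is acceptable to pick the constructor by its parameter count at runtime rather than by parsing a version string. All class names here (`ConstructorByArity`, `UdfFourArgs`, `UdfFiveArgs`) are hypothetical stand-ins so that the example runs without Spark on the classpath. With the real class, the trailing default would be a `scala.Option` value instead of `java.util.Optional`.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class ConstructorByArity {

    // Hypothetical stand-in for the older, four-parameter constructor.
    static final class UdfFourArgs {
        public UdfFourArgs(Object function, String dataType,
                           List<String> children, List<String> inputTypes) { }
    }

    // Hypothetical stand-in for the newer constructor with one extra trailing parameter.
    static final class UdfFiveArgs {
        final Optional<String> udfName;
        public UdfFiveArgs(Object function, String dataType,
                           List<String> children, List<String> inputTypes,
                           Optional<String> udfName) {
            this.udfName = udfName;
        }
    }

    // Instantiates clazz with the required arguments, padding them with as many
    // trailing defaults as the constructor found at runtime asks for.
    static Object newInstance(Class<?> clazz, Object[] required, Object[] trailingDefaults) {
        for (Constructor<?> constructor : clazz.getConstructors()) {
            int extra = constructor.getParameterCount() - required.length;
            if (extra < 0 || extra > trailingDefaults.length) {
                continue; // this constructor needs arguments we cannot supply
            }
            Object[] args = Arrays.copyOf(required, required.length + extra);
            System.arraycopy(trailingDefaults, 0, args, required.length, extra);
            try {
                return constructor.newInstance(args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not instantiate " + clazz.getName(), e);
            }
        }
        throw new IllegalStateException("No suitable constructor on " + clazz.getName());
    }
}
```

The same call site then works against either stand-in: the four required arguments are always passed, and the `Optional.empty()` default is appended only when a fifth parameter exists. Matching on arity alone is a deliberate simplification; a sturdier version would also compare parameter types.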

valdo404 commented 6 years ago

Hi, I just pushed a pull request for this (#12); have a look. It works with Spark 2.2, and may work with Spark 2.1.

vladimir-lu commented 6 years ago

@vruusmann I've only done this with sbt projects - but essentially you can remain source-compatible with Spark by compiling the same source folder against different versions of the libraries (similarly for Scala versions, but you can additionally have a scala-2.10/scala-2.11/scala-2.12 folder for when things are really not compatible). This is still somewhat hacky to do in sbt, but at least it's doable. In Maven I guess you would need a custom plugin.

+1 to a PR to convert this project to Scala. While it doesn't solve all problems, wrapping Scala libraries is usually best done in Scala (and there's actually no compatibility need here to provide a Java interface AFAIS).

@samhendley Scala binary compatibility is non-trivial - this is why tools like MiMa are there to ensure it.