typelevel / frameless

Expressive types for Spark.
Apache License 2.0
876 stars 138 forks

Spark 4.0 / DBR 14.2+ - bleeding edge changes #787

Open chris-twiner opened 7 months ago

chris-twiner commented 7 months ago

Per my comment on #755, DBR 14.2, Spark 4.0 and likely all later versions include the SPARK-44913 StaticInvoke changes.

Whilst this hasn't yet been backported to the 3.5 branch, it could well end up there.

I'm happy to fork and publish a bleeding-edge / non-standard frameless if needed, but I also wonder whether a compat layer shipped as a separate swappable jar is the best route, similar to #300 for example.

What is the collective preferred route to fixing / working around this?
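For illustration, the "swappable jar" idea amounts to frameless compiling against a stable factory instead of calling Spark's StaticInvoke constructor directly, with a per-runtime jar supplying the matching implementation. A minimal sketch in plain Scala (every name below is a hypothetical stand-in, not real frameless or Spark API; the two case classes just model the incompatible constructor shapes):

```scala
// Toy stand-ins for the two incompatible constructor shapes
// (SPARK-44913 changed StaticInvoke's signature in a comparable way):
final case class StaticInvokeV1(cls: String, args: Seq[String])
final case class StaticInvokeV2(cls: String, args: Seq[String], scalarFunc: Boolean)

// The stable call site that frameless code would compile against:
trait StaticInvokeShim {
  def staticInvoke(cls: String, args: Seq[String]): Any
}

// One implementation per runtime, shipped in separate swappable shim jars:
object Oss35Shim extends StaticInvokeShim {
  def staticInvoke(cls: String, args: Seq[String]): Any =
    StaticInvokeV1(cls, args)
}

object Dbr142Shim extends StaticInvokeShim {
  def staticInvoke(cls: String, args: Seq[String]): Any =
    StaticInvokeV2(cls, args, scalarFunc = false)
}
```

Because frameless itself would only link against the shim trait, swapping the shim jar at deploy time selects the constructor that actually exists on that runtime, avoiding the NoSuchMethodError.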

pomadchin commented 7 months ago

Hmm, swappable jar?

I'm super open to adding any necessary compat layers; shoot a PR and we'll get it in if you have any nice ideas!

chris-twiner commented 6 months ago

13.3 LTS backported the 3.5 isNativeType change as well, so that's reflected in the title. (I was mistaken; the 0.16 spark33 builds work fine.)

fyi - I've created a shim to handle the abstraction; barring isNative, the approach seems workable. I'll start on the frameless shims in the next few days. Lots to do there.

chris-twiner commented 6 months ago

fyi - With the first shim snapshot push, compiling against 3.5.oss works when running against 14.3.dbr; only StaticInvoke needed doing. (So the same frameless jar can be run against both 14.0/14.1 and 14.2/14.3 by swapping in the shim for the right DBR version, or indeed users can stay with the OSS version as per a normal dependency.)

The code for the StaticInvoke handling, shims etc. is in the branch here and the diff is here.

I'll next target the major pain points impacted in each OSS major/minor release (i.e. TypedEncoder, TypedExpressionEncoder and RecordEncoder) to have each internal API usage pulled out (e.g. [Un]WrapOption, Invoke, NewInstance, GetStructField, ifisnull, GetColumnByOrdinal, MapObjects and probably TypedExpressionEncoder itself). It's probably worth doing these in advance of any pull request.

What I'll attempt with this is to see how much of the encoding logic can be re-used from the current frameless codebase with targeted major versions on older DBRs (e.g. can we get a 3.5 OSS frameless jar running on a 3.1.2 Databricks runtime?).

If you'd like me to add FramelessInternals.objectTypeFor, ScalaReflection.dataTypeFor etc. as well, I think that'd make sense, but Reflection had been fairly stable code before they ripped it out :)

chris-twiner commented 6 months ago

@pomadchin - So, at the time of writing, building the current 0.16-based fork branch (rev 7944fe9 is pre-reformatting) against the correct 3.5 shim_runtime version and testing the encoding functionality (used by Quality tests built against 0.16 frameless with a 3.1.3 OSS base) with the shim_runtime for 9.1.dbr works, despite the very different implementation.

I wouldn't want to advertise that it's possible to jump versions so far (there are other issues like kmeans and join interface changes, of course), but it proves the approach works at least and may ease 4.x support.

The pre-reformatting functional change diff is here. The key mima change is the removal of frameless.MapGroups; it could of course be kept as a simple forwarder if needed.
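On the mima point, one standard way to keep binary compatibility is to retain the old object as a thin deprecated forwarder to its replacement. A toy sketch of the pattern (both names here are stand-ins, not the real frameless or shim types):

```scala
// New home of the functionality (hypothetical stand-in):
object ShimMapGroups {
  def describe(key: String): String = s"mapGroups($key)"
}

// The old name is kept only so previously compiled code keeps linking;
// every call simply forwards to the new location:
@deprecated("use ShimMapGroups instead", "0.17")
object MapGroups {
  def describe(key: String): String = ShimMapGroups.describe(key)
}
```

Old bytecode linked against the original name keeps working, while new code gets a deprecation nudge toward the replacement.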

chris-twiner commented 5 months ago

Per b880261, #803 and #804 are confirmed as working on all LTS versions of Databricks, Spark 4 and the latest 15.0 runtime; the test combinations are documented here.

chris-twiner commented 5 months ago

A number of test issues appear when running on a cluster that do not appear on a single-node server (e.g. GitHub runners, a dev box, or even Databricks Community Edition).

Doubles lose precision on serialisation, e.g.:

stddev_samp *** FAILED *** (19 seconds, 196 milliseconds)
  GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
   (AggregateFunctionsTests.scala:591)
    Falsified after 5 successful property evaluations.
    Location: (AggregateFunctionsTests.scala:591)
    Occurred when passed generated values (
      arg0 = List("X2(1,-2147483648)", "X2(1,654883454)", "X2(-1,-2147483648)", "X2(1,0)") // 4 shrinks
    )
    Label of failing property:
      Expected Map(1 -> Some(1.4659365454162877E9), -1 -> None) but got Map(1 -> Some(1.4659365454162874E9), -1 -> None)

The very last digit didn't match; as such, all double gens have to be serialisable. The same occurs for BigDecimals in other tests (like AggregateFunctionsTest first/last), but that is likely due to the package arbitraries not being correct in the testless shade (they are correct when used via TestlessSingle in the IDE).
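One common way to make such property tests robust against last-digit drift is to compare doubles with a relative tolerance rather than exact equality. A small self-contained sketch (the helper and tolerance value are illustrative choices, not frameless's test code), using the two values from the failing run above:

```scala
// Relative-tolerance comparison for floating-point results that may differ
// in the last digit across serialisation / cluster round trips:
def approxEqual(a: Double, b: Double, relTol: Double = 1e-12): Boolean =
  if (a == b) true
  else math.abs(a - b) <= relTol * math.max(math.abs(a), math.abs(b))

// The two stddev_samp values from the log differ only in the final digit:
val expected = 1.4659365454162877e9
val observed = 1.4659365454162874e9
```

With a relative tolerance of 1e-12, `approxEqual(expected, observed)` accepts the cluster result while still rejecting genuinely different values.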

For the order by:

import frameless.{X2, X3}
import spark.implicits._
val v = Vector(X3(-1,false,X2(586394193,6313416569807298536L)), X3(2147483647,false,X2(1,-1L)), X3(729528245,false,X2(1,-1L)))
v.toDS.orderBy("c").collect().toVector

The error that can occur is:

derives a CatalystOrdered for case classes when all fields are comparable *** FAILED *** (11 seconds, 784 milliseconds)
  GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
   (OrderByTests.scala:177)
    Falsified after 5 successful property evaluations.
    Location: (OrderByTests.scala:177)
    Occurred when passed generated values (
      arg0 = Vector(X3(-1,false,X2(586394193,6313416569807298536)), X3(2147483647,false,X2(1,-1)), X3(729528245,false,X2(1,-1))) // 2 shrinks
    )
    Label of failing property:
      Expected Vector(X3(729528245,false,X2(1,-1)), X3(2147483647,false,X2(1,-1)), X3(-1,false,X2(586394193,6313416569807298536))) but got Vector(X3(2147483647,false,X2(1,-1)), X3(729528245,false,X2(1,-1)), X3(-1,false,X2(586394193,6313416569807298536)))
testless.org.scalatest.exceptions.GeneratorDrivenPropertyCheckFailedException:

i.e. the two rows with X2(1,-1) can come back in either order and both are acceptable results. The test needs to be rewritten to account for this by comparing only the c fields.
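A sketch of that rewrite: assert only on the ordered sort keys, since rows with equal keys may legally appear in either order (X2/X3 are redefined locally here so the snippet stands alone, rather than importing frameless):

```scala
final case class X2[A, B](a: A, b: B)
final case class X3[A, B, C](a: A, b: B, c: C)

val input = Vector(
  X3(-1, false, X2(586394193, 6313416569807298536L)),
  X3(2147483647, false, X2(1, -1L)),
  X3(729528245, false, X2(1, -1L))
)

// Rows sharing the sort key X2(1, -1L) can be returned in any order, so
// compare only the sorted keys themselves rather than the whole rows:
val orderedKeys = input.map(_.c).sortBy(x => (x.a, x.b))
```

A cluster run and a single-node run then produce the same `orderedKeys` even when they tie-break the full rows differently.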

anamariavisan commented 2 weeks ago

Hello! I tried to update a service to Databricks 14.2 and above that uses the sparksql35-scalapb0_11_2.12 dependency and I got the following error:

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.<init>(Ljava/lang/Class;Lorg/apache/spark/sql/types/DataType;Ljava/lang/String;Lscala/collection/Seq;Lscala/collection/Seq;ZZZ)V
    at frameless.TypedEncoder$$anon$1.toCatalyst(TypedEncoder.scala:69)
    at frameless.RecordEncoder.$anonfun$toCatalyst$2(RecordEncoder.scala:155)
    at scala.collection.immutable.List.map(List.scala:293)
    at frameless.RecordEncoder.toCatalyst(RecordEncoder.scala:153)
    at frameless.TypedExpressionEncoder$.apply(TypedExpressionEncoder.scala:28)
    at scalapb.spark.Implicits.typedEncoderToEncoder(TypedEncoders.scala:119)
    at scalapb.spark.Implicits.typedEncoderToEncoder$(TypedEncoders.scala:116)
    at scalapb.spark.Implicits$.typedEncoderToEncoder(TypedEncoders.scala:122)

This doesn't happen locally. I forked this repo https://github.com/thesamet/sparksql-scalapb-test/tree/master to see if the problem is related to the Databricks environment. The code can be found here: https://github.com/anamariavisan/sparksql-scalapb-test. To build the app I ran these commands:

curl -s "https://get.sdkman.io" | bash
sdk install java 11.0.24-zulu
sdk install sbt 1.6.2
sbt assembly

And to test it locally:

sdk install spark 3.5.0

spark-submit \
  --jars . \
  --class myexample.RunDemo \
  target/scala-2.12/sparksql-scalapb-test-assembly-1.0.0.jar

To test it in Databricks, I created a job and I uploaded the library target/scala-2.12/sparksql-scalapb-test-assembly-1.0.0.jar with the main class being myexample.RunDemo. I submitted the job locally and it worked, but in Databricks 14.2 and above, it failed with:

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.<init>(Ljava/lang/Class;Lorg/apache/spark/sql/types/DataType;Ljava/lang/String;Lscala/collection/Seq;Lscala/collection/Seq;ZZZ)V
    at scalapb.spark.ToCatalystHelpers.fieldToCatalyst(ToCatalystHelpers.scala:165)
    at scalapb.spark.ToCatalystHelpers.fieldToCatalyst$(ToCatalystHelpers.scala:107)
    at scalapb.spark.ProtoSQL$$anon$1$$anon$2.fieldToCatalyst(ProtoSQL.scala:84)
    at scalapb.spark.ToCatalystHelpers.$anonfun$messageToCatalyst$2(ToCatalystHelpers.scala:39)

I searched for how to fix it and found these issues that describe the same problem:

Questions:

  1. When will a new version of frameless that contains a fix for StaticInvoke be released?
  2. Will the shim library be the long term solution?
chris-twiner commented 2 weeks ago

Hello! I tried to update a service to Databricks 14.2 and above that uses the sparksql35-scalapb0_11_2.12 dependency and I got the following error: ...

  1. When will a new version of frameless that contains a fix for StaticInvoke be released?
  2. Will the shim library be the long term solution?

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.<init>(Ljava/lang/Class;Lorg/apache/spark/sql/types/DataType;Ljava/lang/String;Lscala/collection/Seq;Lscala/collection/Seq;ZZZ)V
    at scalapb.spark.ToCatalystHelpers.fieldToCatalyst(ToCatalystHelpers.scala:165)
    at scalapb.spark.ToCatalystHelpers.fieldToCatalyst$(ToCatalystHelpers.scala:107)
    at scalapb.spark.ProtoSQL$$anon$1$$anon$2.fieldToCatalyst(ProtoSQL.scala:84)

This stack is directly in the scalapb code, however, so it too would need patching. It could also use the same approach as sparkutils frameless (compiling against shim and swapping runtimes, OSS vs DBR 14.2/3). fyi, the 14.3 and 15.4 LTSs have been successfully tested via sparkutils testless (using shim and sparkutils frameless).

(edit to put in which stack in scalapb.spark I'm referring to).
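For anyone diagnosing this class of failure, the constructor shape the runtime actually provides can be checked with plain reflection. A small sketch (the helper name is illustrative; it is demonstrated on a JDK class so it runs anywhere, but the same lookup works on org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke on a cluster):

```scala
// Returns the parameter counts of all public constructors of a class,
// or None if the class isn't on the classpath:
def ctorArities(className: String): Option[Set[Int]] =
  try Some(Class.forName(className).getConstructors.map(_.getParameterCount).toSet)
  catch { case _: ClassNotFoundException => None }
```

Running this on the cluster and locally, and comparing the results for StaticInvoke, quickly confirms whether the NoSuchMethodError comes from a binary-incompatible Spark change rather than a missing jar.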