Kotlin / kotlin-spark-api

This project gives Kotlin bindings and several extensions for Apache Spark. We are looking to have this as a part of Apache Spark 3.x
Apache License 2.0

Data class serialization fails with `v1.0.3` and Spark `3.2.0` #141

Closed devictr closed 2 years ago

devictr commented 2 years ago

Hello,

It seems like there is an issue with the above-mentioned versions of Spark and kotlin-spark-api. When trying to convert a `Dataset<Row>` to a `Dataset<MyDataClass>` using the `as` method, I get the following error:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.objects.Invoke.<init>(Lorg/apache/spark/sql/catalyst/expressions/Expression;Ljava/lang/String;Lorg/apache/spark/sql/types/DataType;Lscala/collection/Seq;Lscala/collection/Seq;ZZ)V
    at org.apache.spark.sql.KotlinReflection$.$anonfun$serializerFor$16(KotlinReflection.scala:759)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.sql.KotlinReflection$.$anonfun$serializerFor$1(KotlinReflection.scala:749)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.KotlinReflection.cleanUpReflectionObjects(KotlinReflection.scala:1013)
    at org.apache.spark.sql.KotlinReflection.cleanUpReflectionObjects$(KotlinReflection.scala:1012)
    at org.apache.spark.sql.KotlinReflection$.cleanUpReflectionObjects(KotlinReflection.scala:47)
    at org.apache.spark.sql.KotlinReflection$.serializerFor(KotlinReflection.scala:591)
    at org.apache.spark.sql.KotlinReflection$.serializerFor(KotlinReflection.scala:578)
    at org.apache.spark.sql.KotlinReflection.serializerFor(KotlinReflection.scala)
    at org.jetbrains.kotlinx.spark.api.ApiV1Kt.kotlinClassEncoder(ApiV1.kt:183)
    at org.jetbrains.kotlinx.spark.api.ApiV1Kt.generateEncoder(ApiV1.kt:170)
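
For reference, a minimal sketch of the kind of conversion that hits this error (the data class and values are made up for illustration; `withSpark`, `dsOf` and `encoder<T>()` come from kotlin-spark-api):

```kotlin
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.jetbrains.kotlinx.spark.api.*

// Hypothetical data class used only to illustrate the failing conversion.
data class MyDataClass(val id: Int, val name: String)

fun main() = withSpark {
    // Build an untyped Dataset<Row> whose schema matches MyDataClass.
    val rows: Dataset<Row> = dsOf(MyDataClass(1, "a"), MyDataClass(2, "b")).toDF()

    // Converting back to a typed Dataset with `as` plus a Kotlin-aware encoder
    // is where the NoSuchMethodError above is thrown when the versions mismatch.
    val typed: Dataset<MyDataClass> = rows.`as`(encoder<MyDataClass>())
    typed.show()
}
```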

When using the latest code from the `spark-3.2` branch, things work fine, so I'm assuming you have already found and fixed this issue. Would it be possible to cut a release containing that fix? Sorry, I haven't had the time to submit a failing test as an example, but since the last release it seems like you have added quite a few, so this might already be covered. Please let me know if you think it would be valuable to add a test for that particular case.

Thanks!

Jolanrensen commented 2 years ago

`NoSuchMethodError` sounds to me like a Scala/Spark version mismatch within your (or our) project. The Scala version in particular must match exactly, since we use Scala reflection functions. We are working hard on the next (big) release, so it's a bit difficult to release a "quick fix" atm. If it turns out to be something more serious than a version mismatch, do let me know if there's something "special" about your data class that we don't yet cover in a test case! :)
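
A quick way to sanity-check that is sketched below: print the Spark and Scala versions actually on the classpath and compare them with the versions this library was built against (this assumes `scala.util.Properties` is callable from Kotlin, which it normally is):

```kotlin
import org.jetbrains.kotlinx.spark.api.withSpark

fun main() = withSpark {
    // The Spark version actually resolved on the classpath / cluster.
    println("Spark on classpath: " + spark.version())

    // The Scala library version on the classpath; this must match the Scala
    // binary version kotlin-spark-api was compiled against.
    println("Scala on classpath: " + scala.util.Properties.versionNumberString())
}
```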

mlin commented 2 years ago

FWIW, I got the same stack trace upon upgrading from v1.0.2 to v1.0.3 (and Spark from v3.1.x to v3.2.1). I'm not using `as`, but am doing various `groupByKey` and `map` operations starting from a `Dataset<String>`. Naively reading the stack trace, it seems to be having trouble finding a particular method signature in the Catalyst API?

I've not yet dug in further but will try to when I have a chance.
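
For reference, the pipeline shape described above looks roughly like this (a minimal sketch; the key extraction and data are made up, and it assumes the `groupByKey`/`mapGroups` extensions from kotlin-spark-api):

```kotlin
import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.spark.api.*

fun main() = withSpark {
    // Start from a plain Dataset<String>, as in the report above.
    val lines: Dataset<String> = dsOf("a,1", "a,2", "b,3")

    // Group by a derived key and map each group; the Kotlin encoders generated
    // for the key and result types are what go through KotlinReflection.serializerFor.
    val counts = lines
        .groupByKey { it.substringBefore(",") }
        .mapGroups { key, values -> key to values.asSequence().count() }

    counts.show()
}
```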

devictr commented 2 years ago

Hey @Jolanrensen, do you know when the next release will be out? 🙂 We're looking forward to using it!

Jolanrensen commented 2 years ago

@devictr Very soon! We expect to be able to release version 1.1.0 in about a week.

Jolanrensen commented 2 years ago

@devictr https://github.com/Kotlin/kotlin-spark-api/releases/tag/3.2-1.1.0 It took a bit longer with the blog post and Jupyter and all, but it's here :)

devictr commented 2 years ago

Amazing! Thank you :)

mlin commented 2 years ago

Update...my app is working nicely with kotlin-spark-api v1.1.0 and spark v3.2.1 (thanks! :sunglasses:). However I just tried to deploy it onto an existing spark v3.2.0 cluster and ran into a similar error, which also reproduces if I downgrade my local environment from spark v3.2.1 to v3.2.0.

Obviously I'm going to inquire about upgrading the spark version on the aforementioned production cluster; but I thought I'd comment here because I'm surprised the "patch" version makes such a critical difference. Hopefully it's something idiosyncratic with spark v3.2.0, as opposed to kotlin-spark-api being generally tied to an exact patch version of spark?

Exception in thread "main" java.lang.NoSuchMethodError: 'boolean org.apache.spark.sql.catalyst.expressions.objects.Invoke$.apply$default$8()'
        at org.apache.spark.sql.KotlinReflection$.$anonfun$serializerFor$16(KotlinReflection.scala:946)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
        at org.apache.spark.sql.KotlinReflection$.$anonfun$serializerFor$1(KotlinReflection.scala:936)
        at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
        at org.apache.spark.sql.KotlinReflection.cleanUpReflectionObjects(KotlinReflection.scala:1310)
        at org.apache.spark.sql.KotlinReflection.cleanUpReflectionObjects$(KotlinReflection.scala:1309)
        at org.apache.spark.sql.KotlinReflection$.cleanUpReflectionObjects(KotlinReflection.scala:51)
        at org.apache.spark.sql.KotlinReflection$.serializerFor(KotlinReflection.scala:680)
        at org.apache.spark.sql.KotlinReflection$.serializerFor(KotlinReflection.scala:667)
        at org.apache.spark.sql.KotlinReflection.serializerFor(KotlinReflection.scala)
        at org.jetbrains.kotlinx.spark.api.EncodingKt.kotlinClassEncoder(Encoding.kt:152)
        at org.jetbrains.kotlinx.spark.api.EncodingKt.generateEncoder(Encoding.kt:134)

Jolanrensen commented 2 years ago

Unfortunately, we are indeed tied to patch versions of Spark. This is because we "replace" the core functionality of the Catalyst encoding of classes to add support for Kotlin data classes etc., which means we must match Spark at the bytecode level. We are currently working on being able to provide builds for all Scala and Spark versions; most of the time the code base can stay identical across them, but each combination still needs a separate build.
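
As an illustration of what that means on the application side, here is a Gradle (Kotlin DSL) sketch that keeps Spark pinned to the exact patch release the binding targets; the artifact coordinates below are an assumption and should be checked against the project README for the release being used:

```kotlin
// build.gradle.kts (sketch; coordinates are illustrative, not authoritative)
dependencies {
    // kotlin-spark-api release line built against Spark 3.2 / Scala 2.12 (release tag 3.2-1.1.0).
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api_3.2:1.1.0")

    // Pin Spark to the exact patch version the binding was compiled against (3.2.1 here);
    // running against a 3.2.0 classpath triggers the NoSuchMethodError shown above.
    implementation("org.apache.spark:spark-sql_2.12:3.2.1")
}
```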

Jolanrensen commented 2 years ago

All patch versions will be supported in release 1.2.0! Closing this issue for now :)