This is a work-in-progress overhaul of the core parts of the library to support Spark 3.4+.
## Why
Too much has changed in Spark 3.4+: Spark decoupled its encoding/decoding system with Spark Connect in mind.
Our previous method was hacky and forced us to publish against exact Spark versions to maintain bytecode-level compatibility.
We too would like Spark Connect support in the future :)
We need to keep supporting newer Spark versions.
## What has changed
Removed the `:core` module entirely. No more spark-package-injected code that can break at the bytecode level.
Instead, we just have a `:scala-helpers` module which doesn't even depend on Spark at the moment. We just need the `VarargUnwrapper` class.
Rewrote encoding from the ground up, in pure Kotlin this time, using the power of Kotlin reflection. I took inspiration from `JavaTypeInference` and `ScalaReflection`, which, since 3.4, build an `AgnosticEncoder` as a sort of intermediate step in building an `Encoder` for the data. This non-implementation-specific encoder can be turned into an actual encoder by passing it to `ExpressionEncoder()` or into something entirely different (which is what makes Spark Connect possible).
Our `KotlinTypeInference.encoderFor` implementation is a mix of the Java and Scala ones, supporting Scala/Java lists, primitives, Scala tuples, and, most importantly, Kotlin data classes.
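Roughly, the flow looks like this (just a sketch: `KotlinTypeInference.encoderFor`'s actual signature and package may differ in this PR):

```kotlin
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.jetbrains.kotlinx.spark.api.KotlinTypeInference // package assumed

// Sketch of how an Encoder<T> is now produced (signatures simplified).
inline fun <reified T : Any> encoderSketch(): Encoder<T> {
    // 1. Build the implementation-agnostic description of how to encode T,
    //    e.g. a ProductEncoder for a @Sparkify data class.
    val agnostic: AgnosticEncoder<T> = KotlinTypeInference.encoderFor()

    // 2. Turn it into a concrete Catalyst encoder. The same AgnosticEncoder
    //    could instead be handed to Spark Connect.
    return ExpressionEncoder.apply(agnostic)
}
```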
One downside of having to create an `AgnosticEncoder` is that we are limited to the `AgnosticEncoder`s offered to us by Spark. We cannot write our own (de)serializers anymore if we want to support Spark Connect. So, in order to support data classes, we need to hijack `ProductEncoder`.
Deserializing data classes using `ProductEncoder` works fine, but for serializing we hit a snag. In Scala, case classes have a function with the same name as each property. This assumption is used under the hood, so we need to make sure those functions exist in our data classes.
Plus, I later found a function that does an actual instance check to see whether the value is a `scala.Product`... It's compiler plugin time!
I created a Kotlin compiler plugin which, when applied to your project, can convert:
```kotlin
@Sparkify
data class User(
    val name: String,
    @ColumnName("test") val age: Int,
)
```
to
```kotlin
@Sparkify
data class User(
    @get:JvmName("name") val name: String,
    @get:JvmName("test") @ColumnName("test") val age: Int,
) : scala.Product, Serializable {
    override fun canEqual(that: Any?): Boolean = that is User
    override fun productArity(): Int = 2
    override fun productElement(n: Int): Any? =
        if (n == 0) this.name
        else if (n == 1) this.age
        else throw IndexOutOfBoundsException()
}
```
satisfying both needs from Spark. One downside of this approach is that you now need to annotate each data class you want to encode with `@Sparkify` (else the column names will be `getName` and `getAge`). And you cannot annotate external data classes like `Pair` :/ So I recommend working with tuples from now on (or making your own `@Sparkify` Pair).
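For example, a hand-rolled `Pair` replacement could look like this (a sketch; whether generic data classes encode depends on the type arguments being resolvable at the call site):

```kotlin
// Hypothetical Sparkified stand-in for kotlin.Pair, since external
// data classes cannot be annotated.
@Sparkify
data class SparkPair<A, B>(val first: A, val second: B)
```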
The compiler plugin (`:compiler-plugin`) can be applied to your Gradle project by the Gradle plugin (`:gradle-plugin`) with `id("org.jetbrains.kotlinx.spark.api") version X`, or in Maven with the `<compilerPlugins>` tag (probably).
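In a Gradle Kotlin DSL build that would look roughly like this (the version is still the placeholder from above; exact coordinates may change while this is WIP):

```kotlin
// build.gradle.kts
plugins {
    kotlin("jvm") version "2.0.0-Beta5"
    id("org.jetbrains.kotlinx.spark.api") version "X" // placeholder
}
```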
The `:kotlin-spark-api` and `:examples` modules also depend on these two plugins for their tests. This is done with a Gradle trick that updates bootstrap jars and adds them to the classpath/repository.
Updated to Kotlin 2.0 Beta 5. You should still be able to use 1.9.23 with the compiler plugin, since it just uses IR. It does not require K2.
For Kotlin 2.0, just make sure you set `freeCompilerArgs.add("-Xlambdas=class")`, since Spark cannot serialize lambdas otherwise. If you use the Gradle plugin, this is done for you.
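If you apply the compiler plugin without the Gradle plugin, you can set the flag yourself, for example:

```kotlin
// build.gradle.kts — only needed when the Gradle plugin doesn't do it for you
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
    compilerOptions {
        freeCompilerArgs.add("-Xlambdas=class")
    }
}
```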
## TODO
- [x] Provide warnings for non-Sparkified classes, especially for `Pair`/`Triple`
- [x] Java bean as fallback encoder
- [x] Jupyter support
- [ ] Finalize Jupyter support
- [x] UDTs for non-generic Kotlin types like `Instant`, `LocalDateTime`, etc.
- [ ] Spark Connect
- [ ] Docs
- [ ] Fix RddTest "Work with any number"
- [ ] Remove streaming in favor of structured streaming, update examples
Added encoding for kotlinx-datetime types: `DatePeriod`, `DateTimePeriod`, `Instant`, `LocalDateTime`, and `LocalDate`. `kotlin.time.Duration` sadly doesn't work, probably because it's a value class.
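As a small illustration (hypothetical class, assuming the kotlinx-datetime encoders mentioned above):

```kotlin
import kotlinx.datetime.Instant
import kotlinx.datetime.LocalDate

// These kotlinx-datetime fields should now encode out of the box.
@Sparkify
data class Meeting(
    val title: String,
    val start: Instant,
    val day: LocalDate,
)
```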
Fixes https://github.com/Kotlin/kotlin-spark-api/issues/195, which is a fun read if you're interested in the process :)