Kotlin / kotlin-spark-api

This project provides Kotlin bindings and several extensions for Apache Spark. We are looking to have this as a part of Apache Spark 3.x.
Apache License 2.0

Spark 3.4+ / -Connect support #195

Open Jolanrensen opened 1 year ago

Jolanrensen commented 1 year ago

https://spark.apache.org/news/spark-3-4-0-released.html

Jolanrensen commented 1 year ago

Seems like ScalaReflection.scala has now really deviated too far from KotlinReflection.scala requiring a major overhaul to function. Maybe it's time to try a new approach, such as https://github.com/Kotlin/kotlin-spark-api/issues/178 which would improve maintainability hugely as well as fix most compatibility issues we face.

This will require time and investment I'm not sure I have with my work on DataFrame (especially with the low number of downloads this library currently has). Let me know if you are still an avid user and would like me to invest time into rebuilding the hacky base this library is built upon!

hawkaa commented 1 year ago

Hi. I've just started looking at Spark 3.4 and the first issue we ran into was that we're missing this library. For sure it would be a big win if we could support it. 🙏

zaleslaw commented 1 year ago

Please upvote the top comment if you need this! Or comment here, like @hawkaa did.

shane-atg commented 1 year ago

I am very interested in keeping this alive as well.

NatElkins commented 1 year ago

What would be the next step towards moving forward with this?

Jolanrensen commented 1 year ago

What would be the next step towards moving forward with this?

The next step would be to investigate a new way to encode Kotlin data classes (both at the top level of DataFrames and inside columns) and to keep inferring encoders from types without using KotlinReflection.scala, such that it's compatible with all versions of Scala and Spark 3.X by default. That way we can keep the API relatively the same and improve maintainability, compatibility, and stability. I'm not sure which mechanisms of Spark we can leverage for this; I was thinking of maybe using UDTs and a compiler plugin/annotation processor to generate the UDT classes... but that won't work for top-level tables. Unfortunately, I'm too occupied with Kotlin DataFrame at the moment, but if someone could provide a proof of concept I'm sure I can provide some help :).

BierDav commented 1 year ago

So if I understood that correctly, we will be able to create spark Dataset from a kotlinx DataFrame? That's exactly what I wanted to do, because working with spark Datasets is not that smooth.

Btw. is there currently a workaround for this?

Jolanrensen commented 1 year ago

So if I understood that correctly, we will be able to create spark Dataset from a kotlinx DataFrame? That's exactly what I wanted to do, because working with spark Datasets is not that smooth.

Btw. is there currently a workaround for this?

No, that's currently not on the roadmap. They're two separate projects, although we are exploring interop with other databases, including Spark, in DataFrame (https://github.com/Kotlin/dataframe/issues/408).

If you want to convert from Kotlin DataFrame to Spark Datasets, that's actually quite simple:

@DataSchema
data class Name(
    val firstName: String,
    val lastName: String,
)

@DataSchema
data class Person(
    val name: Name,
    val age: Int,
    val city: String?,
    val weight: Int?,
    val isHappy: Boolean,
)

// Kotlin DataFrame
val df: DataFrame<Person> = listOf(
    Person(Name("Alice", "Cooper"), 15, "London", 54, true),
    Person(Name("Bob", "Dylan"), 45, "Dubai", 87, true),
    Person(Name("Charlie", "Daniels"), 20, "Moscow", null, false),
    Person(Name("Charlie", "Chaplin"), 40, "Milan", null, true),
    Person(Name("Bob", "Marley"), 30, "Tokyo", 68, true),
    Person(Name("Alice", "Wolf"), 20, null, 55, false),
    Person(Name("Charlie", "Byrd"), 30, "Moscow", 90, true),
).toDataFrame()

withSpark {
    // Spark Dataset
    val sparkDs: Dataset<Person> = df.toList().toDS()
}

Note that df.toList()/df.toListOf<>() only works if the return type is a data class, which is also what's needed for Spark.

If you want to be able to convert any Kotlin DataFrame to a Spark Dataset<Row>, we'll need to convert the schema as well:

/**
 * Converts the DataFrame to a Spark Dataset of Rows using the provided SparkSession and JavaSparkContext.
 *
 * @param spark The SparkSession object to use for creating the DataFrame.
 * @param sc The JavaSparkContext object to use for converting the DataFrame to RDD.
 * @return A Dataset of Rows representing the converted DataFrame.
 */
fun DataFrame<*>.toSpark(spark: SparkSession, sc: JavaSparkContext): Dataset<Row> {
    val rows = sc.toRDD(rows().map(DataRow<*>::toSpark))
    return spark.createDataFrame(rows, schema().toSpark())
}

/**
 * Converts a DataRow to a Spark Row object.
 *
 * @return The converted Spark Row.
 */
fun DataRow<*>.toSpark(): Row =
    RowFactory.create(
        *values().map {
            when (it) {
                is DataRow<*> -> it.toSpark()
                else -> it
            }
        }.toTypedArray()
    )

/**
 * Converts a DataFrameSchema to a Spark StructType.
 *
 * @return The converted Spark StructType.
 */
fun DataFrameSchema.toSpark(): StructType =
    DataTypes.createStructType(
        columns.map { (name, schema) ->
            DataTypes.createStructField(name, schema.toSpark(), schema.nullable)
        }
    )

/**
 * Converts a ColumnSchema object to Spark DataType.
 *
 * @return The Spark DataType corresponding to the given ColumnSchema object.
 * @throws IllegalStateException if the column type or kind is unknown.
 */
fun ColumnSchema.toSpark(): DataType =
    when (this) {
        is ColumnSchema.Value -> type.toSpark() ?: error("unknown data type: $type")
        is ColumnSchema.Group -> schema.toSpark()
        is ColumnSchema.Frame -> error("nested dataframes are not supported")
        else -> error("unknown column kind: $this")
    }

/**
 * Returns the corresponding Spark DataType for a given Kotlin type.
 *
 * @return The Spark DataType that corresponds to the Kotlin type, or null if no matching DataType is found.
 */
fun KType.toSpark(): DataType? = when(this) {
    typeOf<Byte>(), typeOf<Byte?>() -> DataTypes.ByteType
    typeOf<Short>(), typeOf<Short?>() -> DataTypes.ShortType
    typeOf<Int>(), typeOf<Int?>() -> DataTypes.IntegerType
    typeOf<Long>(), typeOf<Long?>() -> DataTypes.LongType
    typeOf<Boolean>(), typeOf<Boolean?>() -> DataTypes.BooleanType
    typeOf<Float>(), typeOf<Float?>() -> DataTypes.FloatType
    typeOf<Double>(), typeOf<Double?>() -> DataTypes.DoubleType
    typeOf<String>(), typeOf<String?>() -> DataTypes.StringType
    typeOf<LocalDate>(), typeOf<LocalDate?>() -> DataTypes.DateType
    typeOf<Date>(), typeOf<Date?>() -> DataTypes.DateType
    typeOf<Timestamp>(), typeOf<Timestamp?>() -> DataTypes.TimestampType
    typeOf<Instant>(), typeOf<Instant?>() -> DataTypes.TimestampType
    typeOf<ByteArray>(), typeOf<ByteArray?>() -> DataTypes.BinaryType
    typeOf<Decimal>(), typeOf<Decimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigDecimal>(), typeOf<BigDecimal?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<BigInteger>(), typeOf<BigInteger?>() -> DecimalType.SYSTEM_DEFAULT()
    typeOf<CalendarInterval>(), typeOf<CalendarInterval?>() -> DataTypes.CalendarIntervalType
    else -> null
}

withSpark {
    // Spark Dataset
    val sparkDs: Dataset<Row> = df.toSpark(spark, sc)
}
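The paired branches in KType.toSpark() are needed because typeOf<Int>() and typeOf<Int?>() are different KType values. A possible alternative, sketched here with only the Kotlin standard library, is to match on the type's classifier, which ignores nullability. The helper name sparkTypeName is hypothetical, and it returns the name of the Spark type as a String (an assumption made so the sketch runs without Spark on the classpath):

```kotlin
import kotlin.reflect.KType
import kotlin.reflect.typeOf

// Hypothetical helper: looks up the Spark type name by classifier,
// so Int and Int? both end up in the same branch.
fun KType.sparkTypeName(): String? = when (classifier) {
    Byte::class -> "ByteType"
    Short::class -> "ShortType"
    Int::class -> "IntegerType"
    Long::class -> "LongType"
    Boolean::class -> "BooleanType"
    Float::class -> "FloatType"
    Double::class -> "DoubleType"
    String::class -> "StringType"
    ByteArray::class -> "BinaryType"
    else -> null // unknown to this sketch
}

fun main() {
    println(typeOf<Int>().sparkTypeName())       // IntegerType
    println(typeOf<Int?>().sparkTypeName())      // IntegerType
    println(typeOf<Int?>().isMarkedNullable)     // true
    println(typeOf<List<Int>>().sparkTypeName()) // null
}
```

Nullability is still available separately through isMarkedNullable, which is the information the schema conversion passes on as schema.nullable.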

Edit: for conversion the other way around, check the Wiki: https://github.com/Kotlin/kotlin-spark-api/wiki/Kotlin-DataFrame-interoperability

NatElkins commented 1 year ago

@Jolanrensen Can you explain a little bit more about what ScalaReflection.scala and KotlinReflection.scala do, what they're for, and why the latter is a blocker to Spark 3.4 support? And what some of the considerations about using UDTs as a replacement might be?

Thank you!

Jolanrensen commented 1 year ago

@Jolanrensen Can you explain a little bit more about what ScalaReflection.scala and KotlinReflection.scala do, what they're for, and why the latter is a blocker to Spark 3.4 support? And what some of the considerations about using UDTs as a replacement might be?

Thank you!

Sure! But I gotta give a warning. I'm not the original author of the patch, just the maintainer of the rest of the library, so this will be my best understanding of what's going on.

One of the biggest features of the Kotlin Spark API is the automatic recognition and encoding of types as Datasets. Without this, encoders would need to be given explicitly, as is the case for the Java API of Spark (https://spark.apache.org/docs/latest/sql-getting-started.html#creating-datasets). This is the difference between:

spark.createDataset(listOf(1, 2, 3), Encoders.INT())
// and
listOf(1, 2, 3, 4, 5).toDS()

or even

data class Person1 @JvmOverloads constructor(
    var name: String? = null,
    var age: Int? = null,
) : Serializable

spark.createDataset(listOf(Person1("A", 1)), Encoders.bean(Person1::class.java))

// and
data class Person2(
    val name: String,
    val age: Int,
)
listOf(Person2("A", 1)).toDS()

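To do this, we need to automatically generate an encoder based on the typing information provided by the reified type parameter in toDS<>() (https://github.com/Kotlin/kotlin-spark-api/blob/470bcf4dd6a0318a1cd0e947670f921f8f62969e/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt#L64) and the encoder<>() function (https://github.com/Kotlin/kotlin-spark-api/blob/470bcf4dd6a0318a1cd0e947670f921f8f62969e/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt#L137).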

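If you follow the generateEncoder function in the same file (Encoding.kt) you can see we'll attempt to create an Encoder from the given KType. This can be either a predefined encoder (from ENCODERS, https://github.com/Kotlin/kotlin-spark-api/blob/470bcf4dd6a0318a1cd0e947670f921f8f62969e/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt#L78) or some data class, Map, Iterable, Array, UDT, or anything else we can generate an Encoder for on the fly.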
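To make the lookup described above concrete, here is a minimal sketch of how a reified type parameter can drive a recursive resolution. It is not the library's actual generateEncoder: FakeEncoder, generateFakeEncoder, and fakeEncoder are hypothetical names, and the "encoder" is only a description string so the sketch runs without Spark.

```kotlin
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf

// Hypothetical stand-in for Spark's Encoder; it only records what would be encoded.
data class FakeEncoder(val description: String)

// Resolve an "encoder" from a KType: predefined types first, then generic containers.
fun generateFakeEncoder(type: KType): FakeEncoder {
    val klass = type.classifier as? KClass<*> ?: error("unsupported type: $type")
    // The type arguments survive in the KType, so nested generics can be resolved recursively.
    fun argument(index: Int): String =
        generateFakeEncoder(type.arguments[index].type ?: error("star projection in $type")).description
    return when (klass) {
        Int::class -> FakeEncoder("INT")
        String::class -> FakeEncoder("STRING")
        Boolean::class -> FakeEncoder("BOOLEAN")
        List::class -> FakeEncoder("LIST<${argument(0)}>")
        Map::class -> FakeEncoder("MAP<${argument(0)}, ${argument(1)}>")
        else -> error("no encoder for $type in this sketch")
    }
}

// The reified type parameter is what lets the caller omit the encoder argument.
inline fun <reified T> fakeEncoder(): FakeEncoder = generateFakeEncoder(typeOf<T>())

fun main() {
    println(fakeEncoder<Int>().description)                    // INT
    println(fakeEncoder<Map<String, List<Int>>>().description) // MAP<STRING, LIST<INT>>
}
```

The point of the sketch is only that typeOf<T>() keeps the full generic structure, which Class<T> alone would not.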

An (Expression)Encoder needs two things: an Expression to serialize an object and one to deserialize it. Functions to create these serializers and deserializers exist in ScalaReflection.scala (https://github.com/apache/spark/blob/2f3e4e36017d16d67086fd4ecaf39636a2fb4b7c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L166C4-L166C4), but they can of course only handle types supported by Spark itself. We want to be able to create (de)serializers for Kotlin data classes, plus Tuples inside data classes, arrays inside tuples inside data classes inside maps, etc. We need the logic in this file but somehow inject extra functionality. At least, that's what @asm0dey likely thought when making it :). The downside is that we keep bumping into Spark-internal functions if we want to call it from the Kotlin side (not to mention the incompatibilities between Kotlin and Scala). So, a new module (https://github.com/Kotlin/kotlin-spark-api/tree/release/core/src/main/scala) was created using the same org.apache.spark.sql package (to be able to call Spark-internal functions), and the code from ScalaReflection.scala was copied to KotlinReflection.scala (https://github.com/Kotlin/kotlin-spark-api/blob/release/core/src/main/scala/org/apache/spark/sql/KotlinReflection.scala), modifying its behavior for Kotlin data class support.

To help with the (de)serializing of Kotlin-specific stuff, a schema/predefinedDt argument was added such that the schema: DataType of a certain type can be generated in Kotlin (https://github.com/Kotlin/kotlin-spark-api/blob/470bcf4dd6a0318a1cd0e947670f921f8f62969e/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt#L186) instead of Scala.

Well, this worked, but, having a large piece of copied internal code in your codebase is bound to cause some issues over time. And so it did...

After each major release of Spark it was a large hassle to restore compatibility between KotlinReflection.scala and the rest of Spark, especially since internal calls can change between minor releases and break on a bytecode level. This is why we have so many releases (one for each combination of Scala 2.12/2.13 and minor Spark version). Plus, if Spark adds a new feature to 3.Z, well, they can just update their ScalaReflection.scala file. We, on the other hand, need to support 3.X, 3.Y, and 3.Z with just one codebase (which we currently do with a preprocessor, https://github.com/raydac/java-comment-preprocessor, but it's not a walk in the park).

Spark 3.4 was the straw that broke the camel's back. ScalaReflection.scala changed file location (https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala) and a lot changed compared to the previous version. It was a good wake-up call to show that this wasn't the way forward. A simple preprocessor cannot ensure compatibility between these versions anymore, and who knows what else will break in Spark 3.5 or even with Scala 3.

We need a new way to encode Kotlin data classes while maintaining the current flexibility but without relying on internal Spark code. Spark version bumps (even major ones) need to be doable with minor preprocessor changes (https://github.com/Kotlin/kotlin-spark-api/blob/470bcf4dd6a0318a1cd0e947670f921f8f62969e/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt#L93). (One version of the API for ALL Spark versions is unrealistic, but one for, say, 3.0, 3.1, etc. will probably be fine.)

There are probably several ways to do this:

  • UDT: Make data classes automatically a user-defined-type, either with an @Annotation or using a Kotlin (2.0) compiler plugin. Downside: UDTs (afaik) only allow user defined objects serialized inside others, such as Tuples, not as a top-level table-like object. This could be possible, but I don't know how. Second, I don't know how (de)serializing of other known JVM types (such as tuples, Seqs, arrays etc) inside data classes would work.
  • Java Bean: Make a compiler plugin that will convert all data classes to something like Person1 at compile time. Top-level should now work, but again I'm not sure about nested types.
  • Other clever Kotlinx reflection + Spark magic: Somehow be able to create an Encoder for any data class using reflection without the use of Spark internals or a compiler plugin. This would be the holy grail, but again, no clue how to do that.
  • Maybe even another way. I'm no Spark expert by any means. Maybe this gitbook https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExpressionEncoder.html could provide any inspiration.

Hopefully, this has given you or anyone interested enough inspiration and explanation to give it a try :) If someone can provide a proof-of-concept, I'd be happy to explore it further.

asm0dey commented 1 year ago

I'm happy to provide any additional guidance if needed as an author of the original implementation (but probably Jolan knows it better than me already :)

NatElkins commented 1 year ago

Thank you for the comprehensive answer! I will ponder this for a bit.

I have a strong interest in using Kotlin and Spark together for a work project. It doesn't seem like there is a hard blocker per se (I can always just use the Java API), just that some of the nice-to-haves of this library may not be available unless I contribute a fix.

asm0dey commented 10 months ago

@Jolanrensen we should probably take a look at the connect API: https://spark.apache.org/docs/latest/spark-connect-overview.html

Jolanrensen commented 10 months ago

@Jolanrensen we should probably take a look at the connect API: https://spark.apache.org/docs/latest/spark-connect-overview.html

Allowing the Spark driver and code to use different versions from the application code might indeed solve a lot of problems for us! However, we'd still need a completely new way to encode data classes.

asm0dey commented 10 months ago

I can imagine that we'll generate POJOs from the data classes that are used, at compile time. I know you are doing something similar in DataFrame, but I don't have any idea how to implement it :) And POJOs are natively supported by Spark.

Jolanrensen commented 10 months ago

@asm0dey A compiler plugin could do that :)

gregfriis commented 8 months ago

Hi, do you have any estimate on when (or if) 3.4+ will be supported?

Jolanrensen commented 8 months ago

@gregfriis I'm sorry, no, we currently don't have the resources to figure that out. What could help is if someone from the community could provide a proof of concept solution. That way we could better weigh the time/cost to properly support it.

Jolanrensen commented 7 months ago

Small weekend/hobby update regarding the issue:

I tried Spark Connect but locally on my machine I couldn't get it to work reliably yet. Plus it requires running Spark locally with some special script, so for now, that's not ideal.

I did experiment with the potential compiler plugin route and I do have a hunch that it might be possible :). It does require some special configuration and modules, but it should be doable.

The basic angle is: Converting annotated Kotlin data classes to something Spark sees as a Scala case class. This can automatically provide us with all supported (and even nested) types without having to traverse the classes ourselves.

In theory, this is not that difficult, but it comes with a few gotchas:

To recreate my experiment, we need:

Scala module with type retrieval

Stolen from KotlinReflection.scala, this piece of code can create a Scala Type from a Java class, something we need to make an encoder using ScalaReflection.encoderFor() later on:

object TypeRetrieval {
  val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe

  import universe._

  def getType[T](clazz: Class[T]): universe.Type = {
    clazz match {
      // not sure about this line, without it, no spark dependencies are needed
      case _ if clazz == classOf[Array[Byte]] => localTypeOf[Array[Byte]]
      case _ => {
        val mir = runtimeMirror(clazz.getClassLoader)
        mir.classSymbol(clazz).toType
      }
    }
  }
}

Kotlin module depending on the Scala module

This little helper function can create a Spark ExpressionEncoder for any type or class. This would be similar to calling Encoders.product[T] or ExpressionEncoder[T] from Scala.

inline fun <reified T : Any> encoderFor(): Encoder<T> = encoderFor(T::class.java)
fun <T> encoderFor(klass: Class<T>): Encoder<T> =
    ExpressionEncoder.apply(
        ScalaReflection.encoderFor(
            TypeRetrieval.getType(klass),
            false
        ) as AgnosticEncoder<T>
    )

Next, to limit the amount of generated code, we can create a little bridge from data classes to Product, implementing some stuff with reflection. This may be omitted if it seems to cause too much overhead, as it can also be generated by a compiler plugin right in the data class.

abstract class DataClassProduct(private val klass: KClass<*>) : Product {

    override fun productPrefix(): String = klass.simpleName!!

    private val params
        get() = klass
            .primaryConstructor!!
            .parameters
            .withIndex()
            .associate { (i, it) -> i to it }

    override fun canEqual(that: Any?): Boolean = that!!::class == klass
    override fun productElement(n: Int): Any = params[n]?.let { param ->
        klass.declaredMemberProperties
            .firstOrNull { it.name == param.name }
            ?.call(this)
    } ?: throw IndexOutOfBoundsException(n.toString())

    override fun productArity(): Int = params.size

    override fun productElementNames(): Iterator<String> = CollectionConverters.asScala(
        iterator {
            for (param in params.values) {
                yield(param.name!!)
            }
        }
    )
}

Writing actual Kotlin data classes

NOTE: we need the -parameters compiler flag when compiling it, so in build.gradle.kts:

kotlin {
    jvmToolchain(8)
    compilerOptions {
        javaParameters = true
    }
}
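The flag matters because it controls whether constructor parameter names are stored in the class file. The sketch below shows the effect through plain Java reflection; that Spark's Scala reflection picks up names the same way is my assumption, not something verified here. The Person class and the helper name are hypothetical.

```kotlin
// Hypothetical data class, used only for this illustration.
data class Person(val name: String, val age: Int)

// Reads the parameter names of the primary constructor as the JVM reports them.
fun constructorParameterNames(): List<String> {
    // Int::class.java is the primitive int, which matches the non-null Int parameter.
    val constructor = Person::class.java.getConstructor(String::class.java, Int::class.java)
    return constructor.parameters.map { it.name }
}

fun main() {
    // With javaParameters = true this prints [name, age];
    // without it the JVM falls back to synthetic names: [arg0, arg1].
    println(constructorParameterNames())
}
```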

Let's say we now want to encode a simple data class. We could just write

@SomeSparkAnnotation
data class Person(
    val name: String,
    val age: Int,
    val hobbies: List<String>,
    val address: Pair<String, Int>,
)

What the compiler plugin would produce

and then the compiler plugin could convert this to:

data class Person(
    val name: String,
    val age: Int,
    val hobbies: scala.collection.immutable.List<String>, // converting java.util.List to scala.collection.immutable.List
    val address: scala.Tuple2<String, Int>, // converting kotlin.Pair to scala.Tuple2
): DataClassProduct(Person::class) {
    // accessors for Spark
    fun name() = this.name
    fun age() = this.age
    fun hobbies() = this.hobbies
    fun address() = this.address

    companion object {
        // so we can still create it normally from kotlin
        operator fun invoke(
            name: String,
            age: Int,
            hobbies: List<String>,
            address: Pair<String, Int>,
        ): Person =
            Person(
                name = name,
                age = age,
                hobbies = CollectionConverters.asScala(hobbies).toList(),
                address = address.toTuple() // from scalaTuplesInKotlin
            )
    }
}

Running this with Spark will work correctly:

val test = Person(
    name = "Jolan",
    age = 22,
    hobbies = listOf("Programming", "Gaming"),
    address = "Kerkstraat" to 1
)
spark.createDataset(listOf(test), encoderFor<Person>()).show()
// +-----+---+--------------------+---------------+
// | name|age|             hobbies|        address|
// +-----+---+--------------------+---------------+
// |Jolan| 22|[Programming, Gam...|{Kerkstraat, 1}|
// +-----+---+--------------------+---------------+

But, we might need something more advanced, because calling test.hobbies will now result in a Scala list again :/

It does have some promise though, as nested data classes like these even work :), plus we don't need to hack into the Spark source code anymore.

How to proceed

We could try to generate a Person_: DataClassProduct that we can auto-convert to and from by also autogenerating and registering a UDT for each data class. This UDT would then use the encoder of Person_ for all the typing information and handle the conversion from scala-kotlin types. This needs some further experimenting.

(I do have some issues calling the deserializer on an encoded row with the ExpressionEncoder, both in Scala and Kotlin, strange...) (Edit: fixed by creating the (de)serializers like this:

private val encoder = encoderFor(Person_::class.java)
private val serializer = encoder.createSerializer()
private val deserializer = encoder.resolveAndBind(DataTypeUtils.toAttributes(encoder.schema()) as Seq<Attribute>, `SimpleAnalyzer$`.`MODULE$`).createDeserializer()

)

And of course, actually build a compiler plugin. This is tricky and requires Kotlin 2.0.

asm0dey commented 7 months ago

Hey, this is just awesome research, thank you!

I think it makes more sense to compile to Java POJOs rather than to case classes. POJOs are natively supported by JavaReflection and should be significantly easier to generate. Moreover, it might be easy to generate synthetic (like Person_) POJOs and convert them to data classes automatically.

Jolanrensen commented 7 months ago

@asm0dey you mean using Encoders.bean()? That can indeed be done relatively easily, also generated, but this limits us in other ways again: nullability/default arguments are needed, and, for instance, nested tuples are not supported anymore:

data class AddressJava @JvmOverloads constructor(
    var street: String = "",
    var city: String = "",
) : Serializable

data class PersonJava @JvmOverloads constructor(
    var name: String = "",
    var age: Int = -1,
    var tupleTest: Tuple2<AddressJava, Int>? = null,
    var listTest: List<AddressJava> = emptyList(),
) : Serializable
...
val data = listOf(
    PersonJava("Jolan", 25, Tuple2(AddressJava("Street", "City"), 5), listOf(AddressJava("Street", "City"))),
)

val encoder = Encoders.bean(PersonJava::class.java)
val df = spark.createDataset(data, encoder)
df.show()
// +---+----------------+-----+---------+
// |age|        listTest| name|tupleTest|
// +---+----------------+-----+---------+
// | 25|[{City, Street}]|Jolan|       {}|
// +---+----------------+-----+---------+

And calling df.head() breaks everything:

Exception in thread "main" org.apache.spark.SparkRuntimeException: Error while decoding: org.apache.spark.SparkRuntimeException: Couldn't find a valid constructor on class scala.Tuple2.

This is something that is possible in the current version of the Kotlin Spark API, so it seems a bit harsh to break that.

We could extend JavaTypeInference, but we already tried to make our own KotlinReflection.encoderFor(class) before...

asm0dey commented 7 months ago

Huh, right, I forgot about the tuples support. Is it possible and necessary to support default argument values? It seems that the backend should not care at all, because at runtime they are already known.

Jolanrensen commented 7 months ago

Java bean support requires an empty constructor + getters/setters, so yeah :/. That's what @JvmOverloads achieves.

Actually, we can do it with lateinit vars.
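The bean requirements mentioned above (an empty constructor plus getters/setters) can be checked from Kotlin with plain Java reflection. This is a minimal sketch: AddressBean and the helper name are hypothetical, and the check only mirrors what a bean-style encoder is presumed to look for.

```kotlin
import java.io.Serializable

// Hypothetical bean-style data class: every parameter is a var with a default value.
data class AddressBean @JvmOverloads constructor(
    var street: String = "",
    var city: String = "",
) : Serializable

// Uses only the pieces the Java bean convention relies on:
// a public no-arg constructor, a setter, and a getter.
fun roundTripThroughBeanApi(): String {
    val beanClass = AddressBean::class.java
    val bean = beanClass.getConstructor().newInstance()
    beanClass.getMethod("setCity", String::class.java).invoke(bean, "City")
    return beanClass.getMethod("getCity").invoke(bean) as String
}

fun main() {
    println(roundTripThroughBeanApi()) // City
}
```

On the JVM, Kotlin also generates the parameterless constructor whenever all primary constructor parameters have defaults, so the defaults matter more here than @JvmOverloads itself.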

asm0dey commented 7 months ago

The no-arg plugin already adds a no-arg constructor to a data class without default arguments, see here: https://kotlinlang.org/docs/no-arg-plugin.html

You can even force it to work on your custom annotation. Doesn't solve the nested tuples feature though

Jolanrensen commented 6 months ago

2 updates:

First: We can successfully define a UDT for a class (en/de)coding it using the ExpressionEncoder of a different class. No generics are supported afaik though. So we cannot make a generic UDT for java.util.Lists which parses it using an IterableEncoder. (Probably explains why that hasn't been done before)

The downside of this approach is: no real column creation:

root
 |-- value: persondataclassudt (nullable = true)

Second: I tweaked the data class in such a way that it can work perfectly, both as a case class from Spark/Scala, as well as a data class from Kotlin :)

Given:

@SomeAnnotation
data class Address(
    val streetAndNumber: Pair<String, Int>,
    val city: String,
)

We generate:

class Address( // main constructor with scala names/types
    streetAndNumber: Tuple2<String, Int>,
    city: String,
) : Product {
    // secondary in companion object with kotlin names/types
    // Mainly, so Spark doesn't see this constructor 
    companion object {
        operator fun invoke(
            streetAndNumber: Pair<String, Int>,
            city: String,
        ): Address = Address(
            streetAndNumber = streetAndNumber.toTuple(),
            city = city,
        )
    }

    private val scalaStreetAndNumber = streetAndNumber
    private val scalaCity = city

    // For Scala
    fun streetAndNumber() = scalaStreetAndNumber
    fun city() = scalaCity

    // For Kotlin
    val streetAndNumber = scalaStreetAndNumber.toPair()
    val city = scalaCity

    // Product functions
    override fun canEqual(that: Any?): Boolean = that is Address
    override fun productElement(n: Int): Any = when (n) {
        0 -> scalaStreetAndNumber
        1 -> scalaCity
        else -> throw IndexOutOfBoundsException(n.toString())
    }
    override fun productArity(): Int = 2

    // data class functions
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is Address) return false

        if (streetAndNumber != other.streetAndNumber) return false
        if (city != other.city) return false

        return true
    }
    override fun hashCode(): Int {
        var result = streetAndNumber.hashCode()
        result = 31 * result + city.hashCode()
        return result
    }
    fun copy(
        streetAndNumber: Pair<String, Int> = this.streetAndNumber,
        city: String = this.city,
    ): Address = Address(
        streetAndNumber = streetAndNumber,
        city = city,
    )
    override fun toString(): String = "Address(streetAndNumber=$streetAndNumber, city='$city')"
}

Even when nested, the ExpressionEncoder is perfectly happy with this class. It can serialize and deserialize it and nested types like this and other Scala case classes (like tuples) work too. Plus, from Kotlin we can do anything with it like it's still the original data class.

The plugin does need to create these converters:

The rest should be good to go

Edit: Okay, nullability detection is not really there:

encoderFor<Address>().schema().printTreeString()
// root
// |-- streetAndNumber: struct (nullable = true)
// |    |-- _1: string (nullable = true)
// |    |-- _2: integer (nullable = true)
// |-- city: string (nullable = true)

I'm not sure how to convey these to Scala correctly. In JavaTypeInference, they check for @Nonnull annotations, but in ScalaReflection they don't.

asm0dey commented 6 months ago

Wow, man, this sounds stunning! How?

Jolanrensen commented 6 months ago

Wow, man, this sounds stunning! How?

Well if it walks like a case class and quacks like a case class XD. But jokes aside, this still requires building a Kotlin 2.0 compiler plugin to rewrite the user-written data classes on the fly, which is no easy feat, but definitely possible.

asm0dey commented 6 months ago

I mean the fact that you did it without released K2 is a big deal! We actually have every right to say that the future versions of Kotlin API for Apache Spark for Spark x require Kotlin 2+

Jolanrensen commented 6 months ago

I mean the fact that you did it without released K2 is a big deal! We actually have every right to say that the future versions of Kotlin API for Apache Spark for Spark x require Kotlin 2+

Define "did it" XD. This is just a proof of concept haha. I have built no plugin yet, just an idea for what the plugin should do for it to work.

asm0dey commented 6 months ago

Ah, I see, misread you then. But awesome concept though!

Jolanrensen commented 6 months ago

Haha, that's okay :) But yeah, I believe something like this could do the trick. In theory, it could even work with spark-connect (if that ever works reliably), since we don't target or modify anything specific to Spark. We would still be scala-major-dependent, which is okay I think.

Jolanrensen commented 6 months ago

Come to think of it, we could also use the Java bean serializer if, using the plugin, we could replace all Scala tuples with our temporary @NoArg data class tuples.

Depends on what is more stable :)

Jolanrensen commented 6 months ago

Yep, I think this is the way to go. Types and nullability are detected correctly and consistently now with annotations and the Java bean encoder. For unsupported data/case classes we need to generate a temporary spark-ified version, which I'll show below for Pair, but for Tuples they would also have to be generated on the fly. But compared to the Scala variant, far fewer conversions are needed, which is a huge bonus.

Given

@Sparkify
data class Address2(
    val streetAndNumber: Pair<String, Int?>,
    var city: String,
)

we should generate

@NoArg // adding a 0-arg constructor
class Address2( // We can keep the constructor for Kotlin, the serializer doesn't use it
    streetAndNumber: Pair<String, Int?>,
    city: String,
) : Serializable {

    // Property accessors for Kotlin using a new JvmName as to not clash with the getter/setter functions
    @get:JvmName("streetAndNumber")
    @set:JvmName("streetAndNumber")
    var streetAndNumber: Pair<String, Int?> = streetAndNumber
        private set // private set if it was a val before

    @get:JvmName("city")
    @set:JvmName("city")
    var city: String = city
        public set // public set if it was a var before

    // Getters and setters for Java bean creation. The getters define the types and names of the properties.
    // These can also do conversion from/to custom types :)
    // We could probably choose to leave out these getX()/setX() functions for types that don't need to be changed
    // But for now, we'll just add them for all properties

    @Nonnull // Annotated with Nonnull for proper nullability detection
    @Deprecated("", level = DeprecationLevel.ERROR) // deprecation to prevent usage in Kotlin
    fun getCity(): String = city
    @Deprecated("", level = DeprecationLevel.ERROR)
    fun setCity(value: String) { this.city = value }

    // Since we cannot decode to kotlin.Pair, we need a custom wrapper for the Pair<String, Int?>
    // Of course, annotated with @get:Nonnull/Nullable
    // This will also need to be generated each time a Triple, Tuple4, etc. is used

    @NoArg class FakePair_streetAndNumber(@get:Nonnull var first: String, @get:Nullable var second: Int?) : Serializable
    private fun Pair<String, Int?>.toFakePair_streetAndNumber() = FakePair_streetAndNumber(this.first, this.second)
    private fun FakePair_streetAndNumber.toPair() = Pair(this.first, this.second)

    @Nonnull
    @Deprecated("", level = DeprecationLevel.ERROR)
    fun getStreetAndNumber(): FakePair_streetAndNumber = this.streetAndNumber.toFakePair_streetAndNumber() // conversion
    @Deprecated("", level = DeprecationLevel.ERROR)
    fun setStreetAndNumber(value: FakePair_streetAndNumber) { this.streetAndNumber = value.toPair() }

    // Finally, just copying over the data class functions

    fun copy(
        streetAndNumber: Pair<String, Int?> = this.streetAndNumber,
        city: String = this.city,
    ): Address2 = Address2(
        streetAndNumber = streetAndNumber,
        city = city,
    )
    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (other !is Address2) return false

        if (streetAndNumber != other.streetAndNumber) return false
        if (city != other.city) return false

        return true
    }
    override fun hashCode(): Int {
        var result = streetAndNumber.hashCode()
        result = 31 * result + city.hashCode()
        return result
    }
    override fun toString(): String = "Address(streetAndNumber=$streetAndNumber, city='$city')"
    operator fun component1(): Pair<String, Int?> = this.streetAndNumber
    operator fun component2(): String = this.city
}

With this version of the class we can still do anything with it as if it were a data class:

val addresses = listOf(
    Address2(Pair("street", 1), "city"),
    Address2(Pair("street2", null), "city2").copy(city = "city3"),
)
val (streetEtc: Pair<String, Int?>, city: String) = addresses[0]

But most importantly, we can encode and decode it using the Java bean encoder:

inline fun <reified T : Any> javaEncoderFor(): Encoder<T> = Encoders.bean(T::class.java)

javaEncoderFor<Address2>().schema().printTreeString()
// root
//  |-- city: string (nullable = false)
//  |-- streetAndNumber: struct (nullable = false)
//  |    |-- first: string (nullable = false)
//  |    |-- second: integer (nullable = true)

val ds1 = spark.createDataset(
    addresses,
    javaEncoderFor()
)
ds1.show()
// +-----+---------------+
// | city|streetAndNumber|
// +-----+---------------+
// | city|    {street, 1}|
// |city3|{street2, NULL}|
// +-----+---------------+

println(ds1.head())
// Address(streetAndNumber=(street, 1), city='city')

asm0dey commented 6 months ago

Do you think we should retrospectively update old versions with this implementation too?

asm0dey commented 6 months ago

Also, is it viable and possible to rewrite all the Scala Products to Java Beans? Or does JavaSerializer support Products OOTB?

Jolanrensen commented 6 months ago

Also, is it viable and possible to rewrite all the Scala Products to Java Beans? Or does JavaSerializer support Products OOTB?

No, products aren't supported out of the box. Otherwise, Tuples would work right away. But if a user puts a Product inside one of our data classes we could probably generate a Fake version of it in the data class, like with FakePair_streetAndNumber.

Top-level Scala products/non-sparkified data classes like Pair/Triple might still be an issue though, as we would still have to create an Encoder for those. The Java bean encoder cannot handle them... This would also prevent us from creating a Dataset from listOf(t(SomeDataClass, "something")), which might actually be a blocker :(.

If that is a blocker, I think the only other way is to create our own Kotlin version of JavaTypeInference.scala (but this time not inside the org.apache.spark.sql.catalyst package ;) ). Either in Kotlin or in Scala. Here we would then need to add support for Kotlin data classes and "definedByConstructorParams" classes, like Products. This could increase maintenance cost tho.

asm0dey commented 6 months ago

IIRC the reason why I created it in that package was that Spark could only read it from that package :) I would never do it without a serious reason. Also, I'm not sure if you can do it in Kotlin; it probably operates with Scala-specific reflection, but here I might be wrong.

If it's possible to implement with Kotlin and outside Spark packages - we won't be affected by changes in Spark anymore, which will bring a bearable amount of maintenance.

Jolanrensen commented 6 months ago

I'm trying the KotlinTypeInference route rn. Seems that most things work fine with Kotlin reflection since, like here, the actual encoders are somewhere else currently. This means we can add both Scala encoders and Java encoders, as well as a special product-like encoder for data classes. I'm not sure how backwards compatible that is, but it seems like the cleanest custom solution to retrieving encoders for Kotlin.

We cannot go fully custom with our encoders, as only the ones used here are compatible, but hey, we can try
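As a rough illustration of why the Kotlin reflection route helps with the nullability problem mentioned earlier: a KType obtained through typeOf already carries nullability for every type argument. The sketch below uses only the standard library; the describe helper and the "argN" labels are invented for this example and have nothing to do with the actual KotlinTypeInference code.

```kotlin
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf

// Hypothetical helper: walks a KType and renders one line per (nested) type,
// including whether that type is marked nullable in Kotlin.
fun describe(type: KType, name: String = "value", depth: Int = 0): List<String> {
    val simpleName = (type.classifier as? KClass<*>)?.simpleName ?: "?"
    val line = "${"  ".repeat(depth)}$name: $simpleName (nullable = ${type.isMarkedNullable})"
    val children = type.arguments.flatMapIndexed { index, argument ->
        // Star projections have no type; they are skipped in this sketch.
        argument.type?.let { describe(it, "arg$index", depth + 1) } ?: emptyList()
    }
    return listOf(line) + children
}

fun main() {
    describe(typeOf<Pair<String, Int?>>()).forEach(::println)
}
```

An encoder lookup built on KType could use the same information to decide per field whether the resulting column is nullable, without relying on @Nonnull annotations.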

asm0dey commented 6 months ago

It was never our goal to go fully custom; we should support the sensible subset of possibilities. Looking with a fresh eye, I start to think that we actually don't need all these Scala intricacies. We only need to support the Kotlin stuff and be smart about delegating everything else.

The most complex parts of the support, encoders for data classes and recursive generics, are already written and can probably be reused.

Jolanrensen commented 6 months ago

Update time :)

I've been trying to make my own mixed version of JavaTypeInference and ScalaReflection in pure Kotlin with success!

image

This will allow us to create a complete replacement of the old encoding system with data classes supported out of the box :). (I did need 1 dirty hack to get the names of the columns right for data classes, but I'll get to that later haha)

One downside of this approach is that it's definitely Spark 3.4+. This is because they switched their encoding system for Spark Connect to AgnosticEncoder (https://github.com/apache/spark/blob/branch-3.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala). So, instead of defining a (de)serializer for each type, like before, they define intermediate "AgnosticEncoder" objects, which later get (de)serializers generated when building an ExpressionEncoder. This meant I had to use ProductEncoder for Kotlin data classes, since it is the only encoder that works by reading the values and instantiating a new instance of any class by calling the constructor with all arguments at once.

I'll try to make a new branch in this repo with this approach and see if we can replace the encoding with the new system. This should allow us to drop the :core module entirely and be more spark-version-independent (in theory).
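To make the "read the values, then call the constructor with all arguments at once" idea concrete, here is a small reflection-only sketch. It is not Spark's ProductEncoder; Point, readFields and instantiate are hypothetical names, and the accessors are deliberately named exactly like the fields, the way a Scala case class exposes them.

```kotlin
// Hypothetical class whose accessors are named like its fields: x() and y().
class Point(private val xValue: Int, private val yValue: Int) {
    fun x(): Int = xValue
    fun y(): Int = yValue
}

// "Serialize": for each field name, call the method with exactly that name.
fun readFields(obj: Any, fieldNames: List<String>): List<Any?> =
    fieldNames.map { fieldName -> obj.javaClass.getMethod(fieldName).invoke(obj) }

// "Deserialize": find the constructor that takes all values and call it in one go.
fun <T : Any> instantiate(cls: Class<T>, values: List<Any?>): T {
    val constructor = cls.constructors.single { it.parameterCount == values.size }
    return cls.cast(constructor.newInstance(*values.toTypedArray()))
}

fun main() {
    val values = readFields(Point(1, 2), listOf("x", "y"))
    val copy = instantiate(Point::class.java, values)
    println("x=${copy.x()}, y=${copy.y()}")
}
```

Because the same name is used both as the label and as the method to call, a plain Kotlin data class (which exposes getX() rather than x()) does not fit this scheme without extra work.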

asm0dey commented 6 months ago

And also probably significantly reduce, if not remove, the code preprocessing!


asm0dey commented 6 months ago

AFAIR it should be totally possible to write our own implementation of data class instantiation. I'll try to look at it a bit later

Jolanrensen commented 6 months ago

@asm0dey I thought so too, but if we use AgnosticEncoders (which provide Spark Connect compatibility as far as I know), we can only use the encoders given to us. If we want to define our own serializers and deserializers (which should still be possible) we'll have to define them for all types again, like before.

Jolanrensen commented 6 months ago

You can check out https://github.com/Kotlin/kotlin-spark-api/pull/218 to see my progress :). I inserted my new encoder system in the project and updated references across the board. I disabled :core for now, but we might still need it for the vararg UDFs. I remember the Kotlin type system sorta broke down when it had to unwrap those by itself... so yeah. But it should be okay to depend on some Scala library (as long as we name it "utils" or something), as long as it doesn't contain Spark.

All other Spark helpers should be doable in pure Kotlin (as I did with debug() and debugCodegen()).

The encoders are still incomplete, so there are still many failing tests, but we can start experimenting with what does and what does not work on that branch.

Jolanrensen commented 6 months ago

One of the main problems I currently have is with the encoding of data class names using the ProductEncoder. This is because it calls the EncoderField.name() function both for the name of the column, as well as the name of the function for where to retrieve the value. readMethodName is completely ignored.

One way to tackle this is by marking your data classes like:

data class User(@get:JvmName("name") val name: String, @get:JvmName("age") val age: Int)

which sets the JVM function names of getName() and getAge() to name() and age() respectively.
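The effect of the annotation can be checked with plain Java reflection, without Spark. The following is a small sketch under that assumption; the User class mirrors the one above and the checks are only illustrative.

```kotlin
data class User(
    @get:JvmName("name") val name: String,
    @get:JvmName("age") val age: Int,
)

fun main() {
    val user = User("Alice", 30)
    val jvmMethodNames = User::class.java.declaredMethods.map { it.name }.toSet()

    // The getters are now called name() and age() on the JVM...
    println("name" in jvmMethodNames)
    // ...and the default getName()/getAge() are no longer generated.
    println("getName" in jvmMethodNames)

    // Calling the renamed getter reflectively, the way an encoder would look it up by field name.
    println(User::class.java.getMethod("age").invoke(user))

    // From Kotlin nothing changes: property access works as usual.
    println(user.name)
}
```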

However, this doesn't work for data classes we don't have access to, like Pair and Triple.

My name hack tries to override the name() function, such that only the first time it's called it returns "getAge" and the rest of the time it returns "age". This works for simple data sets (like you see in the picture), but it breaks down when the name() function is called multiple times earlier on. This for instance happens when you try to encode a List<DataClass>.

So yeah... ideas are welcome. You can turn it off by setting KotlinTypeInference.DO_NAME_HACK = false
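A stripped-down sketch of why a call-order-dependent name() is fragile. HackedFieldName is a hypothetical stand-in, not the actual code behind DO_NAME_HACK; it only models "first call returns the getter name, later calls return the column name".

```kotlin
// Hypothetical stand-in for the hack: the answer depends on how often name() was called before.
class HackedFieldName(private val getterName: String, private val columnName: String) {
    private var alreadyCalled = false

    fun name(): String =
        if (alreadyCalled) columnName else { alreadyCalled = true; getterName }
}

fun main() {
    // Expected order: the first caller wants the method name, later callers want the column name.
    val field = HackedFieldName("getAge", "age")
    println(field.name()) // getAge
    println(field.name()) // age

    // Unexpected order: if something asks for the column name first,
    // it receives "getAge", and the later method lookup receives "age".
    val other = HackedFieldName("getAge", "age")
    val columnName = other.name()
    val methodName = other.name()
    println("column=$columnName, method=$methodName") // column=getAge, method=age
}
```

Any code path that reads the name an extra time before the accessor is resolved flips the two results, which matches the kind of failure described for List<DataClass>.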

@asm0dey Maybe a little compiler plugin after all? :) At least for manual user-created data classes

Jolanrensen commented 6 months ago

Update: I managed to create a little compiler plugin that in the IR stage adds @JvmNames to the getters of @Sparkify annotated data classes :D. Just need to create a Gradle plugin around it so I can publish it (locally) and check if it works in combination with Spark. Since it just touches IR it should be compatible both with K1 and K2 :). Also, I did test that I can call the function using Java reflection. That works.

Come to think of it 🤔 I should check whether the @get:JvmName annotation is present already and skip it if so.

Plus, we might be able to add something like @ColumnName("name") (which will just be converted to @get:JvmName("name")) for custom column names :) this annotation would have to be taken into account in the encoding but that's okay.

Finally, I need to take a look at @Transient, which we supported before. We could handle it when serializing from a data class, but we cannot instantiate a new data class if we don't have all constructor parameters... (Maybe @Transient requires a default argument to be present (can be warned in FIR) and @JvmOverloads does the rest? Need to test)

Jolanrensen commented 6 months ago

image image

Yesss! it works in combination with spark

Jolanrensen commented 6 months ago

image

And with @ColumnName :)

Jolanrensen commented 6 months ago

Most tests are now fixed at: https://github.com/Kotlin/kotlin-spark-api/pull/218 The PR contains the compiler+gradle plugin which can process @Sparkify and @ColumnName annotations. To build the project, both compiler-plugin and gradle-plugin need to be published to mavenLocal.

There's just one place that now fails and that is when returning a @Sparkify data class from a UDF. It's because of this Spark function which does an instance check for the value instead of using the proper deserialize method :/. So, to fix this, we need to make our data classes a scala.Product after all. I'll see what I can do in IR.

Jolanrensen commented 6 months ago

image

They're a hassle to make, but compiler plugins are awesome :) The compiler plugin now converts:

@Sparkify
data class User(
    val name: String,
    @ColumnName("test") val age: Int,
)

in IR successfully to:

@Sparkify
data class User(
    @get:JvmName("name") val name: String,
    @get:JvmName("test") @ColumnName("test") val age: Int,
): scala.Product {
  override fun canEqual(that: Any?): Boolean = that is User
  override fun productArity(): Int = 2
  override fun productElement(n: Int): Any? =
    if (n == 0) this.name
    else if (n == 1) this.age
    else throw IndexOutOfBoundsException()
}

This is Scala-version independent, since it just queries the classpath for scala.Product and scala.Equals. So unless Scala changes a lot, this should be future-proof.