Kotlin / kotlin-spark-api

This project provides Kotlin bindings and several extensions for Apache Spark. We are looking to have this become a part of Apache Spark 3.x.
Apache License 2.0

Tuple encoding (but also help needed) #79

Closed Jolanrensen closed 3 years ago

Jolanrensen commented 3 years ago

This adds the ability to encode all tuples of arity 2 through 5 (as provided by the Spark encoder functions) using the `encoder<T>()` method. This means you can now create and use

val dataset = dsOf(
    Tuple2("a", Tuple3("a", 1, SomeDataClass(1.0, 1.0))),
    Tuple2("b", Tuple3("b", 2, SomeDataClass(1.0, 2.0))),
)

and it will work correctly (a test is included).
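To illustrate what this enables, here is a minimal sketch. It assumes the `withSpark`/`dsOf` helpers from this API, and that Scala tuple fields are callable from Kotlin as `_1()`/`_2()` methods (the names are Scala's own accessors, not something this PR adds):

```kotlin
import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.spark.api.*
import scala.Tuple2

fun main() = withSpark {
    // Tuple2 now round-trips through encoder<T>() like any data class
    val ds: Dataset<Tuple2<String, Int>> = dsOf(
        Tuple2("a", 1),
        Tuple2("b", 2),
    )
    // Scala tuple components are exposed to Kotlin as _1()/_2() methods
    ds.map { it._2() * 10 }.show()
}
```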

What does not work yet, and where I need help, is providing the schema for tuples and other Product types. That would make

val dataset = dsOf(
    SomeDataClass(Tuple2("a", 1)),
    SomeDataClass(Tuple2("b", 4)),
)

possible as well. I did try to create a DataType for it (at the TODO in ApiV1.kt) and wrote a test for it, but due to my inexperience with the wrappers and with Scala, I can't get it to work at the moment. Any help would be welcome!

Of course, merging the first and skipping the second for now is also an option.
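For reference, Spark names the components of a tuple `_1`, `_2`, … in the derived schema, so a hand-built equivalent for `Tuple2<String, Int>` might look like this. This is only a sketch using Spark's public `DataTypes` API, not the project's own wrappers:

```kotlin
import org.apache.spark.sql.types.DataTypes

// Hand-built StructType mirroring the schema Spark derives for
// Tuple2<String, Int>: tuple components become fields _1, _2, ...
val tuple2Schema = DataTypes.createStructType(
    listOf(
        DataTypes.createStructField("_1", DataTypes.StringType, true),
        DataTypes.createStructField("_2", DataTypes.IntegerType, true),
    )
)
```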

Jolanrensen commented 3 years ago

So what I'm trying to make possible is:

val dataset: Dataset<List<Tuple2<String, Int>>> = dsOf(
    listOf(
        Tuple2("a", 1),
        Tuple2("b", 4),
    )
)

This yields some strange errors when creating a DataType for the tuples.

I looked at the example of creating a DataType from a data class, but apparently creating

val dataset: Dataset<List<SomeDataClass>> = dsOf(
    listOf(
        SomeDataClass(),
        SomeDataClass(),
    )
)

is not possible either...

asm0dey commented 3 years ago

We already have it implemented here: https://github.com/JetBrains/kotlin-spark-api/blob/master/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt

Jolanrensen commented 3 years ago

> We already have it implemented here: https://github.com/JetBrains/kotlin-spark-api/blob/master/kotlin-spark-api/common/src/main/kotlin/org/jetbrains/kotlinx/spark/api/VarArities.kt

Well, but if your project uses Scala Tuples, which is very common, the Kotlin Spark API cannot encode them; you'd first need to convert them to those VarArities and back. On top of that, the Spark API itself often returns tuples, for example from the `joinWith()` method.
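To make that concrete: `joinWith` pairs the matched rows as a `scala.Tuple2` rather than flattening them, so consuming its result from Kotlin needs a tuple encoder. A sketch (the `Person`/`Address` classes and column names are made up for illustration):

```kotlin
import org.apache.spark.sql.Dataset
import scala.Tuple2

data class Person(val id: Int, val name: String)
data class Address(val personId: Int, val city: String)

// joinWith keeps both sides of the join as a Tuple2<Person, Address>,
// so a Tuple2 encoder is required to work with the result from Kotlin.
fun joinPeople(
    persons: Dataset<Person>,
    addresses: Dataset<Address>,
): Dataset<Tuple2<Person, Address>> =
    persons.joinWith(
        addresses,
        persons.col("id").equalTo(addresses.col("personId")),
    )
```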