Kotlin / kotlin-spark-api

This project provides Kotlin bindings and several extensions for Apache Spark. We are looking to have this become part of Apache Spark 3.x.
Apache License 2.0

Add UDFRegister functions for Spark 2.4 #67

Closed FelixEngl closed 2 years ago

FelixEngl commented 3 years ago

Changes to old pull request

Corrected branching.

Short description

Adds a Kotlin-styled way to create and call UDFs in a more type-safe manner.
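As an illustration of the pattern (a hypothetical sketch, not the actual kotlin-spark-api surface): reified type parameters let a Kotlin wrapper capture the argument and return types at the call site, which is what makes the registration safer than Spark's untyped `register` call. The `UdfRegistry` class below is an assumption for illustration only.

```kotlin
// Hypothetical sketch of the reified-generics registration pattern; the real
// binding would wire this through to sparkSession.udf().register(...).
class UdfRegistry {
    val registered = mutableMapOf<String, String>()

    // Reified type parameters capture the Kotlin types at the call site, so a
    // mismatched lambda signature fails to compile instead of at runtime.
    inline fun <reified T1, reified T2, reified R> register(
        name: String,
        noinline func: (T1, T2) -> R
    ) {
        // A real binding would map T1/T2/R to Spark DataTypes here; we only
        // record the inferred signature to show what the compiler captured.
        registered[name] =
            "${T1::class.simpleName}, ${T2::class.simpleName} -> ${R::class.simpleName}"
    }
}

fun main() {
    val udf = UdfRegistry()
    udf.register<String, Int, Int>("stringIntDiff") { a, b -> a[0].code - b }
    println(udf.registered["stringIntDiff"]) // String, Int -> Int
}
```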

Possible improvements

Future plans

asm0dey commented 3 years ago

Sorry for not answering for a long time; I was busy with another project. It's a very good PR, but it needs several improvements.

asm0dey commented 3 years ago

Please forward-port this to version 3.0.

asm0dey commented 3 years ago

I'm running into the following trouble. I tried to write a more complex test:

    should("also work with datasets") {
        listOf("a" to 1, "b" to 2).toDS().toDF().createOrReplaceTempView("test1")
        udf.register<String, Int, Int>("stringIntDiff") { a, b ->
            a[0].toInt() - b
        }
        spark.sql("select stringIntDiff(first, second) from test1").show()
    }

and it fails with

scala.MatchError: IntegerType (of class org.apache.spark.sql.KSimpleTypeWrapper)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeFor(RowEncoder.scala:225)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeForInput(RowEncoder.scala:222)
    at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1728)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$3(RowEncoder.scala:185)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:181)
    at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:61)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at org.jetbrains.kotlinx.spark.api.UDFRegisterTest$1$1$3$1$2.invokeSuspend(UDFRegisterTest.kt:104)
……

It looks like the encoders don't work for our primitive type wrappers.
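The stack trace supports that reading: `RowEncoder.externalDataTypeFor` pattern-matches on the `DataType` classes Spark knows about, and `KSimpleTypeWrapper` is a different class even though it carries `IntegerType`, so no branch matches. A rough Kotlin analogy (the types here are hypothetical stand-ins, just to illustrate the failure mode):

```kotlin
// Stand-ins for Spark's types, for illustration only.
open class DataType
object IntegerType : DataType()

// A wrapper that merely carries IntegerType is a different class from
// IntegerType itself -- the analogue of the scala.MatchError above.
class KSimpleTypeWrapper(val dataType: DataType) : DataType()

// Mimics a Scala pattern match over the known DataType cases: anything the
// match doesn't cover blows up at runtime, like scala.MatchError does.
fun externalDataTypeFor(dt: DataType): String = when (dt) {
    IntegerType -> "java.lang.Integer"
    else -> throw IllegalStateException("MatchError: $dt")
}

fun main() {
    println(externalDataTypeFor(IntegerType))               // java.lang.Integer
    externalDataTypeFor(KSimpleTypeWrapper(IntegerType))    // throws
}
```

Presumably a fix would need to unwrap to the underlying `DataType` before the encoder ever sees the wrapper.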

Jolanrensen commented 2 years ago

Closing this. We don't support Spark 2 anymore, and we implemented the registration based on the UDFs for Spark 3: https://github.com/Kotlin/kotlin-spark-api/pull/152 (released in v1.2.0). Thanks for the help and inspiration, @FelixEngl!