Kotlin / kotlin-spark-api

This project provides Kotlin bindings and several extensions for Apache Spark. We are looking to have this as a part of Apache Spark 3.x
Apache License 2.0

Define udf using by-delegate? #143

Closed: Jolanrensen closed this issue 2 years ago

Jolanrensen commented 2 years ago

UDFs are currently defined like this, for example:

val stringArrayMerger: UDFWrapper1 = udf.register<WrappedArray<String>, String>("stringArrayMerger") {
    it.asIterable().joinToString(" ")
}

They can then be invoked from SQL or directly in code like: stringArrayMerger(testData.col("value")).

The thing I notice is that the name stringArrayMerger has to be typed twice: once as the property name and once as the string passed to register. This can of course be rewritten using delegates :)

This is possible:

val stringArrayMerger: UDFWrapper1 by udf.register<WrappedArray<String>, String> {
    it.toIterable().asKotlinIterable().joinToString(" ")
}

when adding something like:

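import kotlin.reflect.KProperty

// Delegate that builds (and registers) the UDF under the name of the delegated property.
class UDFWrapperBuilderDelegate<UDF>(val getValue: (name: String) -> UDF) {
    operator fun getValue(
        thisRef: Any?,
        property: KProperty<*>
    ): UDF = getValue(property.name)
}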

and for each UDF register function:

inline fun <reified T0, reified R> UDFRegistration.register(noinline func: (T0) -> R): UDFWrapperBuilderDelegate<UDFWrapper1> =
    UDFWrapperBuilderDelegate { name -> register(name, func) }

inline fun <reified T0, reified T1, reified R> UDFRegistration.register(noinline func: (T0, T1) -> R): UDFWrapperBuilderDelegate<UDFWrapper2> =
    UDFWrapperBuilderDelegate { name -> register(name, func) }

...
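
To try the mechanism without a Spark session, here is a minimal, self-contained sketch. FakeRegistry and NamedDelegate are hypothetical stand-ins for UDFRegistration and UDFWrapperBuilderDelegate, not part of kotlin-spark-api; only the delegate pattern itself is taken from the snippets above.

```kotlin
import kotlin.reflect.KProperty

// Hypothetical stand-in for Spark's UDFRegistration: just a name -> function map.
class FakeRegistry {
    val registered = mutableMapOf<String, (String) -> String>()

    fun register(name: String, func: (String) -> String): (String) -> String {
        registered[name] = func
        return func
    }
}

// Same shape as the delegate class above: getValue forwards the property name.
class NamedDelegate<T>(val build: (name: String) -> T) {
    operator fun getValue(thisRef: Any?, property: KProperty<*>): T =
        build(property.name)
}

// Name-less overload that returns a delegate instead of registering right away.
fun FakeRegistry.register(func: (String) -> String): NamedDelegate<(String) -> String> =
    NamedDelegate { name -> register(name, func) }

fun main() {
    val registry = FakeRegistry()
    val shout by registry.register { it.uppercase() }

    println(registry.registered.keys) // [] : nothing is registered yet
    println(shout("hello"))           // HELLO : this access registers "shout"
    println(registry.registered.keys) // [shout]
}
```

Note that in this sketch the registration only happens inside getValue, so it runs on every read of the property and not at all if the property is never read.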

Just a thought :)

Jolanrensen commented 2 years ago

The compiler is even smart enough that you only have to type the bare minimum:

// UDFWrapper1
val stringArrayMerger by udf.register { it: WrappedArray<String> ->
    it.toIterable().asKotlinIterable().joinToString(" ")
}

// UDFWrapper2
val stringArrayMerger2 by udf.register { a: WrappedArray<String>, b: Int ->
    a.toIterable().asKotlinIterable().joinToString(" $b ")
}

Jolanrensen commented 2 years ago

Okay, maybe it's better to not use delegates after all. The function is only registered once the delegate's value is accessed, which never happens when the UDF is only called from SQL, for example... Better look for another notation :)
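
For reference, Kotlin's provideDelegate operator is one way to make a delegate do its work when the property is created rather than on first read. The sketch below is only an illustration of that language feature, not necessarily the notation this project ended up with; FakeRegistry and EagerNamedDelegate are hypothetical names and no Spark classes are involved.

```kotlin
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty

// Hypothetical stand-in for Spark's UDFRegistration.
class FakeRegistry {
    val registered = mutableMapOf<String, (String) -> String>()

    fun register(name: String, func: (String) -> String): (String) -> String {
        registered[name] = func
        return func
    }
}

// provideDelegate is called once, when the delegated property is created,
// so the registration no longer waits for the first read.
class EagerNamedDelegate<T>(val build: (name: String) -> T) {
    operator fun provideDelegate(
        thisRef: Any?,
        property: KProperty<*>
    ): ReadOnlyProperty<Any?, T> {
        val value = build(property.name)                     // register immediately
        return ReadOnlyProperty<Any?, T> { _, _ -> value }   // later reads reuse it
    }
}

fun FakeRegistry.register(func: (String) -> String): EagerNamedDelegate<(String) -> String> =
    EagerNamedDelegate { name -> register(name, func) }

fun main() {
    val registry = FakeRegistry()
    val shout by registry.register { it.uppercase() }

    // "shout" has not been read yet, but it is already registered.
    println(registry.registered.keys) // [shout]
}
```

One caveat: on the JVM, a top-level property is only created when the class generated for its file is initialized, so even this variant would not register a top-level UDF that nothing in the running code ever touches.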

Jolanrensen commented 2 years ago

https://github.com/Kotlin/kotlin-spark-api/pull/152 handles it :)