Kotlin / kotlin-spark-api

This project provides Kotlin bindings and several extensions for Apache Spark. We are looking to have this as a part of Apache Spark 3.x.

No way to define data class with Decimal(38, 0) in Spark schema #181

Open jkylling opened 2 years ago

jkylling commented 2 years ago

There seems to be no way to define data classes where the data class encoder produces a Spark schema with fields of type Decimal(38, 0). The natural approach would be to define a data class with a field of type BigInteger, but this is unsupported by the data class encoder.

This can be seen with the following code:

import org.jetbrains.kotlinx.spark.api.`as`
import org.jetbrains.kotlinx.spark.api.withSpark
import java.math.BigInteger

data class A(val value: BigInteger)

fun main() = withSpark {
    val ds = dsOf(1, 2)
    val df = ds.`as`<A>()
    println(df.schema())
}

which throws java.lang.IllegalArgumentException: java.math.BigInteger is unsupported.

Jolanrensen commented 2 years ago

Hi! java.math.BigDecimal is supported at the moment. Since you want the type Decimal, does that suffice for you, or should we add BigInteger too?
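
For context, a minimal sketch of what the current BigDecimal encoding produces (the data class B here is illustrative, not from the thread):

import org.jetbrains.kotlinx.spark.api.withSpark
import java.math.BigDecimal

data class B(val value: BigDecimal)

fun main() = withSpark {
    // BigDecimal encodes fine, but always with the system default precision and scale
    println(dsOf(B(BigDecimal.ONE)).schema())
    // StructType(StructField(value,DecimalType(38,18),true))
}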

jkylling commented 2 years ago

Hi! Yes, I've noticed that java.math.BigDecimal is supported, but it looks like the type in the resulting schema can only be Decimal(38, 18). It would be useful to be able to specify which decimal type it should be converted to. Perhaps this could be done with some kind of annotation on the type? For instance,

data class A(@DecimalType(38, 0) val value: BigDecimal)

Jolanrensen commented 2 years ago

I see... So you would actually need a way to give the DataType in case it doesn't exist in, or is different from, the knownDataTypes in the Encoding.kt file... That's actually an interesting potential addition to the API, since we cannot possibly know about all possible types. I'll see what's possible :) In the meantime, you could create a UDT (User Defined Type) for your class. There's an example for that, and it allows you to specify entirely how your class should be (de)serialized.
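
A rough sketch of what such a UDT could look like; the names BigIntegerWrapper and BigIntegerUDT are made up here, and Spark's UDT API is only semi-public, so treat this as illustrative rather than definitive:

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.Decimal
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.types.SQLUserDefinedType
import org.apache.spark.sql.types.UserDefinedType
import java.math.BigInteger

// The annotation is registered per class, so we wrap BigInteger
@SQLUserDefinedType(udt = BigIntegerUDT::class)
data class BigIntegerWrapper(val value: BigInteger)

class BigIntegerUDT : UserDefinedType<BigIntegerWrapper>() {
    // How the value is represented in the Spark schema
    override fun sqlType(): DataType = DecimalType(38, 0)

    // user type -> Catalyst internal value
    override fun serialize(obj: BigIntegerWrapper): Any =
        Decimal.apply(obj.value.toBigDecimal())

    // Catalyst internal value -> user type
    override fun deserialize(datum: Any): BigIntegerWrapper =
        BigIntegerWrapper((datum as Decimal).toJavaBigDecimal().toBigInteger())

    override fun userClass(): Class<BigIntegerWrapper> = BigIntegerWrapper::class.java
}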

Jolanrensen commented 2 years ago

Hmm, so whatever DecimalType I specify, it turns out that the dtype turns into DecimalType(38,0) anyway whenever I encode a BigInteger. So maybe I can just map BigInteger::class to DecimalType.SYSTEM_DEFAULT() in knownDataTypes, since it won't make a difference anyway.

So... @jkylling, would an annotation approach still help? Because I think UDTs would do the trick just fine for all unsupported classes (since you need both an encoder and a data type anyway). I could maybe see if I can make @SQLUserDefinedType work for data class properties too, instead of registering them for entire classes everywhere... but I'm not sure how many people would use that.
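
A hypothetical sketch of the change described above, assuming knownDataTypes in Encoding.kt maps Kotlin classes to Spark DataTypes (the real map's shape may differ):

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.DecimalType
import java.math.BigDecimal
import java.math.BigInteger
import kotlin.reflect.KClass

val knownDataTypes: Map<KClass<*>, DataType> = mapOf(
    // ...existing entries, for example:
    BigDecimal::class to DecimalType.SYSTEM_DEFAULT(),
    // proposed addition: Spark turns an encoded BigInteger into DecimalType(38,0)
    // regardless of the declared type, so the system default works here too
    BigInteger::class to DecimalType.SYSTEM_DEFAULT(),
)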

jkylling commented 2 years ago

Just support for BigInteger would be perfect, and it would probably cover most use cases. DecimalType.SYSTEM_DEFAULT() is Decimal(38, 18), right? So something else would be needed for Decimal(38, 0)?
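
For reference, the defaults can be checked directly (a small sketch, assuming Spark 3.x):

import org.apache.spark.sql.types.DecimalType

fun main() {
    // SYSTEM_DEFAULT is DecimalType(MAX_PRECISION, DEFAULT_SCALE), i.e. DecimalType(38, 18)
    println(DecimalType.SYSTEM_DEFAULT())  // DecimalType(38,18)
    println(DecimalType.MAX_PRECISION())   // 38
}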

Jolanrensen commented 2 years ago

> Just support for BigInteger would be perfect, and it would probably cover most use cases. DecimalType.SYSTEM_DEFAULT() is Decimal(38, 18), right? So something else would be needed for Decimal(38, 0)?

Well yes, but it gets converted to DecimalType(38,0) by Spark immediately, even when I give it DecimalType.SYSTEM_DEFAULT(), so I don't think it matters much.

jkylling commented 2 years ago

> > Just support for BigInteger would be perfect, and it would probably cover most use cases. DecimalType.SYSTEM_DEFAULT() is Decimal(38, 18), right? So something else would be needed for Decimal(38, 0)?
>
> Well yes, but it gets converted to DecimalType(38,0) by Spark immediately, even when I give it DecimalType.SYSTEM_DEFAULT(), so I don't think it matters much.

Interesting, is this what is already used for BigDecimal? I tried writing a data frame based on data classes with a BigDecimal field to a table with schema Decimal(38, 0), but got type errors because Decimal(38, 18) could not be converted to Decimal(38, 0). I figured the errors occurred because the data frame had a schema with Decimal(38, 18).

Jolanrensen commented 2 years ago

> Interesting, is this what is already used for BigDecimal? I tried writing a data frame based on data classes with a BigDecimal field to a table with schema Decimal(38, 0), but got type errors because Decimal(38, 18) could not be converted to Decimal(38, 0). I figured the errors occurred because the data frame had a schema with Decimal(38, 18).

Have you got an example for me to try? :)

jkylling commented 2 years ago

> > Interesting, is this what is already used for BigDecimal? I tried writing a data frame based on data classes with a BigDecimal field to a table with schema Decimal(38, 0), but got type errors because Decimal(38, 18) could not be converted to Decimal(38, 0). I figured the errors occurred because the data frame had a schema with Decimal(38, 18).
>
> Have you got an example for me to try? :)

It turns out the particular error I got was related to the code running in the Databricks runtime, probably because the Databricks runtime has stricter conversion checks than the open-source Spark runtime, which silently produces null values instead of throwing an exception. A minimal example of this behavior is below:

import org.jetbrains.kotlinx.spark.api.`as`
import org.jetbrains.kotlinx.spark.api.map
import org.jetbrains.kotlinx.spark.api.withSpark
import java.math.BigDecimal

data class A(val value: BigDecimal)

fun main() = withSpark {
    val table = "tbl"
    spark.sql("CREATE TABLE $table (value DECIMAL(38, 0)) USING parquet")
    spark.sql("INSERT INTO $table VALUES (2)")
    spark.sql("INSERT INTO $table VALUES (1${"0".repeat(37)})")
    val df = spark.sql("select * from $table order by 1 asc limit 1")
    // 2 + 10^19 (20 digits) still fits in the inferred DecimalType(38, 18) schema
    df.`as`<A>()
        .map { A(it.value.add(BigDecimal("1" + "0".repeat(19)))) }
        .also { println(it.schema()) }
        .write().insertInto(table)
    // 2 + 10^20 (21 digits) overflows DecimalType(38, 18) and is written as null
    df.`as`<A>()
        .map { A(it.value.add(BigDecimal("1" + "0".repeat(20)))) }
        .write().insertInto(table)
    spark.sql("select * from $table").show()
}

This outputs:

StructType(StructField(value,DecimalType(38,18),true))
+--------------------+
|               value|
+--------------------+
|10000000000000000002|
|                   2|
|10000000000000000...|
|                null|
+--------------------+

In the Databricks runtime this example should instead throw an exception. Is there a way to write transforms like the one above that can write BigDecimal to Decimal(38, 0) using the full precision?

Jolanrensen commented 2 years ago

I'm afraid I don't know enough about this specific part of Spark to give a helpful answer. Maybe you should try StackOverflow for that too :)
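
One possible workaround, sketched here and not verified against the Databricks runtime: cast the column explicitly to DECIMAL(38, 0) before writing, so the frame's schema matches the table's. This reuses df and table from the example above; values that already overflow Decimal(38, 18) inside the map step would still be lost:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

df.`as`<A>()
    .map { A(it.value.add(BigDecimal("1" + "0".repeat(19)))) }
    .withColumn("value", col("value").cast(DecimalType(38, 0)))
    .write().insertInto(table)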

jkylling commented 2 years ago

Let me rephrase the question: how would I use the Kotlin Spark API to get a Spark data frame with schema Decimal(38, 0) holding a single row with value 2, and then add 10..0 (20 zeros) to this value using a transformation of data classes? As the example above shows, the row in the resulting data frame is null and not 10..02.

Jolanrensen commented 2 years ago

I just played around with it a bit. If you add 10..0 as a BigDecimal, it indeed turns to null. You need BigInteger encoding support to be able to get 10..02 (for which I made a pull request: https://github.com/Kotlin/kotlin-spark-api/pull/182; it's merged and will appear in GH packages shortly for you to try). After that:

data class A(val value: BigInteger)

val df = dsOf(A(2.toBigInteger()))
    .showDS()
    .also { println(it.dtypes().toList()) }

+---+
| _1|
+---+
|  2|
+---+

[(_1,DecimalType(38,0))]

df
    .map { A(it.value.add(BigInteger("1" + "0".repeat(20)))) }
    .showDS(truncate = false)
    .also { println(it.dtypes().toList()) }

+---------------------+
|value                |
+---------------------+
|100000000000000000002|
+---------------------+

[(value,DecimalType(38,0))]

jkylling commented 2 years ago

> I just played around with it a bit. If you add 10..0 as a BigDecimal, it indeed turns to null. You need BigInteger encoding support to be able to get 10..02 (for which I made a pull request: #182; it's merged and will appear in GH packages shortly for you to try). After that:
>
> data class A(val value: BigInteger)
>
> val df = dsOf(A(2.toBigInteger()))
>     .showDS()
>     .also { println(it.dtypes().toList()) }
>
> +---+
> | _1|
> +---+
> |  2|
> +---+
>
> [(_1,DecimalType(38,0))]
>
> df
>     .map { A(it.value.add(BigInteger("1" + "0".repeat(20)))) }
>     .showDS(truncate = false)
>     .also { println(it.dtypes().toList()) }
>
> +---------------------+
> |value                |
> +---------------------+
> |100000000000000000002|
> +---------------------+
>
> [(value,DecimalType(38,0))]

Great! Thank you for fixing this! I'll give it a go soon.