apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Implement Spark-compatible cast between decimals with different precision and scale #375

Open andygrove opened 7 months ago

andygrove commented 7 months ago

What is the problem the feature request solves?

Comet is not consistent with Spark when casting between decimals. Here is a test to demonstrate this.

  test("cast between decimals with different precision and scale") {
    val rowData = Seq(
      Row(BigDecimal("12345.6789")),
      Row(BigDecimal("9876.5432")),
      Row(BigDecimal("123.4567"))
    )
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(rowData),
      StructType(Seq(StructField("a", DataTypes.createDecimalType(10,4))))
    )
    castTest(df, DataTypes.createDecimalType(6,2))
  }

Spark Result

+----------+-------+
|         a|      a|
+----------+-------+
|  123.4567| 123.46|
| 9876.5432|9876.54|
|12345.6789|   null|
+----------+-------+

Comet Result

java.lang.ArithmeticException: Cannot convert 12345.68 (bytes: [B@4f834a43, integer: 1234568) to decimal with precision: 6 and scale: 2
    at org.apache.comet.vector.CometVector.getDecimal(CometVector.java:86)
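
The difference comes down to overflow handling: rounding 12345.6789 to scale 2 gives 12345.68, which needs precision 7 while the target decimal(6,2) allows only 6, so Spark (with the default spark.sql.ansi.enabled=false) returns null, whereas Comet throws. A quick way to confirm the Spark side in a plain Spark session, assuming ANSI mode is left at its default of false:

  // With spark.sql.ansi.enabled=false (the default), a decimal cast that
  // overflows the target precision yields null instead of throwing.
  spark.sql("SELECT CAST(12345.6789 AS DECIMAL(6,2)) AS a").show()
  // expected: a single row containing null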

Describe the potential solution

No response

Additional context

No response

viirya commented 7 months ago

This looks simple to fix. We currently throw an exception if we cannot convert the bytes to decimal, but it looks like Spark returns null.
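
For reference, Spark's own org.apache.spark.sql.types.Decimal already exposes this behavior: changePrecision returns false on overflow rather than throwing, and the non-ANSI cast turns that into null. A minimal sketch of that semantics (not the Comet fix itself):

  import org.apache.spark.sql.types.Decimal

  // changePrecision returns false when the value does not fit the target
  // precision/scale; the non-ANSI cast maps that result to a SQL null.
  val d = Decimal(BigDecimal("12345.6789"))
  val fits = d.changePrecision(6, 2)                         // false: 12345.68 needs precision 7
  val result: Option[Decimal] = if (fits) Some(d) else None  // None stands in for null
  println(result)                                            // None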

caicancai commented 7 months ago

@andygrove Do you mind letting me try it? In my spare time, I have been fixing Calcite compatibility issues with Spark SQL type conversions.

viirya commented 7 months ago

@caicancai Thanks. Please go ahead and create a PR for any open ticket that no one else has claimed.

caicancai commented 6 months ago

I am working on it.

himadripal commented 2 weeks ago

I was trying to take a look at this one. I added this test to CometCastSuite:

test("cast between decimals with different precision and scale") {
    val df = generateDecimalsPrecision38Scale18()
    val df1 = df.withColumn("b", col("a").cast(DataTypes.createDecimalType(10, 2)))
    df1.show(false)
    castTest(generateDecimalsPrecision38Scale18(), DataTypes.createDecimalType(10, 2))
  }

It gives me results like this:

+----------------------------------------+----------+
|a                                       |b         |
+----------------------------------------+----------+
|-99999999999999999999.999999999999000000|null      |
|-9223372036854775808.234567000000000000 |null      |
|-9223372036854775807.123123000000000000 |null      |
|-2147483648.123123123000000000          |null      |
|-2147483647.123123123000000000          |null      |
|-123456.789000000000000000              |-123456.79|
|0E-18                                   |0.00      |
|123456.789000000000000000               |123456.79 |
|2147483647.123123123000000000           |null      |
|2147483648.123123123000000000           |null      |
|9223372036854775807.123123000000000000  |null      |
|9223372036854775808.234567000000000000  |null      |
|99999999999999999999.999999999999000000 |null      |
|null                                    |null      |
+----------------------------------------+----------+

But castTest fails with the following assertion error:

Expected only Comet native operators, but found Sort.
plan: Sort [a#30 ASC NULLS FIRST], true, 0
+- Project [a#30, cast(a#30 as decimal(10,2)) AS a#32]
   +- CometCoalesce Coalesce 1, [a#30], 1
      +- CometScan parquet [a#30] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/jx/23vwhfzn2ts493_2twyz1dpc0000gn/T/spark-5f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(38,18)>

Also, this code inside the test(...) produces the following plan:

spark.sql(s"select a, cast(a as decimal(10,2)) from t2 order by a").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CometSort [a#4, a#9], [a#4 ASC NULLS FIRST]
   +- CometColumnarExchange rangepartitioning(a#4 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=13]
      +- LocalTableScan [a#4, a#9]

Update: I figured out that removing .coalesce(1) makes the plan use CometSort, but the assertion now fails in Project.

val data = roundtripParquet(input, dir) // .coalesce(1) removed

@viirya @andygrove can you provide guidance here on how to proceed?

andygrove commented 2 weeks ago

Thanks for looking into this @himadripal. The issue is that the projection is falling back to Spark because org.apache.comet.expressions.CometCast#isSupported is either returning Unsupported or Incompatible for a cast between decimal types, so the first step is to update this method to say that this cast is now supported.
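
A rough sketch of the kind of change being suggested is below; the actual method signature and support-level names in org.apache.comet.expressions.CometCast may differ from these stand-ins, so treat it as an illustration only:

  import org.apache.spark.sql.types.{DataType, DecimalType}

  // Hypothetical stand-ins for Comet's support levels; the real enum and
  // signature in CometCast may differ.
  sealed trait SupportLevel
  case object Compatible extends SupportLevel
  case object Incompatible extends SupportLevel
  case object Unsupported extends SupportLevel

  def isSupported(fromType: DataType, toType: DataType): SupportLevel =
    (fromType, toType) match {
      // The idea: treat decimal-to-decimal casts as supported so the
      // projection no longer falls back to Spark.
      case (_: DecimalType, _: DecimalType) => Compatible
      case _ => Unsupported // placeholder for the existing logic
    }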

andygrove commented 2 weeks ago

You may want to set spark.comet.explainFallback.enabled=true as well so that you can see the reason why queries are falling back to Spark.
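
For example, the flag can be set on the active session (or passed via --conf to spark-submit):

  // Ask Comet to report why an operator fell back to Spark.
  spark.conf.set("spark.comet.explainFallback.enabled", "true")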

himadripal commented 2 weeks ago

Thank you @andygrove for the guidance and tip. I'll explore spark.comet.explainFallback.enabled=true as well.