apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

feat: Implement ANSI support for Round #989

Open raulcd opened 1 month ago

raulcd commented 1 month ago

Which issue does this PR close?

Closes #466 .

Rationale for this change

Improves compatibility with Spark.

What changes are included in this PR?

ANSI support for Round by adding input checks when ANSI mode is enabled.

How are these changes tested?

scala> val df = Seq(Int.MaxValue, Int.MinValue).toDF("a")
scala> df.write.parquet("/tmp/int.parquet")
scala> spark.read.parquet("/tmp/int.parquet").createTempView("t")
scala> spark.conf.set("spark.comet.ansi.enabled", true)
scala> spark.conf.set("spark.sql.ansi.enabled", true)
scala> spark.conf.set("spark.comet.enabled", true)
scala> spark.sql("select a, round(a,-1) from t").show

Ensure the above raises:

org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] Overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
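For reference, the overflow itself can be reproduced outside Spark. Here is a minimal sketch of the kind of check the native side would need; roundAnsi and its exact behavior are illustrative assumptions, not Comet's or Spark's actual API:

```scala
object RoundOverflowDemo {
  // Illustrative helper: HALF_UP rounding of an Int at a negative scale,
  // widening to Long internally and failing on overflow when ANSI is on.
  def roundAnsi(value: Int, scale: Int, ansiEnabled: Boolean): Int = {
    require(scale < 0, "demo covers negative scales only")
    val pow: Long = math.pow(10, -scale).toLong
    val half: Long = pow / 2
    val rounded: Long =
      if (value >= 0) ((value + half) / pow) * pow
      else -(((-value.toLong) + half) / pow) * pow
    if (rounded > Int.MaxValue || rounded < Int.MinValue) {
      if (ansiEnabled)
        // mirrors the message class of Spark's ANSI error
        throw new ArithmeticException("[ARITHMETIC_OVERFLOW] Overflow.")
      else rounded.toInt // demo only: truncate back to Int in non-ANSI mode
    } else rounded.toInt
  }
}
```

With scale -1, Int.MaxValue (2147483647) rounds to 2147483650, which no longer fits in an Int, hence the exception above.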
raulcd commented 1 month ago

I've just created a test that reproduces the error so far. I plan to keep working on this, but I am having problems understanding the codebase and where the Round implementation comes from. This is my first PR to the project, so I am taking it as a learning exercise. I've been investigating both:

And from my understanding I probably have to create a new native/core/src/execution/datafusion/expressions/round.rs and modify: https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala#L1892-L1895

To something like:

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 51b32b7..890703f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1889,7 +1889,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
           val optExpr = scalarExprToProto("pow", leftExpr, rightExpr)
           optExprWithInfo(optExpr, expr, left, right)

-        case r: Round =>
+        case r @ Round(_, _, failOnError) =>

And from my understanding I might also have to add the Round definition to core/src/execution/proto/expr.proto.

I might be quite lost here, so I would appreciate confirmation on whether the above sounds right or I am off track.

andygrove commented 1 month ago

Thanks for picking this up @raulcd. I'll be happy to help you navigate this.

So Spark defines Round as:

case class Round(
    child: Expression,
    scale: Expression,
    override val ansiEnabled: Boolean = SQLConf.get.ansiEnabled)

In QueryPlanSerde we have case r: Round, which is fine, but we currently ignore the value of r.ansiEnabled.
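As a minimal standalone sketch (simplified stand-ins, not Comet's actual classes), the point is that the match can bind the expression and still read its ANSI flag instead of dropping it during serialization:

```scala
// Simplified stand-ins for Spark's expression tree (illustrative only).
sealed trait Expression
case class Literal(value: Int) extends Expression
case class Round(child: Expression, scale: Expression,
                 ansiEnabled: Boolean = false) extends Expression

object SerdeSketch {
  def serialize(expr: Expression): String = expr match {
    case r: Round =>
      // Forward the flag, e.g. into a hypothetical fail_on_error
      // field on the protobuf ScalarFunc message.
      s"scalar_func(round, fail_on_error=${r.ansiEnabled})"
    case Literal(v) => v.toString
  }
}
```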

When serializing to protobuf, we use the generic ScalarFunction:

message ScalarFunc {
  string func = 1;
  repeated Expr args = 2;
  DataType return_type = 3;
}

Perhaps we should add an extra field here for fail_on_error?
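A minimal sketch of what that could look like; the field name and number are assumptions, not a final schema:

```proto
message ScalarFunc {
  string func = 1;
  repeated Expr args = 2;
  DataType return_type = 3;
  // Hypothetical: propagate Spark's ANSI flag so the native side can
  // raise ARITHMETIC_OVERFLOW instead of silently wrapping.
  bool fail_on_error = 4;
}
```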

raulcd commented 1 month ago

Perhaps we should add an extra field here for fail_on_error?

That's interesting. Some of the other ScalarFunc expressions probably need to send the fail_on_error flag too in order to match ANSI mode, so that seems like a good idea to me. I've tried to do it in this commit: https://github.com/apache/datafusion-comet/pull/989/commits/6d0b46c88452a90a380b6f78da6fa2f0045b17ef

As for where to actually check for the overflow error, I guess we could do something like the stab I took in the following commit: https://github.com/apache/datafusion-comet/pull/989/commits/113ec6d8ffb38e3599f6bab03050f135f4c691cf

I'll have to check how to fix this error for Spark 3.3 too:

Error:  /__w/datafusion-comet/datafusion-comet/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:1930: value ansiEnabled is not a member of org.apache.spark.sql.catalyst.expressions.Round
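One way this kind of version gap is sometimes papered over (a sketch only; Comet's shim layer, e.g. CometExprShim, may offer a cleaner version-specific hook) is a reflective accessor that defaults when the field does not exist on older Spark versions:

```scala
object RoundShim {
  // Hypothetical helper: read ansiEnabled reflectively so the same code
  // compiles against Spark versions where Round lacks the field,
  // defaulting to false (non-ANSI behavior) when it is absent.
  def ansiEnabled(round: AnyRef): Boolean =
    try {
      round.getClass.getMethod("ansiEnabled")
        .invoke(round).asInstanceOf[Boolean]
    } catch {
      case _: NoSuchMethodException => false
    }
}
```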