apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

feat: Implement Spark-compatible CAST between integer types #340

Closed. ganeshkumar269 closed this 2 weeks ago.

ganeshkumar269 commented 2 weeks ago

Which issue does this PR close?

Closes #311

Rationale for this change

What changes are included in this PR?

How are these changes tested?

Added corresponding Scala test cases.

ganeshkumar269 commented 2 weeks ago

Hi @viirya @andygrove, first, please let me know whether this PR aligns with expectations for how to fix the issue; if not, kindly provide pointers so I can move in the right direction.

Also, regarding the errors in the CI pipeline: it looks like the error message for 3.2 is a bit different from 3.3 and 3.4 for overflow cases, so in the Rust code I would have to check the Spark version and return the error message accordingly. How can I get the Spark version from the Rust side?

viirya commented 2 weeks ago

> Also, regarding the errors in the CI pipeline: it looks like the error message for 3.2 is a bit different from 3.3 and 3.4 for overflow cases, so in the Rust code I would have to check the Spark version and return the error message accordingly. How can I get the Spark version from the Rust side?

For this kind of case, we tend to return the same error message on the native side and handle the difference in the Scala tests.

andygrove commented 2 weeks ago

As @viirya said, we can handle the difference in error message format in the Scala test (we already have examples of this in the castTest method).

For versions prior to 3.4, perhaps you could just check for the word "overflow" in the error message, as sketched after the failure output below.

- cast short to byte *** FAILED *** (439 milliseconds)
  "Execution error: [CAST_OVERFLOW] The value 18716S of the type "SMALLINT" cannot be cast to "TINYINT" due to an overflow. Use `try_cast` to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error." did not contain "asting 18716 to tinyint causes overflow" (CometCastSuite.scala:200)
ganeshkumar269 commented 2 weeks ago

Thanks for the inputs @viirya @andygrove. I have added another check in the assert statement where we compare exception messages. Do you think this approach looks good, or do I need to separate the "invalid cast" and "overflow" assert statements?

If this change is fine, I will modify the code comments to cover the "overflow" exception as well.

andygrove commented 2 weeks ago

> Thanks for the inputs @viirya @andygrove. I have added another check in the assert statement where we compare exception messages. Do you think this approach looks good, or do I need to separate the "invalid cast" and "overflow" assert statements?
>
> If this change is fine, I will modify the code comments to cover the "overflow" exception as well.

I think this general approach is OK to handle different types of expected errors. I left a specific comment on the code as well.

ganeshkumar269 commented 2 weeks ago

Hi @andygrove, I have added a check before we fetch sparkInvalidValue, defaulting it to EMPTY_STRING if ':' is not present. I also added comments explaining why we check for the presence of the 'overflow' string.
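
Roughly, the guard described above might look like this (a sketch; sparkInvalidValue and EMPTY_STRING are the names from the comment, and the surrounding assertion logic is assumed):

    // Sketch of the guarded extraction: only take the text after ':' when a
    // colon is actually present, otherwise fall back to EMPTY_STRING.
    val EMPTY_STRING = ""
    val colonIndex = sparkMessage.indexOf(':')
    val sparkInvalidValue =
      if (colonIndex == -1) EMPTY_STRING
      else sparkMessage.substring(colonIndex + 2)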

ganeshkumar269 commented 2 weeks ago

Thanks @andygrove 🙏🏾, glad to be a contributor to Comet.

andygrove commented 2 weeks ago

@ganeshkumar269 It looks like the error message check needs a little more work. Some tests are failing on Spark 3.3.

This error message contains neither ':' nor "overflow". In this specific case, looking for CAST_INVALID_INPUT would be a more robust check; see the sketch after the failure output below.

- cast StringType to LongType *** FAILED *** (416 milliseconds)
  "[CAST_INVALID_INPUT] The value '-9223372036854775809' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error." 
did not contain "overflow" (CometCastSuite.scala:881)
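
A sketch of that more robust check, reusing the cometMessage/sparkMessage names from above (the exact placement inside castTest is assumed):

    // Sketch: key off the error class tag rather than the wording, since on
    // Spark 3.3 this message carries "[CAST_INVALID_INPUT]" but contains
    // neither ':' nor "overflow".
    if (sparkMessage.contains("CAST_INVALID_INPUT")) {
      assert(cometMessage.contains("CAST_INVALID_INPUT"))
    }
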
andygrove commented 2 weeks ago

@ganeshkumar269 My original code for comparing errors in 3.2/3.3 was not very robust. I am also looking at this now to see if I can help improve these checks.

ganeshkumar269 commented 2 weeks ago

> @ganeshkumar269 My original code for comparing errors in 3.2/3.3 was not very robust. I am also looking at this now to see if I can help improve these checks.

Is there a way I could help here?

andygrove commented 2 weeks ago

@ganeshkumar269 here is my suggestion:

    if (CometSparkSessionExtensions.isSpark34Plus) {
      // for Spark 3.4 we expect to reproduce the error message exactly
      assert(cometMessage == sparkMessage)
    } else if (CometSparkSessionExtensions.isSpark33Plus) {
      // for Spark 3.3 we just need to strip the prefix from the Comet message
      // before comparing
      val cometMessageModified = cometMessage
        .replace("[CAST_INVALID_INPUT] ", "")
        .replace("[CAST_OVERFLOW] ", "")
      assert(cometMessageModified == sparkMessage)
    } else if (CometSparkSessionExtensions.isSpark32) {
      // for Spark 3.2 we just make sure we are seeing a similar type of error
      if (sparkMessage.contains("causes overflow")) {
        assert(cometMessage.contains("due to an overflow"))
      } else {
        // assume that this is an invalid input message in the form:
        // `invalid input syntax for type numeric: -9223372036854775809`
        // we just check that the Comet message contains the same literal value
        val i = sparkMessage.indexOf(':') + 2
        assert(cometMessage.contains(sparkMessage.substring(i)))
      }
    }

I tested this from the command line for all Spark versions using:

mvn test -Pspark-3.2 -DwildcardSuites=org.apache.comet.CometCastSuite -Dspotless.check.skip=true
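
Presumably the same command with the profile switched (e.g. -Pspark-3.3 or -Pspark-3.4) covers the other versions; the profile names are assumed from the project's build setup and are not shown in this thread.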