NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[FEA] [Databricks 12.2] Support GpuMergeIntoCommand notMatchedBySourceClauses on GPU #8415

Open andygrove opened 1 year ago

andygrove commented 1 year ago

Is your feature request related to a problem? Please describe. PR https://github.com/NVIDIA/spark-rapids/pull/8282 added basic support for Databricks 12.2 but did not add support for notMatchedBySourceClauses in GpuMergeIntoCommand; a MERGE that uses these clauses instead falls back to the CPU.

Describe the solution you'd like We should support notMatchedBySourceClauses on GPU.

Describe alternatives you've considered

Additional context See the discussion at https://github.com/NVIDIA/spark-rapids/pull/8282#discussion_r1204462293 for more details.

andygrove commented 1 year ago

Please ignore the analysis below. It turned out to be unrelated to this issue and is now resolved.

~One challenge in implementing this is that we replace JoinedRowProcessor with a RapidsProcessDeltaMergeJoin logical operator that involves a non-deterministic expression (monotonically_increasing_id). Spark has an analyzer rule that only allows non-deterministic expressions in a hard-coded set of operators, resulting in the following error:~

23/06/14 16:55:53 ERROR GpuMergeIntoCommand: Fatal error in MERGE with materialized source in attempt 1.
org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:
a,b,c,_target_row_id_,_row_dropped_,_incr_row_count_,_change_type,(_source_row_present_ IS NULL),(_target_row_present_ IS NULL),true,a,b,c,_target_row_id_,true,(UDF() AND UDF()),CAST(NULL AS STRING),a,b,c,_target_row_id_,false,true,'delete',a,b,c,_target_row_id_,false,UDF(),CAST(NULL AS STRING),a,b,c,_target_row_id_,true,true,CAST(NULL AS STRING)
in operator RapidsProcessDeltaMergeJoin [a#304727, b#304728, c#304729, _target_row_id_#304730L, _row_dropped_#304731, _incr_row_count_#304732, _change_type#304733], isnull(_source_row_present_#304699), isnull(_target_row_present_#304702), [true], [[[a#304425, b#304426, c#304427, _target_row_id_#304707L, true, (UDF() AND UDF()), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, true, delete]]], [a#304425, b#304426, c#304427, _target_row_id_#304707L, false, UDF(), null], [a#304425, b#304426, c#304427, _target_row_id_#304707L, true, true, null].; line 1 pos 0;

Here is the Spark code from CheckAnalysis:

          case o if o.expressions.exists(!_.deterministic) &&
            !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
            !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
            // Lateral join is checked in checkSubqueryExpression.
            !o.isInstanceOf[LateralJoin] =>
            // The rule above is used to check Aggregate operator.
            o.failAnalysis(
              errorClass = "_LEGACY_ERROR_TEMP_2439",
              messageParameters = Map(
                "sqlExprs" -> o.expressions.map(_.sql).mkString(","),
                "operator" -> operator.simpleString(SQLConf.get.maxToStringFields)))

For now, we avoid replacing JoinedRowProcessor with RapidsProcessDeltaMergeJoin if notMatchedBySourceClauses is non-empty.
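For anyone unfamiliar with the CheckAnalysis rule quoted above, here is a minimal sketch in plain Scala that models its predicate with toy types. `Expr`, `Plan`, `CustomMergeJoin`, and `violatesDeterminismRule` are hypothetical stand-ins made up for illustration, not Spark or spark-rapids classes. Only the shape of the condition follows the quoted snippet.

```scala
// Toy model of the CheckAnalysis predicate; none of these types are Spark's.
object DeterminismCheckSketch {
  // An expression reduced to its SQL text and a deterministic flag.
  final case class Expr(sql: String, deterministic: Boolean)

  sealed trait Plan { def expressions: Seq[Expr] }
  final case class Project(expressions: Seq[Expr]) extends Plan
  final case class Filter(expressions: Seq[Expr]) extends Plan
  final case class Aggregate(expressions: Seq[Expr]) extends Plan
  final case class Window(expressions: Seq[Expr]) extends Plan
  final case class LateralJoin(expressions: Seq[Expr]) extends Plan
  // Hypothetical stand-in for an operator defined outside Spark.
  final case class CustomMergeJoin(expressions: Seq[Expr]) extends Plan

  // True when the plan holds a non-deterministic expression and is not
  // one of the operator types the rule exempts.
  def violatesDeterminismRule(plan: Plan): Boolean = {
    val exempt = plan match {
      case _: Project | _: Filter | _: Aggregate | _: Window | _: LateralJoin => true
      case _ => false
    }
    plan.expressions.exists(!_.deterministic) && !exempt
  }
}
```

In this model, a `CustomMergeJoin` that carries a non-deterministic expression such as `monotonically_increasing_id()` trips the rule, while the same expression inside a `Project` does not. The exemption list is keyed on operator type, so an operator defined outside Spark cannot opt in.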