NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] Support AQE with Broadcast Hash Join and DPP on Databricks 14.3 #11643

Open mythrocks opened 1 month ago

mythrocks commented 1 month ago

With AQE enabled on Databricks 14.3, the following tests in aqe_test.py fail:

  1. aqe_test.py::test_aqe_join_with_dpp
  2. aqe_test.py::test_aqe_join_with_dpp_multi_columns

This is mentioned as part of #11527. The failure exception appears as follows:

java.lang.AssertionError: should not reach here
        at org.apache.spark.sql.rapids.execution.GpuSubqueryBroadcastMetaBase.tagPlanForGpu(GpuSubqueryBroadcastExec.scala:133)
        at com.nvidia.spark.rapids.SparkPlanMeta.tagSelfForGpu(RapidsMeta.scala:803)
        at com.nvidia.spark.rapids.RapidsMeta.tagForGpu(RapidsMeta.scala:318)
        at com.nvidia.spark.rapids.GpuOverrides$.wrapAndTagPlan(GpuOverrides.scala:4472)
        at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta.com$nvidia$spark$rapids$shims$FileSourceScanExecMeta$$convertBroadcast(FileSourceScanExecMeta.scala:50)
        at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta$$anonfun$$nestedInanonfun$convertDynamicPruningFilters$1$1.applyOrElse(FileSourceScanExecMeta.scala:96)
        at com.nvidia.spark.rapids.shims.FileSourceScanExecMeta$$anonfun$$nestedInanonfun$convertDynamicPruningFilters$1$1.applyOrElse(FileSourceScanExecMeta.scala:92)

The failure indicates that the adaptive query plan has changed on Databricks since 13.3. Tracing it shows that the child exec of AdaptiveSparkPlanExec is not BroadcastExchangeExec, but CollectLimitExec.

The logic in GpuSubqueryBroadcastMetaBase.tagPlanForGpu() (the class named in the stack trace above) needs to account for the new plan node.
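As a rough illustration of the shape of that change, here is a toy model in Python rather than the plugin's Scala. `PlanNode` and `tag_subquery_broadcast` are hypothetical names invented for this sketch and are not part of spark-rapids. It assumes that an unrecognized child should produce a fallback with a stated reason instead of an assertion; whether that is the right handling for CollectLimitExec on 14.3 is not established in this issue.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class PlanNode:
    """Toy stand-in for a Spark physical plan node (hypothetical, not the real API)."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)


def tag_subquery_broadcast(child: PlanNode) -> Tuple[bool, Optional[str]]:
    """Return (can_run_on_gpu, reason_if_not) for the child of a subquery broadcast."""
    node = child
    # Under AQE the exchange is wrapped in an adaptive plan node; look through it.
    if node.name == "AdaptiveSparkPlanExec":
        if len(node.children) != 1:
            return False, f"AdaptiveSparkPlanExec has {len(node.children)} children"
        node = node.children[0]
    if node.name == "BroadcastExchangeExec":
        return True, None
    # Assumption: report the unexpected node instead of asserting.
    return False, f"unexpected child plan {node.name} under the subquery broadcast"


# The 13.3-style shape is accepted; the shape seen in this failure is reported.
print(tag_subquery_broadcast(
    PlanNode("AdaptiveSparkPlanExec", [PlanNode("BroadcastExchangeExec")])))
print(tag_subquery_broadcast(
    PlanNode("AdaptiveSparkPlanExec", [PlanNode("CollectLimitExec")])))
```

The point of the sketch is only that every branch ends in either an accepted node or a named reason, so a plan shape introduced by a new runtime shows up as a readable message.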

mythrocks commented 1 month ago

Side note: I don't have the full details on the post-AQE plan here. If I'm unable to get to debugging this problem in short order, someone is going to have to extract this from the debugger.

mythrocks commented 1 month ago

Also, the error message here could stand to be more informative:

java.lang.AssertionError: should not reach here

"Should not reach here" can already be inferred from this being an AssertionError.
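A minimal sketch of what a more informative message could carry, again in Python for brevity. `check_child_plan` is a hypothetical helper and the wording of the message is only a suggestion; the idea is to name the node that was found and the nodes that were expected.

```python
from typing import Sequence


def check_child_plan(found: str, expected: Sequence[str]) -> None:
    """Raise AssertionError naming the unexpected plan node (hypothetical helper)."""
    if found not in expected:
        raise AssertionError(
            f"Unexpected child plan {found}; expected one of: {', '.join(expected)}"
        )


try:
    check_child_plan("CollectLimitExec", ["BroadcastExchangeExec"])
except AssertionError as err:
    print(err)
```

With a message like this, the stack trace alone would have identified CollectLimitExec as the unexpected node without a debugger session.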

mythrocks commented 2 days ago

After some digging into the plans, it looks like the Databricks 14.3 plan for this query does not involve a BroadcastExchange node at all:

AQE=True on 14.3:  No BroadcastExchange at all.

Transformed Plan:
CollectLimit 21
+- GpuProject [gputoprettystring(site_id#6, None) AS toprettystring(site_id)#196, gputoprettystring(day#5, None) AS toprettystring(day)#197, gputoprettystring(test_id#7, None) AS toprettystring(test_id)#198, gputoprettystring(test_id#0, None) AS toprettystring(test_id)#199, gputoprettystring(site_id#1, None) AS toprettystring(site_id)#200], [loreId=25]
   +- GpuBroadcastHashJoin [test_id#7, site_id#6], [test_id#0, site_id#1], Inner, GpuBuildRight, true, [loreId=24]
      :- GpuUnion [loreId=23]
      :  :- GpuProject [site_id#6, day#5, test_id#7], [loreId=19]
      :  :  +- GpuFileGpuScan parquet [day#5,site_id#6,test_id#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoA], PartitionFilters: [isnotnull(day#5), (day#5 = 1990-01-01), site_id#6 IN (site_0,site_1), isnotnull(test_id#7), isno..., PushedFilters: [], ReadSchema: struct<>
      :  +- GpuProject [CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 WHEN (site_id#15 = LONG_SITE_NAME_1) THEN site_1 ELSE site_id#15 END AS site_id#184, day#14, test_spec#12 AS test_id#185], [loreId=22]
      :     +- GpuFilter gpuisnotnull(test_spec#12), [loreId=21]
      :        +- GpuFileGpuScan parquet [test_spec#12,day#14,site_id#15] Batched: true, DataFilters: [isnotnull(test_spec#12), dynamicpruning#207 206], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoB], PartitionFilters: [isnotnull(day#14), (day#14 = 1990-01-01), CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 ..., PushedFilters: [IsNotNull(test_spec)], ReadSchema: struct<test_spec:string>
      +- ShuffleQueryStage 0, Statistics(sizeInBytes=942.0 B, rowCount=34, ColumnStat: N/A, isRuntime=true)
         +- GpuColumnarExchange gpusinglepartitioning$(), EXECUTOR_BROADCAST, [plan_id=1739], [loreId=17]
            +- GpuCoalesceBatches targetsize(1073741824), [loreId=16]
               +- GpuFilter ((gpuisnotnull(test_id#0) AND gpuisnotnull(site_id#1)) AND site_id#1 INSET site_0, site_1), [loreId=15]
                  +- GpuFileGpuScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>

With AQE disabled, one does see BroadcastExchange:

AQE=False:  There is a SubqueryBroadcast.

Transformed Plan:
CollectLimit 21
+- GpuColumnarToRow false, [loreId=24]
   +- GpuProject [gputoprettystring(site_id#6, None) AS toprettystring(site_id)#222, gputoprettystring(day#5, None) AS toprettystring(day)#223, gputoprettystring(test_id#7, None) AS toprettystring(test_id)#224, gputoprettystring(test_id#0, None) AS toprettystring(test_id)#225, gputoprettystring(site_id#1, None) AS toprettystring(site_id)#226], [loreId=23]
      +- GpuBroadcastHashJoin [test_id#7, site_id#6], [test_id#0, site_id#1], Inner, GpuBuildRight, false, [loreId=22]
         :- GpuUnion [loreId=17]
         :  :- GpuProject [site_id#6, day#5, test_id#7], [loreId=12]
         :  :  +- GpuFileGpuScan parquet [day#5,site_id#6,test_id#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoA], PartitionFilters: [isnotnull(day#5), (day#5 = 1990-01-01), site_id#6 IN (site_0,site_1), isnotnull(test_id#7), isno..., PushedFilters: [], ReadSchema: struct<>
         :  :        :- SubqueryBroadcast dynamicpruning#233, [0], [test_id#0, site_id#1], false, [id=#1950]
         :  :        :  +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false], input[1, string, false]),false), [plan_id=1934]
         :  :        :     +- Filter ((isnotnull(test_id#0) AND isnotnull(site_id#1)) AND site_id#1 IN (site_0,site_1))
         :  :        :        +- FileScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>
         :  :        +- SubqueryBroadcast dynamicpruning#234, [1], [test_id#0, site_id#1], false, [id=#1951]
         :  :           +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false], input[1, string, false]),false), [plan_id=1934]
         :  :              +- Filter ((isnotnull(test_id#0) AND isnotnull(site_id#1)) AND site_id#1 IN (site_0,site_1))
         :  :                 +- FileScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>
         :  +- GpuProject [CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 WHEN (site_id#15 = LONG_SITE_NAME_1) THEN site_1 ELSE site_id#15 END AS site_id#210, day#14, test_spec#12 AS test_id#211], [loreId=16]
         :     +- GpuCoalesceBatches targetsize(1073741824), [loreId=15]
         :        +- GpuFilter gpuisnotnull(test_spec#12), [loreId=14]
         :           +- GpuFileGpuScan parquet [test_spec#12,day#14,site_id#15] Batched: true, DataFilters: [isnotnull(test_spec#12), dynamicpruningexpression(test_spec#12 IN dynamicpruning#233)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/infoB], PartitionFilters: [isnotnull(day#14), (day#14 = 1990-01-01), CASE WHEN (site_id#15 = LONG_SITE_NAME_0) THEN site_0 ..., PushedFilters: [IsNotNull(test_spec)], ReadSchema: struct<test_spec:string>
         :                 :- ReusedSubquery SubqueryBroadcast dynamicpruning#234, [1], [test_id#0, site_id#1], false, [id=#1951]
         :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#233, [0], [test_id#0, site_id#1], false, [id=#1950]
         +- GpuBroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false], input[1, string, false]),false), [plan_id=2011], [loreId=21]
            +- GpuCoalesceBatches targetsize(1073741824), [loreId=20]
               +- GpuFilter ((gpuisnotnull(test_id#0) AND gpuisnotnull(site_id#1)) AND site_id#1 INSET site_0, site_1), [loreId=19]
                  +- GpuFileGpuScan parquet [test_id#0,site_id#1] Batched: true, DataFilters: [isnotnull(test_id#0), isnotnull(site_id#1), site_id#1 IN (site_0,site_1)], Format: Parquet, Location: InMemoryFileIndex[file:/tmp/myth/PARQUET_DATA/tests], PartitionFilters: [], PushedFilters: [IsNotNull(test_id), IsNotNull(site_id), In(site_id, [site_0,site_1])], ReadSchema: struct<test_id:string,site_id:string>
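Comparing the two plans by eye is error-prone at this width. The following is a small sketch for checking which operators appear in a plan dump like the ones above. `operator_names` and `has_operator` are hypothetical helpers, and the sketch assumes the input is only the tree itself (no "Transformed Plan:" header) in the usual `+- ` / `:- ` tree-string layout. Matching is exact, so GpuBroadcastExchange does not count as BroadcastExchange.

```python
import re
from typing import List

# Tree-drawing prefix: spaces and ':' guides followed by a '+- ' or ':- ' connector.
_PREFIX = re.compile(r"^(?:[ :]*[+:]- )?")


def operator_names(plan_text: str) -> List[str]:
    """Return the first token (operator name) of each line of a plan tree string."""
    names = []
    for line in plan_text.splitlines():
        tokens = _PREFIX.sub("", line, count=1).split()
        if tokens:
            names.append(tokens[0])
    return names


def has_operator(plan_text: str, name: str) -> bool:
    """True if some line's operator name is exactly `name`."""
    return name in operator_names(plan_text)


# Abbreviated excerpt in the shape of the AQE=False plan above.
sample = "\n".join([
    "CollectLimit 21",
    "+- GpuColumnarToRow false",
    "   +- GpuBroadcastHashJoin [a], [b]",
    "      :- GpuUnion",
    "      :  :- SubqueryBroadcast dynamicpruning#233",
    "      :  :  +- BroadcastExchange HashedRelationBroadcastMode(...)",
    "      +- GpuBroadcastExchange HashedRelationBroadcastMode(...)",
])
print(has_operator(sample, "SubqueryBroadcast"))
print(has_operator(sample, "ShuffleQueryStage"))
```

Run against the full dumps, a check like this makes the difference explicit: the AQE=False plan contains SubqueryBroadcast and BroadcastExchange lines, while the AQE=True plan on 14.3 has a ShuffleQueryStage over a GpuColumnarExchange marked EXECUTOR_BROADCAST instead.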

Ok, it looks like we're missing the logic from https://github.com/NVIDIA/spark-rapids/pull/6919 for 350db14.3.