NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] NDS query 16 fails on EMR 6.8.0 with AQE enabled #6978

Closed mattahrens closed 1 year ago

mattahrens commented 2 years ago

Describe the bug: When running NDS query 16 on EMR 6.8.0 with AQE enabled, the following exception is encountered:

py4j.protocol.Py4JJavaError: An error occurred while calling o192.collectToPython.
: java.lang.UnsupportedOperationException: WholeStageCodegen (2) does not implement doExecuteBroadcast
        at org.apache.spark.sql.errors.QueryExecutionErrors$.doExecuteBroadcastNotImplementedError(QueryExecutionErrors.scala:1924)
        at org.apache.spark.sql.execution.SparkPlan.doExecuteBroadcast(SparkPlan.scala:331)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$doExecuteBroadcast$1(AdaptiveSparkPlanExec.scala:500)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:506)
        at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecuteBroadcast(AdaptiveSparkPlanExec.scala:499)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:226)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:251)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:222)
        at org.apache.spark.sql.execution.exchange.BaseOutputAdapterExec.doExecuteBroadcast(Exchange.scala:136)
        at org.apache.spark.sql.execution.exchange.BaseOutputAdapterExec.doExecuteBroadcast$(Exchange.scala:135)
        at org.apache.spark.sql.execution.OutputAdapterExec.doExecuteBroadcast(OutputAdapterExec.scala:35)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:226)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:251)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:222)
        at org.apache.spark.sql.execution.SubqueryBroadcastExec.doExecuteCollect(SubqueryBroadcastExec.scala:77)
        at org.apache.spark.sql.execution.AsyncSubqueryExec$$anon$2.doCompute(basicPhysicalOperators.scala:931)
        at org.apache.spark.sql.execution.AsyncSubqueryExec$$anon$2.doCompute(basicPhysicalOperators.scala:928)
        at org.apache.spark.sql.execution.AsyncDriverOperation.$anonfun$compute$1(AsyncDriverOperation.scala:73)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:216)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:199)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:196)
        at org.apache.spark.sql.execution.AsyncDriverOperation.compute(AsyncDriverOperation.scala:67)
        at org.apache.spark.sql.execution.AsyncDriverOperation.$anonfun$computeFuture$1(AsyncDriverOperation.scala:53)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:267)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Note that when the default DataSource V2 is used for Parquet, this exception is not encountered. Additionally, NDS query 16 on the CPU with AQE enabled does not fail.

Environment details: EMR 6.8.0, which uses the Spark RAPIDS 22.06 jar. DataSource V1 is enabled for Parquet with this setting: spark.sql.sources.useV1SourceList=parquet
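
For reference, a minimal spark-shell-style sketch of the configuration described above. Only the AQE, plugin, and useV1SourceList settings come from this report; the application name is illustrative, and on EMR most of this is normally supplied through cluster defaults rather than application code.

import org.apache.spark.sql.SparkSession

// Illustrative repro-style configuration (assumptions noted above).
val spark = SparkSession.builder()
  .appName("nds-q16-repro")                                  // hypothetical name
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")     // Spark RAPIDS 22.06 jar on EMR 6.8.0
  .config("spark.sql.adaptive.enabled", "true")              // AQE enabled
  .config("spark.sql.sources.useV1SourceList", "parquet")    // DataSource V1 for Parquet
  .getOrCreate()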

sameerz commented 2 years ago

Will retry with spark.rapids.sql.debug.logTransformations set to get more information
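
A spark-shell-style sketch of setting that flag on an existing session; the flag name is taken from the comment above, and treating it as a runtime-settable conf is an assumption.

// Enable the RAPIDS plan-transformation debug logging (flag name from the comment above).
spark.conf.set("spark.rapids.sql.debug.logTransformations", "true")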

mattahrens commented 2 years ago

SQL plan for query 16 failure on EMR with AQE enabled (in 3 parts)

[Three screenshots of the SQL plan (parts 1 through 3), captured 2022-11-17 at 11:45, 11:51, and 11:52 AM]

jlowe commented 2 years ago

@NVnavkumar this looks similar to #7037; it would be interesting to see if the fix for that would also apply here.

mimaomao commented 1 year ago

I back-ported the commits from #7037 into EMR, and now the error trace is different, as follows:

failed: java.lang.ClassCastException: [Lorg.apache.spark.sql.catalyst.InternalRow; cannot be cast to org.apache.spark.sql.execution.joins.HashedRelation
    at org.apache.spark.sql.execution.SubqueryBroadcastExec.doExecuteCollect(SubqueryBroadcastExec.scala:77)
    at org.apache.spark.sql.execution.AsyncSubqueryExec$$anon$2.doCompute(basicPhysicalOperators.scala:931)
    at org.apache.spark.sql.execution.AsyncSubqueryExec$$anon$2.doCompute(basicPhysicalOperators.scala:928)
    at org.apache.spark.sql.execution.AsyncDriverOperation.$anonfun$compute$1(AsyncDriverOperation.scala:73)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:216)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:199)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:196)
    at org.apache.spark.sql.execution.AsyncDriverOperation.compute(AsyncDriverOperation.scala:67)
    at org.apache.spark.sql.execution.AsyncDriverOperation.$anonfun$computeFuture$1(AsyncDriverOperation.scala:53)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:267)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:154)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
    at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:237)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:509)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:471)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3932)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3922)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:552)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3920)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3920)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3161)
    at com.databricks.spark.sql.perf.Query.$anonfun$benchmarkInternal$10(Query.scala:152)
    at com.databricks.spark.sql.perf.Benchmarkable.measureTimeMs(Benchmarkable.scala:117)
    at com.databricks.spark.sql.perf.Benchmarkable.measureTimeMs$(Benchmarkable.scala:115)
    at com.databricks.spark.sql.perf.Query.measureTimeMs(Query.scala:36)
    at com.databricks.spark.sql.perf.Query.benchmarkInternal(Query.scala:148)
    at com.databricks.spark.sql.perf.Query.$anonfun$doBenchmark$1(Query.scala:84)
    at com.databricks.spark.sql.perf.Query.withSparkListener(Query.scala:263)
    at com.databricks.spark.sql.perf.Query.doBenchmark(Query.scala:84)
    at com.databricks.spark.sql.perf.Benchmarkable$$anon$1.run(Benchmarkable.scala:81)

I think this issue is similar to #7037, but not exactly the same.
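
For illustration, the failing cast has this shape: the reused GPU broadcast materializes an array of rows while SubqueryBroadcastExec expects a HashedRelation. The standalone sketch below uses stand-in types rather than Spark's classes, purely to show why the ClassCastException above is raised.

// Stand-ins for org.apache.spark.sql.catalyst.InternalRow and
// org.apache.spark.sql.execution.joins.HashedRelation (illustration only).
class InternalRow
trait HashedRelation

object CastMismatch extends App {
  // The broadcast value was materialized as an array of rows...
  val broadcastValue: Any = Array(new InternalRow, new InternalRow)

  // ...but the consumer expects a HashedRelation, so the unchecked cast fails at runtime,
  // matching the "[L...InternalRow; cannot be cast to ...HashedRelation" trace above.
  val relation = broadcastValue.asInstanceOf[HashedRelation]
  println(relation)
}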

jlowe commented 1 year ago

@NVnavkumar this looks like it could be #7133, where we need to convert the GPU broadcast to a hashed relation but are computing rows instead.

NVnavkumar commented 1 year ago

I've done some more investigation, and this appears to be triggered by an interaction with DPP. One caveat: even when spark.rapids.sql.debug.logTransformations is set to true, the transformations are still not logged at the WARNING level, so the plugin currently behaves as if that flag were turned off. I was able to get output from spark.rapids.sql.explain=ALL, though:

*Exec <HashAggregateExec> will run on GPU
  *Expression <AggregateExpression> sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
    *Expression <Sum> sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
      *Expression <UnscaledValue> UnscaledValue(cs_ext_ship_cost#763) will run on GPU
  *Expression <AggregateExpression> sum(UnscaledValue(cs_net_profit#768)) will run on GPU
    *Expression <Sum> sum(UnscaledValue(cs_net_profit#768)) will run on GPU
      *Expression <UnscaledValue> UnscaledValue(cs_net_profit#768) will run on GPU
  *Expression <AggregateExpression> count(distinct cs_order_number#752) will run on GPU
    *Expression <Count> count(cs_order_number#752) will run on GPU
  *Expression <Alias> count(cs_order_number#752)#4398L AS order count#4359L will run on GPU
  *Expression <Alias> MakeDecimal(sum(UnscaledValue(cs_ext_ship_cost#763))#4399L,17,2) AS total shipping cost#4360 will run on GPU
    *Expression <MakeDecimal> MakeDecimal(sum(UnscaledValue(cs_ext_ship_cost#763))#4399L,17,2) will run on GPU
  *Expression <Alias> MakeDecimal(sum(UnscaledValue(cs_net_profit#768))#4400L,17,2) AS total net profit#4361 will run on GPU
    *Expression <MakeDecimal> MakeDecimal(sum(UnscaledValue(cs_net_profit#768))#4400L,17,2) will run on GPU
  *Exec <ShuffleExchangeExec> will run on GPU
    *Partitioning <SinglePartition$> will run on GPU
    *Exec <HashAggregateExec> will run on GPU
      *Expression <AggregateExpression> merge_sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
        *Expression <Sum> sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
          *Expression <UnscaledValue> UnscaledValue(cs_ext_ship_cost#763) will run on GPU
      *Expression <AggregateExpression> merge_sum(UnscaledValue(cs_net_profit#768)) will run on GPU
        *Expression <Sum> sum(UnscaledValue(cs_net_profit#768)) will run on GPU
          *Expression <UnscaledValue> UnscaledValue(cs_net_profit#768) will run on GPU
      *Expression <AggregateExpression> partial_count(distinct cs_order_number#752) will run on GPU
        *Expression <Count> count(cs_order_number#752) will run on GPU
      *Exec <HashAggregateExec> will run on GPU
        *Expression <AggregateExpression> merge_sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
          *Expression <Sum> sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
            *Expression <UnscaledValue> UnscaledValue(cs_ext_ship_cost#763) will run on GPU
        *Expression <AggregateExpression> merge_sum(UnscaledValue(cs_net_profit#768)) will run on GPU
          *Expression <Sum> sum(UnscaledValue(cs_net_profit#768)) will run on GPU
            *Expression <UnscaledValue> UnscaledValue(cs_net_profit#768) will run on GPU
        *Exec <ShuffleExchangeExec> will run on GPU
          *Partitioning <HashPartitioning> will run on GPU
          *Exec <HashAggregateExec> will run on GPU
            *Expression <AggregateExpression> partial_sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
              *Expression <Sum> sum(UnscaledValue(cs_ext_ship_cost#763)) will run on GPU
                *Expression <UnscaledValue> UnscaledValue(cs_ext_ship_cost#763) will run on GPU
            *Expression <AggregateExpression> partial_sum(UnscaledValue(cs_net_profit#768)) will run on GPU
              *Expression <Sum> sum(UnscaledValue(cs_net_profit#768)) will run on GPU
                *Expression <UnscaledValue> UnscaledValue(cs_net_profit#768) will run on GPU
            *Exec <ProjectExec> will run on GPU
              *Exec <SortMergeJoinExec> will run on GPU
                #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
                  #Expression <SortOrder> cs_ship_addr_sk#745 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
                  *Exec <ShuffleExchangeExec> will run on GPU
                    *Partitioning <HashPartitioning> will run on GPU
                    *Exec <ProjectExec> will run on GPU
                      *Exec <BroadcastHashJoinExec> will run on GPU
                        *Exec <ProjectExec> will run on GPU
                          *Exec <BroadcastHashJoinExec> will run on GPU
                            *Exec <SortMergeJoinExec> will run on GPU
                              *Exec <ProjectExec> will run on GPU
                                *Exec <SortMergeJoinExec> will run on GPU
                                  *Expression <Not> NOT (cs_warehouse_sk#749 = cs_warehouse_sk#4377) will run on GPU
                                    *Expression <EqualTo> (cs_warehouse_sk#749 = cs_warehouse_sk#4377) will run on GPU
                                  #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
                                    #Expression <SortOrder> cs_order_number#752 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
                                    *Exec <ShuffleExchangeExec> will run on GPU
                                      *Partitioning <HashPartitioning> will run on GPU
                                      *Exec <ProjectExec> will run on GPU
                                        !Exec <FilterExec> cannot run on GPU because not all expressions can be replaced
                                          @Expression <And> ((((isnotnull(cs_ship_date_sk#737) AND isnotnull(cs_call_center_sk#746)) AND isnotnull(cs_ship_addr_sk#745)) AND dynamicpruningexpression(cs_ship_date_sk#737 IN dynamicpruning#4405)) AND dynamicpruningexpression(cs_call_center_sk#746 IN dynamicpruning#4406)) could run on GPU
                                            @Expression <And> (((isnotnull(cs_ship_date_sk#737) AND isnotnull(cs_call_center_sk#746)) AND isnotnull(cs_ship_addr_sk#745)) AND dynamicpruningexpression(cs_ship_date_sk#737 IN dynamicpruning#4405)) could run on GPU
                                              @Expression <And> ((isnotnull(cs_ship_date_sk#737) AND isnotnull(cs_call_center_sk#746)) AND isnotnull(cs_ship_addr_sk#745)) could run on GPU
                                                @Expression <And> (isnotnull(cs_ship_date_sk#737) AND isnotnull(cs_call_center_sk#746)) could run on GPU
                                                  @Expression <IsNotNull> isnotnull(cs_ship_date_sk#737) could run on GPU
                                                    @Expression <AttributeReference> cs_ship_date_sk#737 could run on GPU
                                                  @Expression <IsNotNull> isnotnull(cs_call_center_sk#746) could run on GPU
                                                    @Expression <AttributeReference> cs_call_center_sk#746 could run on GPU
                                                @Expression <IsNotNull> isnotnull(cs_ship_addr_sk#745) could run on GPU
                                                  @Expression <AttributeReference> cs_ship_addr_sk#745 could run on GPU
                                              ! <DynamicPruningExpression> dynamicpruningexpression(cs_ship_date_sk#737 IN dynamicpruning#4405) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
                                                ! <InSubqueryExec> cs_ship_date_sk#737 IN dynamicpruning#4405 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.InSubqueryExec
                                                  @Expression <AttributeReference> cs_ship_date_sk#737 could run on GPU
                                            ! <DynamicPruningExpression> dynamicpruningexpression(cs_call_center_sk#746 IN dynamicpruning#4406) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
                                              ! <InSubqueryExec> cs_call_center_sk#746 IN dynamicpruning#4406 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.InSubqueryExec
                                                @Expression <AttributeReference> cs_call_center_sk#746 could run on GPU
                                          *Exec <FileSourceScanExec> will run on GPU
                                  #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
                                    #Expression <SortOrder> cs_order_number#4380 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
                                    *Exec <ShuffleExchangeExec> will run on GPU
                                      *Partitioning <HashPartitioning> will run on GPU
                                      *Exec <ProjectExec> will run on GPU
                                        *Exec <FileSourceScanExec> will run on GPU
                              #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
                                #Expression <SortOrder> cr_order_number#581 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
                                *Exec <ShuffleExchangeExec> will run on GPU
                                  *Partitioning <HashPartitioning> will run on GPU
                                  *Exec <ProjectExec> will run on GPU
                                    *Exec <FileSourceScanExec> will run on GPU
                            *Exec <BroadcastExchangeExec> will run on GPU
                              *Exec <ProjectExec> will run on GPU
                                *Exec <FilterExec> will run on GPU
                                  *Expression <And> (((isnotnull(d_date#46) AND (d_date#46 >= 2002-04-01)) AND (d_date#46 <= 2002-05-31)) AND isnotnull(d_date_sk#44)) will run on GPU
                                    *Expression <And> ((isnotnull(d_date#46) AND (d_date#46 >= 2002-04-01)) AND (d_date#46 <= 2002-05-31)) will run on GPU
                                      *Expression <And> (isnotnull(d_date#46) AND (d_date#46 >= 2002-04-01)) will run on GPU
                                        *Expression <IsNotNull> isnotnull(d_date#46) will run on GPU
                                        *Expression <GreaterThanOrEqual> (d_date#46 >= 2002-04-01) will run on GPU
                                      *Expression <LessThanOrEqual> (d_date#46 <= 2002-05-31) will run on GPU
                                    *Expression <IsNotNull> isnotnull(d_date_sk#44) will run on GPU
                                  *Exec <FileSourceScanExec> will run on GPU
                        *Exec <BroadcastExchangeExec> will run on GPU
                          *Exec <ProjectExec> will run on GPU
                            *Exec <FilterExec> will run on GPU
                              *Expression <And> (cc_county#299 IN (Walker County,Oglethorpe County,Huron County,Wadena County,Ziebach County) AND isnotnull(cc_call_center_sk#274)) will run on GPU
                                *Expression <In> cc_county#299 IN (Walker County,Oglethorpe County,Huron County,Wadena County,Ziebach County) will run on GPU
                                *Expression <IsNotNull> isnotnull(cc_call_center_sk#274) will run on GPU
                              *Exec <FileSourceScanExec> will run on GPU
                #Exec <SortExec> could run on GPU but is going to be removed because replacing sortMergeJoin with shuffleHashJoin
                  #Expression <SortOrder> ca_address_sk#0 ASC NULLS FIRST could run on GPU but is going to be removed because parent plan is removed
                  *Exec <ShuffleExchangeExec> will run on GPU
                    *Partitioning <HashPartitioning> will run on GPU
                    *Exec <ProjectExec> will run on GPU
                      *Exec <FilterExec> will run on GPU
                        *Expression <And> ((isnotnull(ca_state#8) AND (ca_state#8 = TN)) AND isnotnull(ca_address_sk#0)) will run on GPU
                          *Expression <And> (isnotnull(ca_state#8) AND (ca_state#8 = TN)) will run on GPU
                            *Expression <IsNotNull> isnotnull(ca_state#8) will run on GPU
                            *Expression <EqualTo> (ca_state#8 = TN) will run on GPU
                          *Expression <IsNotNull> isnotnull(ca_address_sk#0) will run on GPU
                        *Exec <FileSourceScanExec> will run on GPU

It looks like the root of this issue is here:

...
                                        !Exec <FilterExec> cannot run on GPU because not all expressions can be replaced
...
                                              ! <DynamicPruningExpression> dynamicpruningexpression(cs_ship_date_sk#737 IN dynamicpruning#4405) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
                                                ! <InSubqueryExec> cs_ship_date_sk#737 IN dynamicpruning#4405 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.InSubqueryExec
                                                  @Expression <AttributeReference> cs_ship_date_sk#737 could run on GPU
                                            ! <DynamicPruningExpression> dynamicpruningexpression(cs_call_center_sk#746 IN dynamicpruning#4406) cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
                                              ! <InSubqueryExec> cs_call_center_sk#746 IN dynamicpruning#4406 cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.InSubqueryExec
                                                @Expression <AttributeReference> cs_call_center_sk#746 could run on GPU
...

That would explain the backtrace pointing at SubqueryBroadcastExec. This looks like DPP, but I have also tried turning DPP off via both spark.sql.optimizer.dynamicPartitionPruning.enabled=false and the original, now-deprecated flag spark.sql.dynamicPartitionPruning.enabled=false (which comes from the AWS EMR documentation at https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-performance.html), and I have also set spark.sql.bloomFilterJoin.enabled=false, since we don't support that on the GPU.
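
For completeness, a spark-shell-style sketch of the mitigation settings tried above; the flag names are taken verbatim from this comment, and they would normally be set before the query runs.

// Settings tried while ruling out DPP and bloom-filter joins (names as listed above).
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")
spark.conf.set("spark.sql.dynamicPartitionPruning.enabled", "false")   // original, deprecated flag
spark.conf.set("spark.sql.bloomFilterJoin.enabled", "false")           // unsupported on the GPU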

NVnavkumar commented 1 year ago

Actually, I went into the Spark History Server and pulled out the problematic part of the plan:

                        :           :     :  :     :           +- GpuRowToColumnar targetsize(1073741824)
                        :           :     :  :     :              +- *(1) Filter ((((isnotnull(cs_ship_date_sk#737) AND isnotnull(cs_call_center_sk#746)) AND isnotnull(cs_ship_addr_sk#745)) AND dynamicpruningexpression(cs_ship_date_sk#737 IN dynamicpruning#4431)) AND dynamicpruningexpression(cs_call_center_sk#746 IN dynamicpruning#4432))
                        :           :     :  :     :                 :  :- SubqueryBroadcast dynamicpruning#4431, 0, [d_date_sk#44], [id=#35079]
                        :           :     :  :     :                 :  :  +- OutputAdapter [d_date_sk#44]
                        :           :     :  :     :                 :  :     +- AdaptiveSparkPlan isFinalPlan=true
                        :           :     :  :     :                 :  :        +- *(2) ColumnarToRow
                        :           :     :  :     :                 :  :           +- BroadcastQueryStage 7
                        :           :     :  :     :                 :  :              +- ReusedExchange [d_date_sk#44], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#35320]
                        :           :     :  :     :                 :  +- SubqueryBroadcast dynamicpruning#4432, 0, [cc_call_center_sk#274], [id=#35092]
                        :           :     :  :     :                 :     +- OutputAdapter [cc_call_center_sk#274]
                        :           :     :  :     :                 :        +- AdaptiveSparkPlan isFinalPlan=false
                        :           :     :  :     :                 :           +- BroadcastQueryStage 9
                        :           :     :  :     :                 :              +- ReusedExchange [cc_call_center_sk#274], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#35351]

The root cause looks to be a CPU SubqueryBroadcast that is trying to reuse a GpuBroadcastExchange. That exchange uses HashedRelationBroadcastMode (which is something we have been trying to test for), so, as @jlowe mentioned earlier, this is also a version of #7133: the code needs to be updated to handle HashedRelation (as shown in the backtrace from the back-ported fix shared earlier).
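
To make the needed handling concrete, here is a rough sketch of the shape such a fix could take. The types are stand-ins rather than Spark's classes, buildHashedRelation is a hypothetical placeholder for whatever conversion the real change performs, and the actual fix belongs to the #7133 line of work.

object ReusedBroadcastSketch {
  // Stand-in types (illustration only; not the actual Spark or RAPIDS classes).
  class InternalRow
  trait HashedRelation
  case class StubHashedRelation(rows: Seq[InternalRow]) extends HashedRelation

  // Hypothetical placeholder for the row-to-HashedRelation conversion a real fix would perform.
  def buildHashedRelation(rows: Iterator[InternalRow]): HashedRelation =
    StubHashedRelation(rows.toSeq)

  // When a CPU SubqueryBroadcast reuses a GPU broadcast built with HashedRelationBroadcastMode,
  // the reused value must be handed over as a HashedRelation, not as a plain row array.
  def adaptReusedBroadcast(broadcastValue: Any): HashedRelation = broadcastValue match {
    case relation: HashedRelation => relation                            // already the expected shape
    case rows: Array[InternalRow] => buildHashedRelation(rows.iterator)  // convert materialized rows
    case other =>
      throw new IllegalStateException(s"Unexpected broadcast value: ${other.getClass}")
  }
}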