NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[BUG] test_dpp_reuse_broadcast_exchange failed #10147

Open · jlowe opened this issue 6 months ago

jlowe commented 6 months ago

From a recent nightly test run:

[2024-01-03T17:41:18.525Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1704297021, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
[2024-01-03T17:41:18.525Z] : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
[2024-01-03T17:41:18.525Z] AdaptiveSparkPlan isFinalPlan=true
[2024-01-03T17:41:18.525Z] +- == Final Plan ==
[2024-01-03T17:41:18.525Z]    LocalTableScan <empty>, [key#37135, max(value)#37151L]
[2024-01-03T17:41:18.525Z] +- == Initial Plan ==
[2024-01-03T17:41:18.525Z]    Sort [key#37135 ASC NULLS FIRST, max(value)#37151L ASC NULLS FIRST], true, 0
[2024-01-03T17:41:18.525Z]    +- Exchange rangepartitioning(key#37135 ASC NULLS FIRST, max(value)#37151L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=109491]
[2024-01-03T17:41:18.525Z]       +- HashAggregate(keys=[key#37135], functions=[max(value#37136L)], output=[key#37135, max(value)#37151L])
[2024-01-03T17:41:18.525Z]          +- Exchange hashpartitioning(key#37135, 4), ENSURE_REQUIREMENTS, [plan_id=109488]
[2024-01-03T17:41:18.525Z]             +- HashAggregate(keys=[key#37135], functions=[partial_max(value#37136L)], output=[key#37135, max#37199L])
[2024-01-03T17:41:18.525Z]                +- Union
[2024-01-03T17:41:18.525Z]                   :- Project [key#37054 AS key#37135, value#37140L AS value#37136L]
[2024-01-03T17:41:18.525Z]                   :  +- BroadcastHashJoin [key#37054], [key#37056], Inner, BuildRight, false
[2024-01-03T17:41:18.526Z]                   :     :- HashAggregate(keys=[key#37054], functions=[sum(value#37053)], output=[key#37054, value#37140L])
[2024-01-03T17:41:18.526Z]                   :     :  +- Exchange hashpartitioning(key#37054, 4), ENSURE_REQUIREMENTS, [plan_id=109473]
[2024-01-03T17:41:18.526Z]                   :     :     +- HashAggregate(keys=[key#37054], functions=[partial_sum(value#37053)], output=[key#37054, sum#37201L])
[2024-01-03T17:41:18.526Z]                   :     :        +- Project [value#37053, key#37054]
[2024-01-03T17:41:18.526Z]                   :     :           +- Filter (isnotnull(value#37053) AND (value#37053 > 0))
[2024-01-03T17:41:18.526Z]                   :     :              +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_0[value#37053,key#37054,skey#37055] Batched: true, DataFilters: [isnotnull(value#37053), (value#37053 > 0)], Format: Parquet, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37054), dynamicpruningexpression(key#37054 IN dynamicpruning#37196)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.526Z]                   :     :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37196, 0, true, Project [key#37056], [key#37056]
[2024-01-03T17:41:18.526Z]                   :     :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.526Z]                   :     :                          +- Project [key#37056]
[2024-01-03T17:41:18.526Z]                   :     :                             +- Filter ((((isnotnull(ex_key#37058) AND isnotnull(filter#37060)) AND (ex_key#37058 = 3)) AND (filter#37060 = 1458)) AND isnotnull(key#37056))
[2024-01-03T17:41:18.526Z]                   :     :                                +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37056,ex_key#37058,filter#37060] Batched: true, DataFilters: [isnotnull(ex_key#37058), isnotnull(filter#37060), (ex_key#37058 = 3), (filter#37060 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z]                   :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=109476]
[2024-01-03T17:41:18.526Z]                   :        +- Project [key#37056]
[2024-01-03T17:41:18.526Z]                   :           +- Filter ((((isnotnull(ex_key#37058) AND isnotnull(filter#37060)) AND (ex_key#37058 = 3)) AND (filter#37060 = 1458)) AND isnotnull(key#37056))
[2024-01-03T17:41:18.526Z]                   :              +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37056,ex_key#37058,filter#37060] Batched: true, DataFilters: [isnotnull(ex_key#37058), isnotnull(filter#37060), (ex_key#37058 = 3), (filter#37060 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z]                   +- Project [key#37184, value#37187L]
[2024-01-03T17:41:18.526Z]                      +- BroadcastHashJoin [key#37184], [key#37188], Inner, BuildRight, false
[2024-01-03T17:41:18.526Z]                         :- HashAggregate(keys=[key#37184], functions=[sum(value#37183)], output=[key#37184, value#37187L])
[2024-01-03T17:41:18.526Z]                         :  +- Exchange hashpartitioning(key#37184, 4), ENSURE_REQUIREMENTS, [plan_id=109479]
[2024-01-03T17:41:18.526Z]                         :     +- HashAggregate(keys=[key#37184], functions=[partial_sum(value#37183)], output=[key#37184, sum#37203L])
[2024-01-03T17:41:18.526Z]                         :        +- Project [value#37183, key#37184]
[2024-01-03T17:41:18.526Z]                         :           +- Filter (isnotnull(value#37183) AND (value#37183 > 0))
[2024-01-03T17:41:18.526Z]                         :              +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_0[value#37183,key#37184,skey#37185] Batched: true, DataFilters: [isnotnull(value#37183), (value#37183 > 0)], Format: Parquet, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37184), dynamicpruningexpression(key#37184 IN dynamicpruning#37197)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.526Z]                         :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37197, 0, true, Project [key#37188], [key#37188]
[2024-01-03T17:41:18.526Z]                         :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.526Z]                         :                          +- Project [key#37188]
[2024-01-03T17:41:18.526Z]                         :                             +- Filter ((((isnotnull(ex_key#37190) AND isnotnull(filter#37192)) AND (ex_key#37190 = 3)) AND (filter#37192 = 1458)) AND isnotnull(key#37188))
[2024-01-03T17:41:18.526Z]                         :                                +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37188,ex_key#37190,filter#37192] Batched: true, DataFilters: [isnotnull(ex_key#37190), isnotnull(filter#37192), (ex_key#37190 = 3), (filter#37192 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z]                         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=109482]
[2024-01-03T17:41:18.526Z]                            +- Project [key#37188]
[2024-01-03T17:41:18.526Z]                               +- Filter ((((isnotnull(ex_key#37190) AND isnotnull(filter#37192)) AND (ex_key#37190 = 3)) AND (filter#37192 = 1458)) AND isnotnull(key#37188))
[2024-01-03T17:41:18.526Z]                                  +- FileScan parquet spark_catalog.default.tmp_table_gw1_470723338_1[key#37188,ex_key#37190,filter#37192] Batched: true, DataFilters: [isnotnull(ex_key#37190), isnotnull(filter#37192), (ex_key#37190 = 3), (filter#37192 = 1458), isn..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.526Z] 
[2024-01-03T17:41:18.526Z]  at scala.Predef$.assert(Predef.scala:223)
[2024-01-03T17:41:18.526Z]  at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:170)
[2024-01-03T17:41:18.526Z]  at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:175)
[2024-01-03T17:41:18.526Z]  at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback$.assertContains(ExecutionPlanCaptureCallback.scala:76)
[2024-01-03T17:41:18.526Z]  at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(ExecutionPlanCaptureCallback.scala)
[2024-01-03T17:41:18.526Z]  at sun.reflect.GeneratedMethodAccessor140.invoke(Unknown Source)
[2024-01-03T17:41:18.526Z]  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2024-01-03T17:41:18.526Z]  at java.lang.reflect.Method.invoke(Method.java:498)
[2024-01-03T17:41:18.526Z]  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
[2024-01-03T17:41:18.526Z]  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
[2024-01-03T17:41:18.526Z]  at py4j.Gateway.invoke(Gateway.java:282)
[2024-01-03T17:41:18.526Z]  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
[2024-01-03T17:41:18.526Z]  at py4j.commands.CallCommand.execute(CallCommand.java:79)
[2024-01-03T17:41:18.526Z]  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
[2024-01-03T17:41:18.526Z]  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
[2024-01-03T17:41:18.527Z]  at java.lang.Thread.run(Thread.java:750)
[2024-01-03T17:41:18.527Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-orc][DATAGEN_SEED=1704297021, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
[2024-01-03T17:41:18.527Z] : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
[2024-01-03T17:41:18.527Z] AdaptiveSparkPlan isFinalPlan=true
[2024-01-03T17:41:18.527Z] +- == Final Plan ==
[2024-01-03T17:41:18.527Z]    LocalTableScan <empty>, [key#37628, max(value)#37644L]
[2024-01-03T17:41:18.527Z] +- == Initial Plan ==
[2024-01-03T17:41:18.527Z]    Sort [key#37628 ASC NULLS FIRST, max(value)#37644L ASC NULLS FIRST], true, 0
[2024-01-03T17:41:18.527Z]    +- Exchange rangepartitioning(key#37628 ASC NULLS FIRST, max(value)#37644L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=111165]
[2024-01-03T17:41:18.527Z]       +- HashAggregate(keys=[key#37628], functions=[max(value#37629L)], output=[key#37628, max(value)#37644L])
[2024-01-03T17:41:18.527Z]          +- Exchange hashpartitioning(key#37628, 4), ENSURE_REQUIREMENTS, [plan_id=111162]
[2024-01-03T17:41:18.527Z]             +- HashAggregate(keys=[key#37628], functions=[partial_max(value#37629L)], output=[key#37628, max#37692L])
[2024-01-03T17:41:18.527Z]                +- Union
[2024-01-03T17:41:18.527Z]                   :- Project [key#37547 AS key#37628, value#37633L AS value#37629L]
[2024-01-03T17:41:18.527Z]                   :  +- BroadcastHashJoin [key#37547], [key#37549], Inner, BuildRight, false
[2024-01-03T17:41:18.527Z]                   :     :- HashAggregate(keys=[key#37547], functions=[sum(value#37546)], output=[key#37547, value#37633L])
[2024-01-03T17:41:18.527Z]                   :     :  +- Exchange hashpartitioning(key#37547, 4), ENSURE_REQUIREMENTS, [plan_id=111147]
[2024-01-03T17:41:18.527Z]                   :     :     +- HashAggregate(keys=[key#37547], functions=[partial_sum(value#37546)], output=[key#37547, sum#37694L])
[2024-01-03T17:41:18.527Z]                   :     :        +- Project [value#37546, key#37547]
[2024-01-03T17:41:18.527Z]                   :     :           +- Filter (isnotnull(value#37546) AND (value#37546 > 0))
[2024-01-03T17:41:18.527Z]                   :     :              +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_0[value#37546,key#37547,skey#37548] Batched: true, DataFilters: [isnotnull(value#37546), (value#37546 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37547), dynamicpruningexpression(key#37547 IN dynamicpruning#37689)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.527Z]                   :     :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37689, 0, true, Project [key#37549], [key#37549]
[2024-01-03T17:41:18.527Z]                   :     :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.527Z]                   :     :                          +- Project [key#37549]
[2024-01-03T17:41:18.527Z]                   :     :                             +- Filter ((((isnotnull(ex_key#37551) AND isnotnull(filter#37553)) AND (ex_key#37551 = 3)) AND (filter#37553 = 1458)) AND isnotnull(key#37549))
[2024-01-03T17:41:18.527Z]                   :     :                                +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37549,ex_key#37551,filter#37553] Batched: true, DataFilters: [isnotnull(ex_key#37551), isnotnull(filter#37553), (ex_key#37551 = 3), (filter#37553 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.527Z]                   :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=111150]
[2024-01-03T17:41:18.527Z]                   :        +- Project [key#37549]
[2024-01-03T17:41:18.527Z]                   :           +- Filter ((((isnotnull(ex_key#37551) AND isnotnull(filter#37553)) AND (ex_key#37551 = 3)) AND (filter#37553 = 1458)) AND isnotnull(key#37549))
[2024-01-03T17:41:18.527Z]                   :              +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37549,ex_key#37551,filter#37553] Batched: true, DataFilters: [isnotnull(ex_key#37551), isnotnull(filter#37553), (ex_key#37551 = 3), (filter#37553 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.527Z]                   +- Project [key#37677, value#37680L]
[2024-01-03T17:41:18.527Z]                      +- BroadcastHashJoin [key#37677], [key#37681], Inner, BuildRight, false
[2024-01-03T17:41:18.527Z]                         :- HashAggregate(keys=[key#37677], functions=[sum(value#37676)], output=[key#37677, value#37680L])
[2024-01-03T17:41:18.527Z]                         :  +- Exchange hashpartitioning(key#37677, 4), ENSURE_REQUIREMENTS, [plan_id=111153]
[2024-01-03T17:41:18.527Z]                         :     +- HashAggregate(keys=[key#37677], functions=[partial_sum(value#37676)], output=[key#37677, sum#37696L])
[2024-01-03T17:41:18.527Z]                         :        +- Project [value#37676, key#37677]
[2024-01-03T17:41:18.527Z]                         :           +- Filter (isnotnull(value#37676) AND (value#37676 > 0))
[2024-01-03T17:41:18.527Z]                         :              +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_0[value#37676,key#37677,skey#37678] Batched: true, DataFilters: [isnotnull(value#37676), (value#37676 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-git..., PartitionFilters: [isnotnull(key#37677), dynamicpruningexpression(key#37677 IN dynamicpruning#37690)], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
[2024-01-03T17:41:18.527Z]                         :                    +- SubqueryAdaptiveBroadcast dynamicpruning#37690, 0, true, Project [key#37681], [key#37681]
[2024-01-03T17:41:18.527Z]                         :                       +- AdaptiveSparkPlan isFinalPlan=false
[2024-01-03T17:41:18.527Z]                         :                          +- Project [key#37681]
[2024-01-03T17:41:18.527Z]                         :                             +- Filter ((((isnotnull(ex_key#37683) AND isnotnull(filter#37685)) AND (ex_key#37683 = 3)) AND (filter#37685 = 1458)) AND isnotnull(key#37681))
[2024-01-03T17:41:18.527Z]                         :                                +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37681,ex_key#37683,filter#37685] Batched: true, DataFilters: [isnotnull(ex_key#37683), isnotnull(filter#37685), (ex_key#37683 = 3), (filter#37685 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.527Z]                         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=111156]
[2024-01-03T17:41:18.527Z]                            +- Project [key#37681]
[2024-01-03T17:41:18.527Z]                               +- Filter ((((isnotnull(ex_key#37683) AND isnotnull(filter#37685)) AND (ex_key#37683 = 3)) AND (filter#37685 = 1458)) AND isnotnull(key#37681))
[2024-01-03T17:41:18.527Z]                                  +- FileScan orc spark_catalog.default.tmp_table_gw1_1272856583_1[key#37681,ex_key#37683,filter#37685] Batched: true, DataFilters: [isnotnull(ex_key#37683), isnotnull(filter#37685), (ex_key#37683 = 3), (filter#37685 = 1458), isn..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/jenkins/agent/workspace/jenkins-rapids_integration-dev-gith..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,1458), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
[2024-01-03T17:41:18.528Z] 
[2024-01-03T17:41:18.528Z]  at scala.Predef$.assert(Predef.scala:223)
[2024-01-03T17:41:18.528Z]  at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:170)
[2024-01-03T17:41:18.528Z]  at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:175)
[2024-01-03T17:41:18.528Z]  at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback$.assertContains(ExecutionPlanCaptureCallback.scala:76)
[2024-01-03T17:41:18.528Z]  at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(ExecutionPlanCaptureCallback.scala)
[2024-01-03T17:41:18.528Z]  at sun.reflect.GeneratedMethodAccessor140.invoke(Unknown Source)
[2024-01-03T17:41:18.528Z]  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2024-01-03T17:41:18.528Z]  at java.lang.reflect.Method.invoke(Method.java:498)
[2024-01-03T17:41:18.528Z]  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
[2024-01-03T17:41:18.528Z]  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
[2024-01-03T17:41:18.528Z]  at py4j.Gateway.invoke(Gateway.java:282)
[2024-01-03T17:41:18.528Z]  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
[2024-01-03T17:41:18.528Z]  at py4j.commands.CallCommand.execute(CallCommand.java:79)
[2024-01-03T17:41:18.528Z]  at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
[2024-01-03T17:41:18.528Z]  at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
[2024-01-03T17:41:18.528Z]  at java.lang.Thread.run(Thread.java:750)
[2024-01-03T17:41:18.528Z] = 2 failed, 20760 passed, 933 skipped, 417 xfailed, 388 xpassed, 9504 warnings in 6656.39s (1:50:56) =
NVnavkumar commented 6 months ago

From CI, this test failure occurred in Spark 3.5.0

NVnavkumar commented 6 months ago

I was able to replicate both failures on Spark 3.2.4, 3.3.3, 3.4.0, and 3.5.0 (all versions of Spark that support AQE + DPP)

NVnavkumar commented 6 months ago

Based on the plan output here, it looks like an AQE optimization is turning the entire plan into a LocalTableScan:

E                   py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
E                   : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
E                   AdaptiveSparkPlan isFinalPlan=true
E                   +- == Final Plan ==
E                      LocalTableScan <empty>, [key#2006, max(value)#2017L]
E                   +- == Initial Plan ==
E                      Sort [key#2006 ASC NULLS FIRST, max(value)#2017L ASC NULLS FIRST], true, 0
E                      +- Exchange rangepartitioning(key#2006 ASC NULLS FIRST, max(value)#2017L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=5302]
E                         +- HashAggregate(keys=[key#2006], functions=[max(value#2007L)], output=[key#2006, max(value)#2017L])
E                            +- Exchange hashpartitioning(key#2006, 4), ENSURE_REQUIREMENTS, [plan_id=5299]
E                               +- HashAggregate(keys=[key#2006], functions=[partial_max(value#2007L)], output=[key#2006, max#2023L])
E                                  +- Union
...
NVnavkumar commented 6 months ago

> Based on the plan output here, it looks like an AQE optimization is turning the entire plan into a LocalTableScan
> ...
I guess it determined via the join that this would return an empty result.

NVnavkumar commented 6 months ago

So basically, after some debugging, I think one of the subqueries returned an empty result, so that was short-circuited by AQE to return a LocalTableScan <empty>. This happens on both the CPU and GPU, but of course this means that the result did not contain a DynamicPruningExpression, so it looks like the solution here is that we need to update the test logic to be something like an either/or capture: either there is a single LocalTableScanExec, or the GPU plan needs to contain DynamicPruningExpression.
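
A minimal standalone PySpark sketch of the behavior described above (an illustration under stated assumptions, not the plugin's integration-test code): with AQE enabled, a join whose build side materializes as empty gets rewritten during re-optimization into an empty LocalTableScan, which is why no DynamicPruningExpression survives in the final plan.

```python
# Plain PySpark demo (not the repo's test harness) of AQE collapsing a join with an
# empty side into LocalTableScan <empty>.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("aqe-empty-relation-demo")
         .config("spark.sql.adaptive.enabled", "true")
         .getOrCreate())

fact = spark.range(100).withColumnRenamed("id", "key")
# This filter matches no rows, so the build side of the join is empty at runtime.
dim = spark.range(100).withColumnRenamed("id", "key").where("key < 0")

joined = fact.join(dim, "key")
joined.collect()   # materializes the query stages and lets AQE re-optimize
joined.explain()   # the final plan typically degenerates to LocalTableScan <empty>
```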

jlowe commented 6 months ago

> it looks like the solution here is that we need to update the test logic to be something like an either/or capture

I'm not sure that's the best fix. The point of this test is to check handling of DPP, and the problem here is that the datagen happened to produce inputs that did not yield a plan requiring DPP. IMHO a better fix is to update the input data generation to ensure there isn't a degenerate join. If we want to test handling of degenerate joins as well, that should be a separate test that explicitly sets up inputs to produce a degenerate join.
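
One hedged illustration of that direction, using plain PySpark rather than the repo's data generators (the table name and column names are made up; the ex_key = 3 / filter = 1458 values simply echo the predicate visible in the captured plans above): seed the dimension-side data so the test query's predicate always matches at least one row, keeping the DPP subquery non-empty.

```python
# Hypothetical sketch (not the repo's datagen helpers) of seeding the dimension table
# so the filtering predicate always matches at least one row, preventing the
# degenerate (empty) join that defeats DPP.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# Randomly generated rows may or may not satisfy the predicate used by the query...
random_rows = [Row(key=i % 10, ex_key=i % 5, filter=i) for i in range(1000)]

# ...so append one row that is guaranteed to match (ex_key = 3 AND filter = 1458),
# keeping the broadcast/build side non-empty at runtime.
guaranteed_match = [Row(key=0, ex_key=3, filter=1458)]

dim = spark.createDataFrame(random_rows + guaranteed_match)
dim.write.mode("overwrite").format("parquet").saveAsTable("dim_table")  # illustrative name
```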

NVnavkumar commented 6 months ago

> > it looks like the solution here is that we need to update the test logic to be something like an either/or capture
>
> I'm not sure that's the best fix. The point of this test is to check handling of DPP, and the problem here is that the datagen happened to produce inputs that did not yield a plan requiring DPP. IMHO a better fix is to update the input data generation to ensure there isn't a degenerate join. If we want to test handling of degenerate joins as well, that should be a separate test that explicitly sets up inputs to produce a degenerate join.

Makes sense. Will investigate what is producing the empty join.

NVnavkumar commented 6 months ago

test_dpp_empty_relation already exists, so I think we just need to prevent the degenerate join in this test.

NVnavkumar commented 5 months ago

The test is now failing again:

FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1707665221, INJECT_OOM, IGNORE_ORDER]
jlowe commented 5 months ago

Saw this fail again on Dataproc nightly run.

[2024-02-22T15:41:19.602Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1708615902, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
[2024-02-22T15:41:19.602Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-orc][DATAGEN_SEED=1708615902, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
[2024-02-22T15:41:19.602Z] = 2 failed, 116 passed, 11 skipped, 26232 deselected, 9 warnings in 557.14s (0:09:17) =
GaryShen2008 commented 4 months ago

Another failure

[2024-02-29T10:10:57.500Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-parquet][DATAGEN_SEED=1709192431, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...

[2024-02-29T10:10:57.500Z] FAILED ../../src/main/python/dpp_test.py::test_dpp_reuse_broadcast_exchange[true-5-orc][DATAGEN_SEED=1709192431, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
NVnavkumar commented 2 months ago

Considering this is actually a test issue (the test is not able to avoid an empty LocalTableScan) rather than an issue with the plugin, lowering the priority.