apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

Internal Error:class org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec has column support mismatch #6790

Open wenfang6 opened 1 month ago

wenfang6 commented 1 month ago

Backend

VL (Velox)

Bug description

When spark.sql.adaptive.enabled=true, running TPC-DS q57.sql fails with this error:

java.lang.IllegalStateException: Internal Error class org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec has column support mismatch:
ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] + 1)),false), [id=#2487]
+- ^(17) ProjectExecTransformer [i_category#421, i_brand#417, cc_name#499, sum_sales#10 AS sum_sales#149, rn#528]
   +- ^(17) WindowExecTransformer [rank(d_year#471, d_moy#473) windowspecdefinition(i_category#421, i_brand#417, cc_name#499, d_year#471 ASC NULLS FIRST, d_moy#473 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#528], [i_category#421, i_brand#417, cc_name#499], [d_year#471 ASC NULLS FIRST, d_moy#473 ASC NULLS FIRST]
      +- ^(17) SortExecTransformer [i_category#421 ASC NULLS FIRST, i_brand#417 ASC NULLS FIRST, cc_name#499 ASC NULLS FIRST, d_year#471 ASC NULLS FIRST, d_moy#473 ASC NULLS FIRST], false, 0
         +- ^(17) InputIteratorTransformer[i_category#421, i_brand#417, cc_name#499, d_year#471, d_moy#473, sum_sales#10]
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 21
                  +- ColumnarExchange hashpartitioning(i_category#421, i_brand#417, cc_name#499, 400), ENSURE_REQUIREMENTS, [i_category#421, i_brand#417, cc_name#499, d_year#471, d_moy#473, sum_sales#10], [id=#2080], [id=#2080], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum_sales:DecimalType(17,2)), [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum_sales:DecimalType(17,2))
                     +- VeloxAppendBatches 3276
                        +- ^(14) ProjectExecTransformer [hash(i_category#421, i_brand#417, cc_name#499, 42) AS hash_partition_key#810, i_category#421, i_brand#417, cc_name#499, d_year#471, d_moy#473, MakeDecimal(sum(UnscaledValue(cs_sales_price#451))#132L,17,2) AS sum_sales#10]
                           +- ^(14) HashAggregateTransformer(keys=[i_category#421, i_brand#417, cc_name#499, d_year#471, d_moy#473], functions=[sum(UnscaledValue(cs_sales_price#451))], output=[i_category#421, i_brand#417, cc_name#499, d_year#471, d_moy#473, sum(UnscaledValue(cs_sales_price#451))#132L])
                              +- ^(14) InputIteratorTransformer[i_category#421, i_brand#417, cc_name#499, d_year#471, d_moy#473, sum#652L]
                                 +- ShuffleQueryStage 17
                                    +- ReusedExchange [i_category#421, i_brand#417, cc_name#499, d_year#471, d_moy#473, sum#652L], ColumnarExchange hashpartitioning(i_category#29, i_brand#25, cc_name#107, d_year#79, d_moy#81, 400), ENSURE_REQUIREMENTS, [i_category#29, i_brand#25, cc_name#107, d_year#79, d_moy#81, sum#650L], [id=#1298], [id=#1298], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum:LongType)

    at org.apache.spark.sql.execution.SparkPlan.doExecuteColumnar(SparkPlan.scala:313)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:208)
    at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecuteColumnar(Exchange.scala:61)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:208)
    at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecuteColumnar(QueryStageExec.scala:121)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:208)
    at org.apache.gluten.backendsapi.velox.VeloxSparkPlanExecApi.createBroadcastRelation(VeloxSparkPlanExecApi.scala:626)
    at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$2(ColumnarBroadcastExchangeExec.scala:77)
    at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
    at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
    at org.apache.spark.sql.execution.ColumnarBroadcastExchangeExec.$anonfun$relationFuture$1(ColumnarBroadcastExchangeExec.scala:65)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
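Read from the bottom up, the trace shows the call path. ColumnarBroadcastExchangeExec.relationFuture calls VeloxSparkPlanExecApi.createBroadcastRelation, which asks its child for columnar output. QueryStageExec and then ReusedExchangeExec forward that request, and it finally reaches a ColumnarBroadcastExchangeExec whose columnar path ends in the base-class SparkPlan.doExecuteColumnar, the method that throws this error. The Python model below only illustrates that delegation chain. The class names are hypothetical stand-ins, and none of it is Spark or Gluten code.

```python
class PlanNode:
    """Toy stand-in for a physical plan node; not Spark's SparkPlan."""

    def __init__(self, name, child=None):
        self.name = name
        self.child = child

    def execute_columnar(self):
        # Same shape as SparkPlan.executeColumnar -> doExecuteColumnar.
        return self.do_execute_columnar()

    def do_execute_columnar(self):
        # Base-class default: any node that does not override this fails.
        raise RuntimeError(f"Internal Error {self.name} has column support mismatch")


class Forwarding(PlanNode):
    """Models QueryStageExec / ReusedExchangeExec: forward to the child."""

    def do_execute_columnar(self):
        return self.child.execute_columnar()


class ColumnarSource(PlanNode):
    """Models an operator that really emits columnar batches."""

    def do_execute_columnar(self):
        return [f"batch from {self.name}"]


class BroadcastExchange(PlanNode):
    """Models a broadcast exchange: it consumes its child's batches to build
    a broadcast relation, but does not itself override do_execute_columnar."""

    def build_relation(self):
        # Plays the role of createBroadcastRelation in the trace.
        return self.child.execute_columnar()


# Expected shape: broadcast exchange directly over a columnar operator.
ok = BroadcastExchange("ColumnarBroadcastExchangeExec",
                       ColumnarSource("ProjectExecTransformer"))
print(ok.build_relation())

# Shape seen in the trace: the child chain ends at another broadcast exchange.
inner = BroadcastExchange("ColumnarBroadcastExchangeExec",
                          ColumnarSource("ProjectExecTransformer"))
bad = BroadcastExchange(
    "ColumnarBroadcastExchangeExec",
    Forwarding("BroadcastQueryStage", Forwarding("ReusedExchange", inner)),
)
try:
    bad.build_relation()
except RuntimeError as err:
    print(err)
```

In the model, the first call returns a one-element batch list. The second call raises from the inner exchange, which matches the top frame of the trace, where the class named in the message is ColumnarBroadcastExchangeExec.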

Spark version

Spark-3.2.x

Spark configurations

--conf spark.sql.adaptive.enabled=true

System information

No response

Relevant logs

No response

lgbo-ustc commented 1 month ago

This may be the same problem:

[2024-08-13T02:37:56.671Z]   java.lang.IllegalStateException: Internal Error class org.apache.gluten.execution.ProjectExecTransformer has column support mismatch:
[2024-08-13T02:37:56.671Z] ProjectExecTransformer [c_last_name#14821, c_first_name#14820, d_date#14757, coalesce(c_last_name#14821, ) AS projected_partitioning_value_0#58675, isnull(c_last_name#14821) AS projected_partitioning_value_1#58676, coalesce(c_first_name#14820, ) AS projected_partitioning_value_2#58677, isnull(c_first_name#14820) AS projected_partitioning_value_3#58678, coalesce(d_date#14757, 1970-01-01) AS projected_partitioning_value_4#58679, isnull(d_date#14757) AS projected_partitioning_value_5#58680]
[2024-08-13T02:37:56.672Z] +- HashAggregateTransformer(keys=[c_last_name#14821, c_first_name#14820, d_date#14757], functions=[], isStreamingAgg=false, output=[c_last_name#14821, c_first_name#14820, d_date#14757])
[2024-08-13T02:37:56.672Z]    +- ColumnarExchange hashpartitioning(c_last_name#14821, c_first_name#14820, d_date#14757, 5), ENSURE_REQUIREMENTS, [plan_id=350544], [id=#350544], [OUTPUT] List(c_last_name:StringType, c_first_name:StringType, d_date:DateType), [OUTPUT] List(c_last_name:StringType, c_first_name:StringType, d_date:DateType)
[2024-08-13T02:37:56.672Z]       +- HashAggregateTransformer(keys=[c_last_name#14821, c_first_name#14820, d_date#14757], functions=[], isStreamingAgg=false, output=[c_last_name#14821, c_first_name#14820, d_date#14757])
[2024-08-13T02:37:56.672Z]          +- ProjectExecTransformer [c_last_name#14821, c_first_name#14820, d_date#14757]
[2024-08-13T02:37:56.672Z]             +- CHBroadcastHashJoinExecTransformer [ss_customer_sk#15499], [c_customer_sk#14812], Inner, BuildRight, false
[2024-08-13T02:37:56.672Z]                :- ProjectExecTransformer [ss_customer_sk#15499, d_date#14757]
[2024-08-13T02:37:56.672Z]                :  +- CHBroadcastHashJoinExecTransformer [ss_sold_date_sk#15519], [d_date_sk#14755], Inner, BuildRight, false
[2024-08-13T02:37:56.672Z]                :     :- FilterExecTransformer isnotnull(ss_customer_sk#15499)
[2024-08-13T02:37:56.672Z]                :     :  +- NativeFileScan parquet tpcdsdb.store_sales[ss_customer_sk#15499,ss_sold_date_sk#15519] Batched: true, DataFilters: [isnotnull(ss_customer_sk#15499)], Format: Parquet, Location: InMemoryFileIndex(1823 paths)[file:/data/tpcds-data-sf1-decimal/store_sales/ss_sold_date_sk=24512..., PartitionFilters: [isnotnull(ss_sold_date_sk#15519), dynamicpruningexpression(ss_sold_date_sk#15519 IN dynamicpruni..., PushedFilters: [IsNotNull(ss_customer_sk)], ReadSchema: struct<ss_customer_sk:int>
[2024-08-13T02:37:56.672Z]                :     :        +- ColumnarSubqueryBroadcast dynamicpruning#58574, 0, [d_date_sk#14755], [id=#350042]
[2024-08-13T02:37:56.672Z]                :     :           +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=350041]
[2024-08-13T02:37:56.672Z]                :     :              +- ^(4516) ProjectExecTransformer [d_date_sk#14755, d_date#14757]
[2024-08-13T02:37:56.672Z]                :     :                 +- ^(4516) FilterExecTransformer (((isnotnull(d_month_seq#14758) AND (d_month_seq#14758 >= 1212)) AND (d_month_seq#14758 <= 1223)) AND isnotnull(d_date_sk#14755))
[2024-08-13T02:37:56.672Z]                :     :                    +- ^(4516) NativeFileScan parquet tpcdsdb.date_dim[d_date_sk#14755,d_date#14757,d_month_seq#14758] Batched: true, DataFilters: [isnotnull(d_month_seq#14758), (d_month_seq#14758 >= 1212), (d_month_seq#14758 <= 1223), isnotnul..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data/tpcds-data-sf1-decimal/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
[2024-08-13T02:37:56.672Z]                :     +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=350522]
[2024-08-13T02:37:56.672Z]                :        +- ProjectExecTransformer [d_date_sk#14755, d_date#14757]
[2024-08-13T02:37:56.672Z]                :           +- FilterExecTransformer (((isnotnull(d_month_seq#14758) AND (d_month_seq#14758 >= 1212)) AND (d_month_seq#14758 <= 1223)) AND isnotnull(d_date_sk#14755))
[2024-08-13T02:37:56.672Z]                :              +- NativeFileScan parquet tpcdsdb.date_dim[d_date_sk#14755,d_date#14757,d_month_seq#14758] Batched: true, DataFilters: [isnotnull(d_month_seq#14758), (d_month_seq#14758 >= 1212), (d_month_seq#14758 <= 1223), isnotnul..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data/tpcds-data-sf1-decimal/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
[2024-08-13T02:37:56.672Z]                +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=350525]
[2024-08-13T02:37:56.672Z]                   +- FilterExecTransformer isnotnull(c_customer_sk#14812)
[2024-08-13T02:37:56.672Z]                      +- NativeFileScan parquet tpcdsdb.customer[c_customer_sk#14812,c_first_name#14820,c_last_name#14821] Batched: true, DataFilters: [isnotnull(c_customer_sk#14812)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data/tpcds-data-sf1-decimal/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk)], ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string>

zhztheplayer commented 1 month ago

@wenfang6 Would you like to share the whole query plan in text? Thanks.

wenfang6 commented 1 month ago

> @wenfang6 Would you like to share the whole query plan in text? Thanks.

I use Spark 3.2.1. The query:

WITH v1 AS (
  SELECT
    i_category,
    i_brand,
    cc_name,
    d_year,
    d_moy,
    sum(cs_sales_price) sum_sales,
    avg(sum(cs_sales_price))
    OVER
    (PARTITION BY i_category, i_brand, cc_name, d_year)
    avg_monthly_sales,
    rank()
    OVER
    (PARTITION BY i_category, i_brand, cc_name
      ORDER BY d_year, d_moy) rn
  FROM item, catalog_sales, date_dim, call_center
  WHERE cs_item_sk = i_item_sk AND
    cs_sold_date_sk = d_date_sk AND
    cc_call_center_sk = cs_call_center_sk AND
    (
      d_year = 1999 OR
        (d_year = 1999 - 1 AND d_moy = 12) OR
        (d_year = 1999 + 1 AND d_moy = 1)
    )
  GROUP BY i_category, i_brand,
    cc_name, d_year, d_moy),
    v2 AS (
    SELECT
      v1.i_category,
      v1.i_brand,
      v1.cc_name,
      v1.d_year,
      v1.d_moy,
      v1.avg_monthly_sales,
      v1.sum_sales,
      v1_lag.sum_sales psum,
      v1_lead.sum_sales nsum
    FROM v1, v1 v1_lag, v1 v1_lead
    WHERE v1.i_category = v1_lag.i_category AND
      v1.i_category = v1_lead.i_category AND
      v1.i_brand = v1_lag.i_brand AND
      v1.i_brand = v1_lead.i_brand AND
      v1.cc_name = v1_lag.cc_name AND
      v1.cc_name = v1_lead.cc_name AND
      v1.rn = v1_lag.rn + 1 AND
      v1.rn = v1_lead.rn - 1)
SELECT *
FROM v2
WHERE d_year = 1999 AND
  avg_monthly_sales > 0 AND
  CASE WHEN avg_monthly_sales > 0
    THEN abs(sum_sales - avg_monthly_sales) / avg_monthly_sales
  ELSE NULL END > 0.1
ORDER BY sum_sales - avg_monthly_sales, 3
LIMIT 100
zhztheplayer commented 1 month ago

Thanks @wenfang6. I may not have made it clear: could you run EXPLAIN on the SQL and paste the output here? It is more helpful to analyze the Spark query plan than the SQL itself.
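For reference, the plan requested here can be produced with Spark SQL's EXPLAIN statement. EXPLAIN EXTENDED prints the parsed, analyzed, optimized and physical plans, which is the layout of the output posted below. From PySpark the statement can be passed to spark.sql(...), or DataFrame.explain(mode="extended") can be called on the query's DataFrame. The helper below is a hypothetical convenience, not part of Spark: it only assembles the statement text, so it runs without a Spark session.

```python
# Modes accepted by Spark SQL's EXPLAIN statement ("" means plain EXPLAIN).
EXPLAIN_MODES = ("", "EXTENDED", "CODEGEN", "COST", "FORMATTED")


def explain_statement(query: str, mode: str = "EXTENDED") -> str:
    """Wrap a SQL query (e.g. the text of q57.sql) in an EXPLAIN statement."""
    mode = mode.strip().upper()
    if mode not in EXPLAIN_MODES:
        raise ValueError(f"unsupported EXPLAIN mode: {mode!r}")
    # Keep a single statement: drop a trailing semicolon a query file may end with.
    body = query.strip().rstrip(";").strip()
    prefix = f"EXPLAIN {mode}" if mode else "EXPLAIN"
    return f"{prefix}\n{body}"


print(explain_statement("SELECT * FROM v2 LIMIT 100;"))
```

With a live session, spark.sql(explain_statement(query)).show(truncate=False) would print the plan text. With spark.sql.adaptive.enabled=true, EXPLAIN shows the initial plan (AdaptiveSparkPlan isFinalPlan=false). The final adaptive plan only appears after the query runs, for example in the SQL UI.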

wenfang6 commented 1 month ago

The Gluten SQL UI shows this:

== Fallback Summary ==
No fallback nodes

== Physical Plan ==
ColumnarBroadcastExchange (3)
+- BroadcastQueryStage (2)
   +- ReusedExchange (1)

===== Adaptively Optimized Out Exchanges =====

Subplan:1
ColumnarBroadcastExchange (21)
+- ^ ProjectExecTransformer (19)
   +- ^ WindowExecTransformer (18)
      +- ^ SortExecTransformer (17)
         +- ^ InputIteratorTransformer (16)
            +- AQEShuffleRead (14)
               +- ShuffleQueryStage (13)
                  +- ColumnarExchange (12)
                     +- VeloxAppendBatches (11)
                        +- ^ ProjectExecTransformer (9)
                           +- ^ RegularHashAggregateExecTransformer (8)
                              +- ^ InputIteratorTransformer (7)
                                 +- ShuffleQueryStage (5)
                                    +- ReusedExchange (4)
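The final plan above puts a ColumnarBroadcastExchange directly over a BroadcastQueryStage and a ReusedExchange. The executeColumnar frames in the stack trace walk through the same nesting (QueryStageExec, then ReusedExchangeExec). When comparing plans like these by eye becomes tedious, a helper that lists parent/child operator pairs can help. The following is a rough sketch. It assumes Spark's usual tree-string layout (`+- ` / `:- ` child markers, three characters of indentation per level, and Gluten's `^` / `^(N)` whole-stage prefix). `plan_edges` is a hypothetical name and is not part of Spark or Gluten.

```python
from __future__ import annotations

import re

# A child line in Spark's tree string: a prefix of spaces and ":" characters,
# then a "+- " or ":- " marker, then the node description.
_CHILD = re.compile(r"^([ :]*?)[+:]- (.*)$")
# Optional Gluten whole-stage marker such as "^ " or "^(17) ".
_STAGE = re.compile(r"^\^(\(\d+\))?\s*")


def _node_name(text: str) -> str:
    """Operator name: the first word after any whole-stage marker."""
    return _STAGE.sub("", text.strip()).split()[0]


def plan_edges(plan_text: str) -> list[tuple[str, str]]:
    """Return (parent, child) operator-name pairs from an indented plan string."""
    edges: list[tuple[str, str]] = []
    stack: list[str] = []  # stack[d] = most recent node seen at depth d
    for line in plan_text.splitlines():
        if not line.strip():
            continue
        match = _CHILD.match(line)
        if match:
            # Each tree level adds three characters of prefix.
            depth = len(match.group(1)) // 3 + 1
            name = _node_name(match.group(2))
        else:
            depth, name = 0, _node_name(line)
        del stack[depth:]
        if depth > 0 and stack:
            edges.append((stack[-1], name))
        stack.append(name)
    return edges


final_plan = """\
ColumnarBroadcastExchange (3)
+- BroadcastQueryStage (2)
   +- ReusedExchange (1)
"""
for parent, child in plan_edges(final_plan):
    print(f"{parent} -> {child}")
```

On the final plan this prints "ColumnarBroadcastExchange -> BroadcastQueryStage" and "BroadcastQueryStage -> ReusedExchange". The helper keeps only operator names, so partitioning details and attribute IDs are dropped. It can also be pointed at the Subplan:1 tree or at the plan excerpts in the error messages above.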

EXPLAIN output for the SQL:

== Parsed Logical Plan ==
CTE [v1, v2]
:  :- 'SubqueryAlias v1
:  :  +- 'Aggregate ['i_category, 'i_brand, 'cc_name, 'd_year, 'd_moy], ['i_category, 'i_brand, 'cc_name, 'd_year, 'd_moy, 'sum('cs_sales_price) AS sum_sales#10, 'avg('sum('cs_sales_price)) windowspecdefinition('i_category, 'i_brand, 'cc_name, 'd_year, unspecifiedframe$()) AS avg_monthly_sales#11, 'rank() windowspecdefinition('i_category, 'i_brand, 'cc_name, 'd_year ASC NULLS FIRST, 'd_moy ASC NULLS FIRST, unspecifiedframe$()) AS rn#12]
:  :     +- 'Filter ((('cs_item_sk = 'i_item_sk) AND ('cs_sold_date_sk = 'd_date_sk)) AND (('cc_call_center_sk = 'cs_call_center_sk) AND ((('d_year = 1999) OR (('d_year = (1999 - 1)) AND ('d_moy = 12))) OR (('d_year = (1999 + 1)) AND ('d_moy = 1)))))
:  :        +- 'Join Inner
:  :           :- 'Join Inner
:  :           :  :- 'Join Inner
:  :           :  :  :- 'UnresolvedRelation [item], [], false
:  :           :  :  +- 'UnresolvedRelation [catalog_sales], [], false
:  :           :  +- 'UnresolvedRelation [date_dim], [], false
:  :           +- 'UnresolvedRelation [call_center], [], false
:  +- 'SubqueryAlias v2
:     +- 'Project ['v1.i_category, 'v1.i_brand, 'v1.cc_name, 'v1.d_year, 'v1.d_moy, 'v1.avg_monthly_sales, 'v1.sum_sales, 'v1_lag.sum_sales AS psum#13, 'v1_lead.sum_sales AS nsum#14]
:        +- 'Filter (((('v1.i_category = 'v1_lag.i_category) AND ('v1.i_category = 'v1_lead.i_category)) AND (('v1.i_brand = 'v1_lag.i_brand) AND ('v1.i_brand = 'v1_lead.i_brand))) AND ((('v1.cc_name = 'v1_lag.cc_name) AND ('v1.cc_name = 'v1_lead.cc_name)) AND (('v1.rn = ('v1_lag.rn + 1)) AND ('v1.rn = ('v1_lead.rn - 1)))))
:           +- 'Join Inner
:              :- 'Join Inner
:              :  :- 'UnresolvedRelation [v1], [], false
:              :  +- 'SubqueryAlias v1_lag
:              :     +- 'UnresolvedRelation [v1], [], false
:              +- 'SubqueryAlias v1_lead
:                 +- 'UnresolvedRelation [v1], [], false
+- 'GlobalLimit 100
   +- 'LocalLimit 100
      +- 'Sort [('sum_sales - 'avg_monthly_sales) ASC NULLS FIRST, 3 ASC NULLS FIRST], true
         +- 'Project [*]
            +- 'Filter ((('d_year = 1999) AND ('avg_monthly_sales > 0)) AND (CASE WHEN ('avg_monthly_sales > 0) THEN ('abs(('sum_sales - 'avg_monthly_sales)) / 'avg_monthly_sales) ELSE null END > 0.1))
               +- 'UnresolvedRelation [v2], [], false

== Analyzed Logical Plan ==
i_category: string, i_brand: string, cc_name: string, d_year: int, d_moy: int, avg_monthly_sales: decimal(21,6), sum_sales: decimal(17,2), psum: decimal(17,2), nsum: decimal(17,2)
WithCTE
:- CTERelationDef 0
:  +- SubqueryAlias v1
:     +- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, avg_monthly_sales#11, rn#12]
:        +- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, _w0#148, avg_monthly_sales#11, rn#12, avg_monthly_sales#11, rn#12]
:           +- Window [rank(d_year#84, d_moy#86) windowspecdefinition(i_category#34, i_brand#30, cc_name#112, d_year#84 ASC NULLS FIRST, d_moy#86 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#12], [i_category#34, i_brand#30, cc_name#112], [d_year#84 ASC NULLS FIRST, d_moy#86 ASC NULLS FIRST]
:              +- Window [avg(_w0#148) windowspecdefinition(i_category#34, i_brand#30, cc_name#112, d_year#84, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_monthly_sales#11], [i_category#34, i_brand#30, cc_name#112, d_year#84]
:                 +- Aggregate [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86], [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum(cs_sales_price#64) AS sum_sales#10, sum(cs_sales_price#64) AS _w0#148]
:                    +- Filter (((cs_item_sk#58L = i_item_sk#22L) AND (cs_sold_date_sk#77L = d_date_sk#78L)) AND ((cc_call_center_sk#106L = cs_call_center_sk#54L) AND (((d_year#84 = 1999) OR ((d_year#84 = (1999 - 1)) AND (d_moy#86 = 12))) OR ((d_year#84 = (1999 + 1)) AND (d_moy#86 = 1)))))
:                       +- Join Inner
:                          :- Join Inner
:                          :  :- Join Inner
:                          :  :  :- SubqueryAlias spark_catalog.dwd_insys_test.item
:                          :  :  :  +- HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#22L, i_item_id#23, i_rec_start_date#24, i_rec_end_date#25, i_item_desc#26, i_current_p..., Partition Cols: []]
:                          :  :  +- SubqueryAlias spark_catalog.dwd_insys_test.catalog_sales
:                          :  :     +- HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#44L, cs_ship_date_sk#45L, cs_bill_customer_sk#46L, cs_bill_cdemo_sk#47L, cs_bill..., Partition Cols: [cs_sold_date_sk#77L]]
:                          :  +- SubqueryAlias spark_catalog.dwd_insys_test.date_dim
:                          :     +- HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#78L, d_date_id#79, d_date#80, d_month_seq#81, d_week_seq#82, d_quarter_seq#83, d_year#..., Partition Cols: []]
:                          +- SubqueryAlias spark_catalog.dwd_insys_test.call_center
:                             +- HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#106L, cc_call_center_id#107, cc_rec_start_date#108, cc_rec_end_date#109, cc_cl..., Partition Cols: []]
:- CTERelationDef 1
:  +- SubqueryAlias v2
:     +- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, avg_monthly_sales#11, sum_sales#10, sum_sales#154 AS psum#13, sum_sales#162 AS nsum#14]
:        +- Filter ((((i_category#34 = i_category#149) AND (i_category#34 = i_category#157)) AND ((i_brand#30 = i_brand#150) AND (i_brand#30 = i_brand#158))) AND (((cc_name#112 = cc_name#151) AND (cc_name#112 = cc_name#159)) AND ((rn#12 = (rn#156 + 1)) AND (rn#12 = (rn#164 - 1)))))
:           +- Join Inner
:              :- Join Inner
:              :  :- SubqueryAlias v1
:              :  :  +- CTERelationRef 0, true, [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, avg_monthly_sales#11, rn#12]
:              :  +- SubqueryAlias v1_lag
:              :     +- SubqueryAlias v1
:              :        +- CTERelationRef 0, true, [i_category#149, i_brand#150, cc_name#151, d_year#152, d_moy#153, sum_sales#154, avg_monthly_sales#155, rn#156]
:              +- SubqueryAlias v1_lead
:                 +- SubqueryAlias v1
:                    +- CTERelationRef 0, true, [i_category#157, i_brand#158, cc_name#159, d_year#160, d_moy#161, sum_sales#162, avg_monthly_sales#163, rn#164]
+- GlobalLimit 100
   +- LocalLimit 100
      +- Sort [CheckOverflow((promote_precision(cast(sum_sales#10 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(22,6), true) ASC NULLS FIRST, cc_name#112 ASC NULLS FIRST], true
         +- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, avg_monthly_sales#11, sum_sales#10, psum#13, nsum#14]
            +- Filter (((d_year#84 = 1999) AND (cast(avg_monthly_sales#11 as decimal(21,6)) > cast(cast(0 as decimal(1,0)) as decimal(21,6)))) AND (cast(CASE WHEN (cast(avg_monthly_sales#11 as decimal(21,6)) > cast(cast(0 as decimal(1,0)) as decimal(21,6))) THEN CheckOverflow((promote_precision(cast(abs(CheckOverflow((promote_precision(cast(sum_sales#10 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(22,6), true), false) as decimal(22,6))) / promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(38,16), true) ELSE cast(null as decimal(38,16)) END as decimal(38,16)) > cast(0.1 as decimal(38,16))))
               +- SubqueryAlias v2
                  +- CTERelationRef 1, true, [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, avg_monthly_sales#11, sum_sales#10, psum#13, nsum#14]

== Optimized Logical Plan ==
GlobalLimit 100
+- LocalLimit 100
   +- Sort [CheckOverflow((promote_precision(cast(sum_sales#10 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(22,6), true) ASC NULLS FIRST, cc_name#112 ASC NULLS FIRST], true
      +- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, avg_monthly_sales#11, sum_sales#10, sum_sales#154 AS psum#13, sum_sales#162 AS nsum#14]
         +- Join Inner, ((((i_category#34 = i_category#537) AND (i_brand#30 = i_brand#533)) AND (cc_name#112 = cc_name#615)) AND (rn#12 = (rn#644 - 1)))
            :- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, avg_monthly_sales#11, rn#12, sum_sales#154]
            :  +- Join Inner, ((((i_category#34 = i_category#417) AND (i_brand#30 = i_brand#413)) AND (cc_name#112 = cc_name#495)) AND (rn#12 = (rn#524 + 1)))
            :     :- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, avg_monthly_sales#11, rn#12]
            :     :  +- Filter ((isnotnull(avg_monthly_sales#11) AND (avg_monthly_sales#11 > 0.000000)) AND (CheckOverflow((promote_precision(abs(CheckOverflow((promote_precision(cast(sum_sales#10 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(22,6), true), false)) / promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(38,16), true) > 0.1000000000000000))
            :     :     +- Window [avg(_w0#148) windowspecdefinition(i_category#34, i_brand#30, cc_name#112, d_year#84, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_monthly_sales#11], [i_category#34, i_brand#30, cc_name#112, d_year#84]
            :     :        +- Filter (isnotnull(d_year#84) AND (d_year#84 = 1999))
            :     :           +- Window [rank(d_year#84, d_moy#86) windowspecdefinition(i_category#34, i_brand#30, cc_name#112, d_year#84 ASC NULLS FIRST, d_moy#86 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#12], [i_category#34, i_brand#30, cc_name#112], [d_year#84 ASC NULLS FIRST, d_moy#86 ASC NULLS FIRST]
            :     :              +- Aggregate [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86], [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, MakeDecimal(sum(UnscaledValue(cs_sales_price#64)),17,2) AS sum_sales#10, MakeDecimal(sum(UnscaledValue(cs_sales_price#64)),17,2) AS _w0#148]
            :     :                 +- Project [i_brand#30, i_category#34, cs_sales_price#64, d_year#84, d_moy#86, cc_name#112]
            :     :                    +- Join Inner, (cc_call_center_sk#106L = cs_call_center_sk#54L)
            :     :                       :- Project [i_brand#30, i_category#34, cs_call_center_sk#54L, cs_sales_price#64, d_year#84, d_moy#86]
            :     :                       :  +- Join Inner, (cs_sold_date_sk#77L = d_date_sk#78L)
            :     :                       :     :- Project [i_brand#30, i_category#34, cs_call_center_sk#54L, cs_sales_price#64, cs_sold_date_sk#77L]
            :     :                       :     :  +- Join Inner, (cs_item_sk#58L = i_item_sk#22L)
            :     :                       :     :     :- Project [i_item_sk#22L, i_brand#30, i_category#34]
            :     :                       :     :     :  +- Filter (isnotnull(i_item_sk#22L) AND (isnotnull(i_category#34) AND isnotnull(i_brand#30)))
            :     :                       :     :     :     +- HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#22L, i_item_id#23, i_rec_start_date#24, i_rec_end_date#25, i_item_desc#26, i_current_p..., Partition Cols: []]
            :     :                       :     :     +- Project [cs_call_center_sk#54L, cs_item_sk#58L, cs_sales_price#64, cs_sold_date_sk#77L]
            :     :                       :     :        +- Filter ((isnotnull(cs_item_sk#58L) AND isnotnull(cs_sold_date_sk#77L)) AND isnotnull(cs_call_center_sk#54L))
            :     :                       :     :           +- HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#44L, cs_ship_date_sk#45L, cs_bill_customer_sk#46L, cs_bill_cdemo_sk#47L, cs_bill..., Partition Cols: [cs_sold_date_sk#77L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...]
            :     :                       :     +- Project [d_date_sk#78L, d_year#84, d_moy#86]
            :     :                       :        +- Filter ((((d_year#84 = 1999) OR ((d_year#84 = 1998) AND (d_moy#86 = 12))) OR ((d_year#84 = 2000) AND (d_moy#86 = 1))) AND isnotnull(d_date_sk#78L))
            :     :                       :           +- HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#78L, d_date_id#79, d_date#80, d_month_seq#81, d_week_seq#82, d_quarter_seq#83, d_year#..., Partition Cols: []]
            :     :                       +- Project [cc_call_center_sk#106L, cc_name#112]
            :     :                          +- Filter (isnotnull(cc_call_center_sk#106L) AND isnotnull(cc_name#112))
            :     :                             +- HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#106L, cc_call_center_id#107, cc_rec_start_date#108, cc_rec_end_date#109, cc_cl..., Partition Cols: []]
            :     +- Project [i_category#417, i_brand#413, cc_name#495, sum_sales#10 AS sum_sales#154, rn#524]
            :        +- Window [rank(d_year#467, d_moy#469) windowspecdefinition(i_category#417, i_brand#413, cc_name#495, d_year#467 ASC NULLS FIRST, d_moy#469 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#524], [i_category#417, i_brand#413, cc_name#495], [d_year#467 ASC NULLS FIRST, d_moy#469 ASC NULLS FIRST]
            :           +- Aggregate [i_category#417, i_brand#413, cc_name#495, d_year#467, d_moy#469], [i_category#417, i_brand#413, cc_name#495, d_year#467, d_moy#469, MakeDecimal(sum(UnscaledValue(cs_sales_price#447)),17,2) AS sum_sales#10]
            :              +- Project [i_brand#413, i_category#417, cs_sales_price#447, d_year#467, d_moy#469, cc_name#495]
            :                 +- Join Inner, (cc_call_center_sk#489L = cs_call_center_sk#437L)
            :                    :- Project [i_brand#413, i_category#417, cs_call_center_sk#437L, cs_sales_price#447, d_year#467, d_moy#469]
            :                    :  +- Join Inner, (cs_sold_date_sk#460L = d_date_sk#461L)
            :                    :     :- Project [i_brand#413, i_category#417, cs_call_center_sk#437L, cs_sales_price#447, cs_sold_date_sk#460L]
            :                    :     :  +- Join Inner, (cs_item_sk#441L = i_item_sk#405L)
            :                    :     :     :- Project [i_item_sk#405L, i_brand#413, i_category#417]
            :                    :     :     :  +- Filter (isnotnull(i_item_sk#405L) AND (isnotnull(i_category#417) AND isnotnull(i_brand#413)))
            :                    :     :     :     +- HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#405L, i_item_id#406, i_rec_start_date#407, i_rec_end_date#408, i_item_desc#409, i_curr..., Partition Cols: []]
            :                    :     :     +- Project [cs_call_center_sk#437L, cs_item_sk#441L, cs_sales_price#447, cs_sold_date_sk#460L]
            :                    :     :        +- Filter ((isnotnull(cs_item_sk#441L) AND isnotnull(cs_sold_date_sk#460L)) AND isnotnull(cs_call_center_sk#437L))
            :                    :     :           +- HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#427L, cs_ship_date_sk#428L, cs_bill_customer_sk#429L, cs_bill_cdemo_sk#430L, cs_..., Partition Cols: [cs_sold_date_sk#460L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...]
            :                    :     +- Project [d_date_sk#461L, d_year#467, d_moy#469]
            :                    :        +- Filter ((((d_year#467 = 1999) OR ((d_year#467 = 1998) AND (d_moy#469 = 12))) OR ((d_year#467 = 2000) AND (d_moy#469 = 1))) AND isnotnull(d_date_sk#461L))
            :                    :           +- HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#461L, d_date_id#462, d_date#463, d_month_seq#464, d_week_seq#465, d_quarter_seq#466, d..., Partition Cols: []]
            :                    +- Project [cc_call_center_sk#489L, cc_name#495]
            :                       +- Filter (isnotnull(cc_call_center_sk#489L) AND isnotnull(cc_name#495))
            :                          +- HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#489L, cc_call_center_id#490, cc_rec_start_date#491, cc_rec_end_date#492, cc_cl..., Partition Cols: []]
            +- Project [i_category#537, i_brand#533, cc_name#615, sum_sales#10 AS sum_sales#162, rn#644]
               +- Window [rank(d_year#587, d_moy#589) windowspecdefinition(i_category#537, i_brand#533, cc_name#615, d_year#587 ASC NULLS FIRST, d_moy#589 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#644], [i_category#537, i_brand#533, cc_name#615], [d_year#587 ASC NULLS FIRST, d_moy#589 ASC NULLS FIRST]
                  +- Aggregate [i_category#537, i_brand#533, cc_name#615, d_year#587, d_moy#589], [i_category#537, i_brand#533, cc_name#615, d_year#587, d_moy#589, MakeDecimal(sum(UnscaledValue(cs_sales_price#567)),17,2) AS sum_sales#10]
                     +- Project [i_brand#533, i_category#537, cs_sales_price#567, d_year#587, d_moy#589, cc_name#615]
                        +- Join Inner, (cc_call_center_sk#609L = cs_call_center_sk#557L)
                           :- Project [i_brand#533, i_category#537, cs_call_center_sk#557L, cs_sales_price#567, d_year#587, d_moy#589]
                           :  +- Join Inner, (cs_sold_date_sk#580L = d_date_sk#581L)
                           :     :- Project [i_brand#533, i_category#537, cs_call_center_sk#557L, cs_sales_price#567, cs_sold_date_sk#580L]
                           :     :  +- Join Inner, (cs_item_sk#561L = i_item_sk#525L)
                           :     :     :- Project [i_item_sk#525L, i_brand#533, i_category#537]
                           :     :     :  +- Filter (isnotnull(i_item_sk#525L) AND (isnotnull(i_category#537) AND isnotnull(i_brand#533)))
                           :     :     :     +- HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#525L, i_item_id#526, i_rec_start_date#527, i_rec_end_date#528, i_item_desc#529, i_curr..., Partition Cols: []]
                           :     :     +- Project [cs_call_center_sk#557L, cs_item_sk#561L, cs_sales_price#567, cs_sold_date_sk#580L]
                           :     :        +- Filter ((isnotnull(cs_item_sk#561L) AND isnotnull(cs_sold_date_sk#580L)) AND isnotnull(cs_call_center_sk#557L))
                           :     :           +- HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#547L, cs_ship_date_sk#548L, cs_bill_customer_sk#549L, cs_bill_cdemo_sk#550L, cs_..., Partition Cols: [cs_sold_date_sk#580L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...]
                           :     +- Project [d_date_sk#581L, d_year#587, d_moy#589]
                           :        +- Filter ((((d_year#587 = 1999) OR ((d_year#587 = 1998) AND (d_moy#589 = 12))) OR ((d_year#587 = 2000) AND (d_moy#589 = 1))) AND isnotnull(d_date_sk#581L))
                           :           +- HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#581L, d_date_id#582, d_date#583, d_month_seq#584, d_week_seq#585, d_quarter_seq#586, d..., Partition Cols: []]
                           +- Project [cc_call_center_sk#609L, cc_name#615]
                              +- Filter (isnotnull(cc_call_center_sk#609L) AND isnotnull(cc_name#615))
                                 +- HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#609L, cc_call_center_id#610, cc_rec_start_date#611, cc_rec_end_date#612, cc_cl..., Partition Cols: []]

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=100, orderBy=[CheckOverflow((promote_precision(cast(sum_sales#10 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(22,6), true) ASC NULLS FIRST,cc_name#112 ASC NULLS FIRST], output=[i_category#34,i_brand#30,cc_name#112,d_year#84,d_moy#86,avg_monthly_sales#11,sum_sales#10,psum#13,nsum#14])
   +- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, avg_monthly_sales#11, sum_sales#10, sum_sales#154 AS psum#13, sum_sales#162 AS nsum#14]
      +- ShuffledHashJoin [i_category#34, i_brand#30, cc_name#112, rn#12], [i_category#537, i_brand#533, cc_name#615, (rn#644 - 1)], Inner, BuildRight
         :- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, avg_monthly_sales#11, rn#12, sum_sales#154]
         :  +- ShuffledHashJoin [i_category#34, i_brand#30, cc_name#112, rn#12], [i_category#417, i_brand#413, cc_name#495, (rn#524 + 1)], Inner, BuildRight
         :     :- Exchange hashpartitioning(i_category#34, i_brand#30, cc_name#112, rn#12, 200), ENSURE_REQUIREMENTS, [id=#285]
         :     :  +- Project [i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, avg_monthly_sales#11, rn#12]
         :     :     +- Filter ((isnotnull(avg_monthly_sales#11) AND (avg_monthly_sales#11 > 0.000000)) AND (CheckOverflow((promote_precision(abs(CheckOverflow((promote_precision(cast(sum_sales#10 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(22,6), true), false)) / promote_precision(cast(avg_monthly_sales#11 as decimal(22,6)))), DecimalType(38,16), true) > 0.1000000000000000))
         :     :        +- Window [avg(_w0#148) windowspecdefinition(i_category#34, i_brand#30, cc_name#112, d_year#84, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_monthly_sales#11], [i_category#34, i_brand#30, cc_name#112, d_year#84]
         :     :           +- Filter (isnotnull(d_year#84) AND (d_year#84 = 1999))
         :     :              +- Window [rank(d_year#84, d_moy#86) windowspecdefinition(i_category#34, i_brand#30, cc_name#112, d_year#84 ASC NULLS FIRST, d_moy#86 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#12], [i_category#34, i_brand#30, cc_name#112], [d_year#84 ASC NULLS FIRST, d_moy#86 ASC NULLS FIRST]
         :     :                 +- Sort [i_category#34 ASC NULLS FIRST, i_brand#30 ASC NULLS FIRST, cc_name#112 ASC NULLS FIRST, d_year#84 ASC NULLS FIRST, d_moy#86 ASC NULLS FIRST], false, 0
         :     :                    +- Exchange hashpartitioning(i_category#34, i_brand#30, cc_name#112, 200), ENSURE_REQUIREMENTS, [id=#256]
         :     :                       +- HashAggregate(keys=[i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86], functions=[sum(UnscaledValue(cs_sales_price#64))], output=[i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum_sales#10, _w0#148])
         :     :                          +- Exchange hashpartitioning(i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, 200), ENSURE_REQUIREMENTS, [id=#253]
         :     :                             +- HashAggregate(keys=[i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86], functions=[partial_sum(UnscaledValue(cs_sales_price#64))], output=[i_category#34, i_brand#30, cc_name#112, d_year#84, d_moy#86, sum#646L])
         :     :                                +- Project [i_brand#30, i_category#34, cs_sales_price#64, d_year#84, d_moy#86, cc_name#112]
         :     :                                   +- BroadcastHashJoin [cs_call_center_sk#54L], [cc_call_center_sk#106L], Inner, BuildRight, false
         :     :                                      :- Project [i_brand#30, i_category#34, cs_call_center_sk#54L, cs_sales_price#64, d_year#84, d_moy#86]
         :     :                                      :  +- BroadcastHashJoin [cs_sold_date_sk#77L], [d_date_sk#78L], Inner, BuildRight, false
         :     :                                      :     :- Project [i_brand#30, i_category#34, cs_call_center_sk#54L, cs_sales_price#64, cs_sold_date_sk#77L]
         :     :                                      :     :  +- BroadcastHashJoin [i_item_sk#22L], [cs_item_sk#58L], Inner, BuildLeft, false
         :     :                                      :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#240]
         :     :                                      :     :     :  +- Filter ((isnotnull(i_item_sk#22L) AND isnotnull(i_category#34)) AND isnotnull(i_brand#30))
         :     :                                      :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#22L, i_brand#30, i_category#34], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#22L, i_item_id#23, i_rec_start_date#24, i_rec_end_date#25, i_item_desc#26, i_current_p..., Partition Cols: []]
         :     :                                      :     :     +- Filter (isnotnull(cs_item_sk#58L) AND isnotnull(cs_call_center_sk#54L))
         :     :                                      :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#54L, cs_item_sk#58L, cs_sales_price#64, cs_sold_date_sk#77L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#44L, cs_ship_date_sk#45L, cs_bill_customer_sk#46L, cs_bill_cdemo_sk#47L, cs_bill..., Partition Cols: [cs_sold_date_sk#77L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#77L)]
         :     :                                      :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#244]
         :     :                                      :        +- Filter ((((d_year#84 = 1999) OR ((d_year#84 = 1998) AND (d_moy#86 = 12))) OR ((d_year#84 = 2000) AND (d_moy#86 = 1))) AND isnotnull(d_date_sk#78L))
         :     :                                      :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#78L, d_year#84, d_moy#86], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#78L, d_date_id#79, d_date#80, d_month_seq#81, d_week_seq#82, d_quarter_seq#83, d_year#..., Partition Cols: []]
         :     :                                      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#248]
         :     :                                         +- Filter (isnotnull(cc_call_center_sk#106L) AND isnotnull(cc_name#112))
         :     :                                            +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#106L, cc_name#112], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#106L, cc_call_center_id#107, cc_rec_start_date#108, cc_rec_end_date#109, cc_cl..., Partition Cols: []]
         :     +- Exchange hashpartitioning(i_category#417, i_brand#413, cc_name#495, (rn#524 + 1), 200), ENSURE_REQUIREMENTS, [id=#286]
         :        +- Project [i_category#417, i_brand#413, cc_name#495, sum_sales#10 AS sum_sales#154, rn#524]
         :           +- Window [rank(d_year#467, d_moy#469) windowspecdefinition(i_category#417, i_brand#413, cc_name#495, d_year#467 ASC NULLS FIRST, d_moy#469 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#524], [i_category#417, i_brand#413, cc_name#495], [d_year#467 ASC NULLS FIRST, d_moy#469 ASC NULLS FIRST]
         :              +- Sort [i_category#417 ASC NULLS FIRST, i_brand#413 ASC NULLS FIRST, cc_name#495 ASC NULLS FIRST, d_year#467 ASC NULLS FIRST, d_moy#469 ASC NULLS FIRST], false, 0
         :                 +- Exchange hashpartitioning(i_category#417, i_brand#413, cc_name#495, 200), ENSURE_REQUIREMENTS, [id=#279]
         :                    +- HashAggregate(keys=[i_category#417, i_brand#413, cc_name#495, d_year#467, d_moy#469], functions=[sum(UnscaledValue(cs_sales_price#447))], output=[i_category#417, i_brand#413, cc_name#495, d_year#467, d_moy#469, sum_sales#10])
         :                       +- Exchange hashpartitioning(i_category#417, i_brand#413, cc_name#495, d_year#467, d_moy#469, 200), ENSURE_REQUIREMENTS, [id=#276]
         :                          +- HashAggregate(keys=[i_category#417, i_brand#413, cc_name#495, d_year#467, d_moy#469], functions=[partial_sum(UnscaledValue(cs_sales_price#447))], output=[i_category#417, i_brand#413, cc_name#495, d_year#467, d_moy#469, sum#648L])
         :                             +- Project [i_brand#413, i_category#417, cs_sales_price#447, d_year#467, d_moy#469, cc_name#495]
         :                                +- BroadcastHashJoin [cs_call_center_sk#437L], [cc_call_center_sk#489L], Inner, BuildRight, false
         :                                   :- Project [i_brand#413, i_category#417, cs_call_center_sk#437L, cs_sales_price#447, d_year#467, d_moy#469]
         :                                   :  +- BroadcastHashJoin [cs_sold_date_sk#460L], [d_date_sk#461L], Inner, BuildRight, false
         :                                   :     :- Project [i_brand#413, i_category#417, cs_call_center_sk#437L, cs_sales_price#447, cs_sold_date_sk#460L]
         :                                   :     :  +- BroadcastHashJoin [i_item_sk#405L], [cs_item_sk#441L], Inner, BuildLeft, false
         :                                   :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#263]
         :                                   :     :     :  +- Filter ((isnotnull(i_item_sk#405L) AND isnotnull(i_category#417)) AND isnotnull(i_brand#413))
         :                                   :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#405L, i_brand#413, i_category#417], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#405L, i_item_id#406, i_rec_start_date#407, i_rec_end_date#408, i_item_desc#409, i_curr..., Partition Cols: []]
         :                                   :     :     +- Filter (isnotnull(cs_item_sk#441L) AND isnotnull(cs_call_center_sk#437L))
         :                                   :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#437L, cs_item_sk#441L, cs_sales_price#447, cs_sold_date_sk#460L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#427L, cs_ship_date_sk#428L, cs_bill_customer_sk#429L, cs_bill_cdemo_sk#430L, cs_..., Partition Cols: [cs_sold_date_sk#460L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#460L)]
         :                                   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#267]
         :                                   :        +- Filter ((((d_year#467 = 1999) OR ((d_year#467 = 1998) AND (d_moy#469 = 12))) OR ((d_year#467 = 2000) AND (d_moy#469 = 1))) AND isnotnull(d_date_sk#461L))
         :                                   :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#461L, d_year#467, d_moy#469], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#461L, d_date_id#462, d_date#463, d_month_seq#464, d_week_seq#465, d_quarter_seq#466, d..., Partition Cols: []]
         :                                   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#271]
         :                                      +- Filter (isnotnull(cc_call_center_sk#489L) AND isnotnull(cc_name#495))
         :                                         +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#489L, cc_name#495], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#489L, cc_call_center_id#490, cc_rec_start_date#491, cc_rec_end_date#492, cc_cl..., Partition Cols: []]
         +- Exchange hashpartitioning(i_category#537, i_brand#533, cc_name#615, (rn#644 - 1), 200), ENSURE_REQUIREMENTS, [id=#311]
            +- Project [i_category#537, i_brand#533, cc_name#615, sum_sales#10 AS sum_sales#162, rn#644]
               +- Window [rank(d_year#587, d_moy#589) windowspecdefinition(i_category#537, i_brand#533, cc_name#615, d_year#587 ASC NULLS FIRST, d_moy#589 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#644], [i_category#537, i_brand#533, cc_name#615], [d_year#587 ASC NULLS FIRST, d_moy#589 ASC NULLS FIRST]
                  +- Sort [i_category#537 ASC NULLS FIRST, i_brand#533 ASC NULLS FIRST, cc_name#615 ASC NULLS FIRST, d_year#587 ASC NULLS FIRST, d_moy#589 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(i_category#537, i_brand#533, cc_name#615, 200), ENSURE_REQUIREMENTS, [id=#305]
                        +- HashAggregate(keys=[i_category#537, i_brand#533, cc_name#615, d_year#587, d_moy#589], functions=[sum(UnscaledValue(cs_sales_price#567))], output=[i_category#537, i_brand#533, cc_name#615, d_year#587, d_moy#589, sum_sales#10])
                           +- Exchange hashpartitioning(i_category#537, i_brand#533, cc_name#615, d_year#587, d_moy#589, 200), ENSURE_REQUIREMENTS, [id=#302]
                              +- HashAggregate(keys=[i_category#537, i_brand#533, cc_name#615, d_year#587, d_moy#589], functions=[partial_sum(UnscaledValue(cs_sales_price#567))], output=[i_category#537, i_brand#533, cc_name#615, d_year#587, d_moy#589, sum#650L])
                                 +- Project [i_brand#533, i_category#537, cs_sales_price#567, d_year#587, d_moy#589, cc_name#615]
                                    +- BroadcastHashJoin [cs_call_center_sk#557L], [cc_call_center_sk#609L], Inner, BuildRight, false
                                       :- Project [i_brand#533, i_category#537, cs_call_center_sk#557L, cs_sales_price#567, d_year#587, d_moy#589]
                                       :  +- BroadcastHashJoin [cs_sold_date_sk#580L], [d_date_sk#581L], Inner, BuildRight, false
                                       :     :- Project [i_brand#533, i_category#537, cs_call_center_sk#557L, cs_sales_price#567, cs_sold_date_sk#580L]
                                       :     :  +- BroadcastHashJoin [i_item_sk#525L], [cs_item_sk#561L], Inner, BuildLeft, false
                                       :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#289]
                                       :     :     :  +- Filter ((isnotnull(i_item_sk#525L) AND isnotnull(i_category#537)) AND isnotnull(i_brand#533))
                                       :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#525L, i_brand#533, i_category#537], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#525L, i_item_id#526, i_rec_start_date#527, i_rec_end_date#528, i_item_desc#529, i_curr..., Partition Cols: []]
                                       :     :     +- Filter (isnotnull(cs_item_sk#561L) AND isnotnull(cs_call_center_sk#557L))
                                       :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#557L, cs_item_sk#561L, cs_sales_price#567, cs_sold_date_sk#580L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#547L, cs_ship_date_sk#548L, cs_bill_customer_sk#549L, cs_bill_cdemo_sk#550L, cs_..., Partition Cols: [cs_sold_date_sk#580L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#580L)]
                                       :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#293]
                                       :        +- Filter ((((d_year#587 = 1999) OR ((d_year#587 = 1998) AND (d_moy#589 = 12))) OR ((d_year#587 = 2000) AND (d_moy#589 = 1))) AND isnotnull(d_date_sk#581L))
                                       :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#581L, d_year#587, d_moy#589], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#581L, d_date_id#582, d_date#583, d_month_seq#584, d_week_seq#585, d_quarter_seq#586, d..., Partition Cols: []]
                                       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#297]
                                          +- Filter (isnotnull(cc_call_center_sk#609L) AND isnotnull(cc_name#615))
                                             +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#609L, cc_name#615], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#609L, cc_call_center_id#610, cc_rec_start_date#611, cc_rec_end_date#612, cc_cl..., Partition Cols: []]
zhztheplayer commented 1 month ago

@wenfang6 Thanks. It would also be great if you could add the final physical plan, since AQE is enabled.

How to capture it:

val df = spark.sql("...")
df.explain() // This shows the initial physical plan
df.collect() // Runs the query; AQE re-plans as each query stage completes
df.explain() // This shows the final physical plan
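The steps above can be sketched as a small self-contained program. It assumes Spark 3.x with the `spark-sql` module on the classpath. The program enables AQE, prints the initial plan, runs the query, and then reads the final plan back from `df.queryExecution.executedPlan`. The object name `AqePlanCapture`, the helper `finalPlan`, and the `spark.range` query are illustrative stand-ins. They are not part of Gluten or the original report. To reproduce the issue, substitute the q57 SQL and the cluster's own Gluten configuration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object, used only for illustration.
object AqePlanCapture {
  /** Runs `df` to completion and returns the plan tree AQE settled on. */
  def finalPlan(df: DataFrame): String = {
    df.collect() // execute every query stage so AQE can finish re-planning
    // Same QueryExecution that explain() uses, so this is the final adaptive plan.
    df.queryExecution.executedPlan.treeString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("aqe-plan-capture")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()
    try {
      // Stand-in query with a shuffle (hypothetical); substitute the TPC-DS q57 text here.
      val df = spark.range(0, 1000)
        .groupBy((col("id") % 10).as("k"))
        .count()
      df.explain()           // initial plan: top node is "AdaptiveSparkPlan isFinalPlan=false"
      println(finalPlan(df)) // final plan after execution
    } finally {
      spark.stop()
    }
  }
}
```

Reading the plan through `queryExecution` returns it as a string, which is convenient for pasting into an issue. Calling `explain()` a second time would only print it. After `collect()`, the top node should read `AdaptiveSparkPlan isFinalPlan=true`.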
wenfang6 commented 1 month ago

Initial physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=100, orderBy=[CheckOverflow((promote_precision(cast(sum_sales#0 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(22,6), true) ASC NULLS FIRST,cc_name#97 ASC NULLS FIRST], output=[i_category#19,i_brand#15,cc_name#97,d_year#69,d_moy#71,avg_monthly_sales#1,sum_sales#0,psum#3,nsum#4])
   +- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, avg_monthly_sales#1, sum_sales#0, sum_sales#139 AS psum#3, sum_sales#147 AS nsum#4]
      +- ShuffledHashJoin [i_category#19, i_brand#15, cc_name#97, rn#2], [i_category#531, i_brand#527, cc_name#609, (rn#638 - 1)], Inner, BuildRight
         :- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum_sales#0, avg_monthly_sales#1, rn#2, sum_sales#139]
         :  +- ShuffledHashJoin [i_category#19, i_brand#15, cc_name#97, rn#2], [i_category#411, i_brand#407, cc_name#489, (rn#518 + 1)], Inner, BuildRight
         :     :- Exchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, rn#2, 200), ENSURE_REQUIREMENTS, [id=#265]
         :     :  +- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum_sales#0, avg_monthly_sales#1, rn#2]
         :     :     +- Filter ((isnotnull(avg_monthly_sales#1) AND (avg_monthly_sales#1 > 0.000000)) AND (CheckOverflow((promote_precision(abs(CheckOverflow((promote_precision(cast(sum_sales#0 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(22,6), true), false)) / promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(38,16), true) > 0.1000000000000000))
         :     :        +- Window [avg(_w0#133) windowspecdefinition(i_category#19, i_brand#15, cc_name#97, d_year#69, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_monthly_sales#1], [i_category#19, i_brand#15, cc_name#97, d_year#69]
         :     :           +- Filter (isnotnull(d_year#69) AND (d_year#69 = 1999))
         :     :              +- Window [rank(d_year#69, d_moy#71) windowspecdefinition(i_category#19, i_brand#15, cc_name#97, d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#2], [i_category#19, i_brand#15, cc_name#97], [d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST]
         :     :                 +- Sort [i_category#19 ASC NULLS FIRST, i_brand#15 ASC NULLS FIRST, cc_name#97 ASC NULLS FIRST, d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST], false, 0
         :     :                    +- Exchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, 200), ENSURE_REQUIREMENTS, [id=#236]
         :     :                       +- HashAggregate(keys=[i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71], functions=[sum(UnscaledValue(cs_sales_price#49))])
         :     :                          +- Exchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, 200), ENSURE_REQUIREMENTS, [id=#233]
         :     :                             +- HashAggregate(keys=[i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71], functions=[partial_sum(UnscaledValue(cs_sales_price#49))])
         :     :                                +- Project [i_brand#15, i_category#19, cs_sales_price#49, d_year#69, d_moy#71, cc_name#97]
         :     :                                   +- BroadcastHashJoin [cs_call_center_sk#39L], [cc_call_center_sk#91L], Inner, BuildRight, false
         :     :                                      :- Project [i_brand#15, i_category#19, cs_call_center_sk#39L, cs_sales_price#49, d_year#69, d_moy#71]
         :     :                                      :  +- BroadcastHashJoin [cs_sold_date_sk#62L], [d_date_sk#63L], Inner, BuildRight, false
         :     :                                      :     :- Project [i_brand#15, i_category#19, cs_call_center_sk#39L, cs_sales_price#49, cs_sold_date_sk#62L]
         :     :                                      :     :  +- BroadcastHashJoin [i_item_sk#7L], [cs_item_sk#43L], Inner, BuildLeft, false
         :     :                                      :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#220]
         :     :                                      :     :     :  +- Filter ((isnotnull(i_item_sk#7L) AND isnotnull(i_category#19)) AND isnotnull(i_brand#15))
         :     :                                      :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#7L, i_brand#15, i_category#19], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#7L, i_item_id#8, i_rec_start_date#9, i_rec_end_date#10, i_item_desc#11, i_current_pric..., Partition Cols: []]
         :     :                                      :     :     +- Filter (isnotnull(cs_item_sk#43L) AND isnotnull(cs_call_center_sk#39L))
         :     :                                      :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#39L, cs_item_sk#43L, cs_sales_price#49, cs_sold_date_sk#62L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#29L, cs_ship_date_sk#30L, cs_bill_customer_sk#31L, cs_bill_cdemo_sk#32L, cs_bill..., Partition Cols: [cs_sold_date_sk#62L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#62L)]
         :     :                                      :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#224]
         :     :                                      :        +- Filter ((((d_year#69 = 1999) OR ((d_year#69 = 1998) AND (d_moy#71 = 12))) OR ((d_year#69 = 2000) AND (d_moy#71 = 1))) AND isnotnull(d_date_sk#63L))
         :     :                                      :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#63L, d_year#69, d_moy#71], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#63L, d_date_id#64, d_date#65, d_month_seq#66, d_week_seq#67, d_quarter_seq#68, d_year#..., Partition Cols: []]
         :     :                                      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#228]
         :     :                                         +- Filter (isnotnull(cc_call_center_sk#91L) AND isnotnull(cc_name#97))
         :     :                                            +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#91L, cc_name#97], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#91L, cc_call_center_id#92, cc_rec_start_date#93, cc_rec_end_date#94, cc_closed..., Partition Cols: []]
         :     +- Exchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, (rn#518 + 1), 200), ENSURE_REQUIREMENTS, [id=#266]
         :        +- Project [i_category#411, i_brand#407, cc_name#489, sum_sales#0 AS sum_sales#139, rn#518]
         :           +- Window [rank(d_year#461, d_moy#463) windowspecdefinition(i_category#411, i_brand#407, cc_name#489, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#518], [i_category#411, i_brand#407, cc_name#489], [d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST]
         :              +- Sort [i_category#411 ASC NULLS FIRST, i_brand#407 ASC NULLS FIRST, cc_name#489 ASC NULLS FIRST, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST], false, 0
         :                 +- Exchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, 200), ENSURE_REQUIREMENTS, [id=#259]
         :                    +- HashAggregate(keys=[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463], functions=[sum(UnscaledValue(cs_sales_price#441))])
         :                       +- Exchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, 200), ENSURE_REQUIREMENTS, [id=#256]
         :                          +- HashAggregate(keys=[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463], functions=[partial_sum(UnscaledValue(cs_sales_price#441))])
         :                             +- Project [i_brand#407, i_category#411, cs_sales_price#441, d_year#461, d_moy#463, cc_name#489]
         :                                +- BroadcastHashJoin [cs_call_center_sk#431L], [cc_call_center_sk#483L], Inner, BuildRight, false
         :                                   :- Project [i_brand#407, i_category#411, cs_call_center_sk#431L, cs_sales_price#441, d_year#461, d_moy#463]
         :                                   :  +- BroadcastHashJoin [cs_sold_date_sk#454L], [d_date_sk#455L], Inner, BuildRight, false
         :                                   :     :- Project [i_brand#407, i_category#411, cs_call_center_sk#431L, cs_sales_price#441, cs_sold_date_sk#454L]
         :                                   :     :  +- BroadcastHashJoin [i_item_sk#399L], [cs_item_sk#435L], Inner, BuildLeft, false
         :                                   :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#243]
         :                                   :     :     :  +- Filter ((isnotnull(i_item_sk#399L) AND isnotnull(i_category#411)) AND isnotnull(i_brand#407))
         :                                   :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#399L, i_brand#407, i_category#411], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#399L, i_item_id#400, i_rec_start_date#401, i_rec_end_date#402, i_item_desc#403, i_curr..., Partition Cols: []]
         :                                   :     :     +- Filter (isnotnull(cs_item_sk#435L) AND isnotnull(cs_call_center_sk#431L))
         :                                   :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#431L, cs_item_sk#435L, cs_sales_price#441, cs_sold_date_sk#454L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#421L, cs_ship_date_sk#422L, cs_bill_customer_sk#423L, cs_bill_cdemo_sk#424L, cs_..., Partition Cols: [cs_sold_date_sk#454L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#454L)]
         :                                   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#247]
         :                                   :        +- Filter ((((d_year#461 = 1999) OR ((d_year#461 = 1998) AND (d_moy#463 = 12))) OR ((d_year#461 = 2000) AND (d_moy#463 = 1))) AND isnotnull(d_date_sk#455L))
         :                                   :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#455L, d_year#461, d_moy#463], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#455L, d_date_id#456, d_date#457, d_month_seq#458, d_week_seq#459, d_quarter_seq#460, d..., Partition Cols: []]
         :                                   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#251]
         :                                      +- Filter (isnotnull(cc_call_center_sk#483L) AND isnotnull(cc_name#489))
         :                                         +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#483L, cc_name#489], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#483L, cc_call_center_id#484, cc_rec_start_date#485, cc_rec_end_date#486, cc_cl..., Partition Cols: []]
         +- Exchange hashpartitioning(i_category#531, i_brand#527, cc_name#609, (rn#638 - 1), 200), ENSURE_REQUIREMENTS, [id=#291]
            +- Project [i_category#531, i_brand#527, cc_name#609, sum_sales#0 AS sum_sales#147, rn#638]
               +- Window [rank(d_year#581, d_moy#583) windowspecdefinition(i_category#531, i_brand#527, cc_name#609, d_year#581 ASC NULLS FIRST, d_moy#583 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#638], [i_category#531, i_brand#527, cc_name#609], [d_year#581 ASC NULLS FIRST, d_moy#583 ASC NULLS FIRST]
                  +- Sort [i_category#531 ASC NULLS FIRST, i_brand#527 ASC NULLS FIRST, cc_name#609 ASC NULLS FIRST, d_year#581 ASC NULLS FIRST, d_moy#583 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(i_category#531, i_brand#527, cc_name#609, 200), ENSURE_REQUIREMENTS, [id=#285]
                        +- HashAggregate(keys=[i_category#531, i_brand#527, cc_name#609, d_year#581, d_moy#583], functions=[sum(UnscaledValue(cs_sales_price#561))])
                           +- Exchange hashpartitioning(i_category#531, i_brand#527, cc_name#609, d_year#581, d_moy#583, 200), ENSURE_REQUIREMENTS, [id=#282]
                              +- HashAggregate(keys=[i_category#531, i_brand#527, cc_name#609, d_year#581, d_moy#583], functions=[partial_sum(UnscaledValue(cs_sales_price#561))])
                                 +- Project [i_brand#527, i_category#531, cs_sales_price#561, d_year#581, d_moy#583, cc_name#609]
                                    +- BroadcastHashJoin [cs_call_center_sk#551L], [cc_call_center_sk#603L], Inner, BuildRight, false
                                       :- Project [i_brand#527, i_category#531, cs_call_center_sk#551L, cs_sales_price#561, d_year#581, d_moy#583]
                                       :  +- BroadcastHashJoin [cs_sold_date_sk#574L], [d_date_sk#575L], Inner, BuildRight, false
                                       :     :- Project [i_brand#527, i_category#531, cs_call_center_sk#551L, cs_sales_price#561, cs_sold_date_sk#574L]
                                       :     :  +- BroadcastHashJoin [i_item_sk#519L], [cs_item_sk#555L], Inner, BuildLeft, false
                                       :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#269]
                                       :     :     :  +- Filter ((isnotnull(i_item_sk#519L) AND isnotnull(i_category#531)) AND isnotnull(i_brand#527))
                                       :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#519L, i_brand#527, i_category#531], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#519L, i_item_id#520, i_rec_start_date#521, i_rec_end_date#522, i_item_desc#523, i_curr..., Partition Cols: []]
                                       :     :     +- Filter (isnotnull(cs_item_sk#555L) AND isnotnull(cs_call_center_sk#551L))
                                       :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#551L, cs_item_sk#555L, cs_sales_price#561, cs_sold_date_sk#574L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#541L, cs_ship_date_sk#542L, cs_bill_customer_sk#543L, cs_bill_cdemo_sk#544L, cs_..., Partition Cols: [cs_sold_date_sk#574L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#574L)]
                                       :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#273]
                                       :        +- Filter ((((d_year#581 = 1999) OR ((d_year#581 = 1998) AND (d_moy#583 = 12))) OR ((d_year#581 = 2000) AND (d_moy#583 = 1))) AND isnotnull(d_date_sk#575L))
                                       :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#575L, d_year#581, d_moy#583], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#575L, d_date_id#576, d_date#577, d_month_seq#578, d_week_seq#579, d_quarter_seq#580, d..., Partition Cols: []]
                                       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#277]
                                          +- Filter (isnotnull(cc_call_center_sk#603L) AND isnotnull(cc_name#609))
                                             +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#603L, cc_name#609], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#603L, cc_call_center_id#604, cc_rec_start_date#605, cc_rec_end_date#606, cc_cl..., Partition Cols: []]

df.collect() failed. Final physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Current Plan ==
   TakeOrderedAndProject(limit=100, orderBy=[CheckOverflow((promote_precision(cast(sum_sales#0 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(22,6), true) ASC NULLS FIRST,cc_name#97 ASC NULLS FIRST], output=[i_category#19,i_brand#15,cc_name#97,d_year#69,d_moy#71,avg_monthly_sales#1,sum_sales#0,psum#3,nsum#4])
   +- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, avg_monthly_sales#1, sum_sales#0, sum_sales#139 AS psum#3, sum_sales#147 AS nsum#4]
      +- BroadcastHashJoin [i_category#19, i_brand#15, cc_name#97, rn#2], [i_category#531, i_brand#527, cc_name#609, (rn#638 - 1)], Inner, BuildRight, false
         :- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum_sales#0, avg_monthly_sales#1, rn#2, sum_sales#139]
         :  +- BroadcastHashJoin [i_category#19, i_brand#15, cc_name#97, rn#2], [i_category#411, i_brand#407, cc_name#489, (rn#518 + 1)], Inner, BuildRight, false
         :     :- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum_sales#0, avg_monthly_sales#1, rn#2]
         :     :  +- Filter ((isnotnull(avg_monthly_sales#1) AND (avg_monthly_sales#1 > 0.000000)) AND (CheckOverflow((promote_precision(abs(CheckOverflow((promote_precision(cast(sum_sales#0 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(22,6), true), false)) / promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(38,16), true) > 0.1000000000000000))
         :     :     +- Window [avg(_w0#133) windowspecdefinition(i_category#19, i_brand#15, cc_name#97, d_year#69, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_monthly_sales#1], [i_category#19, i_brand#15, cc_name#97, d_year#69]
         :     :        +- Filter (isnotnull(d_year#69) AND (d_year#69 = 1999))
         :     :           +- Window [rank(d_year#69, d_moy#71) windowspecdefinition(i_category#19, i_brand#15, cc_name#97, d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#2], [i_category#19, i_brand#15, cc_name#97], [d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST]
         :     :              +- Sort [i_category#19 ASC NULLS FIRST, i_brand#15 ASC NULLS FIRST, cc_name#97 ASC NULLS FIRST, d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST], false, 0
         :     :                 +- ShuffleQueryStage 20
         :     :                    +- ColumnarExchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, 200), ENSURE_REQUIREMENTS, [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum_sales#0, _w0#133], [id=#2549], [id=#2549], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum_sales:DecimalType(17,2), _w0:DecimalType(17,2))
         :     :                       +- VeloxAppendBatches 3276
         :     :                          +- ^(13) ProjectExecTransformer [hash(i_category#19, i_brand#15, cc_name#97, 42) AS hash_partition_key#867, i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, MakeDecimal(sum(UnscaledValue(cs_sales_price#49))#122L,17,2) AS sum_sales#0, MakeDecimal(sum(UnscaledValue(cs_sales_price#49))#122L,17,2) AS _w0#133]
         :     :                             +- ^(13) HashAggregateTransformer(keys=[i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71], functions=[sum(UnscaledValue(cs_sales_price#49))])
         :     :                                +- ^(13) InputIteratorTransformer[i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum#640L]
         :     :                                   +- ShuffleQueryStage 15
         :     :                                      +- ColumnarExchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, 200), ENSURE_REQUIREMENTS, [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum#640L], [id=#1838], [id=#1838], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum:LongType)
         :     :                                         +- VeloxAppendBatches 3276
         :     :                                            +- ^(10) ProjectExecTransformer [hash(i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, 42) AS hash_partition_key#801, i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum#640L]
         :     :                                               +- ^(10) FlushableHashAggregateTransformer(keys=[i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71], functions=[partial_sum(_pre_0#798L)])
         :     :                                                  +- ^(10) ProjectExecTransformer [i_brand#15, i_category#19, cs_sales_price#49, d_year#69, d_moy#71, cc_name#97, UnscaledValue(cs_sales_price#49) AS _pre_0#798L]
         :     :                                                     +- ^(10) BroadcastHashJoinExecTransformer [cs_call_center_sk#39L], [cc_call_center_sk#91L], Inner, BuildRight, false
         :     :                                                        :- ^(10) ProjectExecTransformer [i_brand#15, i_category#19, cs_call_center_sk#39L, cs_sales_price#49, d_year#69, d_moy#71]
         :     :                                                        :  +- ^(10) BroadcastHashJoinExecTransformer [cs_sold_date_sk#62L], [d_date_sk#63L], Inner, BuildRight, false
         :     :                                                        :     :- ^(10) ProjectExecTransformer [i_brand#15, i_category#19, cs_call_center_sk#39L, cs_sales_price#49, cs_sold_date_sk#62L]
         :     :                                                        :     :  +- ^(10) BroadcastHashJoinExecTransformer [i_item_sk#7L], [cs_item_sk#43L], Inner, BuildLeft, false
         :     :                                                        :     :     :- ^(10) InputIteratorTransformer[i_item_sk#7L, i_brand#15, i_category#19]
         :     :                                                        :     :     :  +- BroadcastQueryStage 0
         :     :                                                        :     :     :     +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#357]
         :     :                                                        :     :     :        +- ^(1) FilterExecTransformer ((isnotnull(i_item_sk#7L) AND isnotnull(i_category#19)) AND isnotnull(i_brand#15))
         :     :                                                        :     :     :           +- ^(1) NativeScan hive dwd_insys_test.item [i_item_sk#7L, i_brand#15, i_category#19], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#7L, i_item_id#8, i_rec_start_date#9, i_rec_end_date#10, i_item_desc#11, i_current_pric..., Partition Cols: []]
         :     :                                                        :     :     +- ^(10) FilterExecTransformer (isnotnull(cs_item_sk#43L) AND isnotnull(cs_call_center_sk#39L))
         :     :                                                        :     :        +- ^(10) NativeScan hive dwd_insys_test.catalog_sales [cs_call_center_sk#39L, cs_item_sk#43L, cs_sales_price#49, cs_sold_date_sk#62L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#29L, cs_ship_date_sk#30L, cs_bill_customer_sk#31L, cs_bill_cdemo_sk#32L, cs_bill..., Partition Cols: [cs_sold_date_sk#62L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#62L)]
         :     :                                                        :     +- ^(10) InputIteratorTransformer[d_date_sk#63L, d_year#69, d_moy#71]
         :     :                                                        :        +- BroadcastQueryStage 1
         :     :                                                        :           +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#378]
         :     :                                                        :              +- ^(2) FilterExecTransformer ((((d_year#69 = 1999) OR ((d_year#69 = 1998) AND (d_moy#71 = 12))) OR ((d_year#69 = 2000) AND (d_moy#71 = 1))) AND isnotnull(d_date_sk#63L))
         :     :                                                        :                 +- ^(2) NativeScan hive dwd_insys_test.date_dim [d_date_sk#63L, d_year#69, d_moy#71], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#63L, d_date_id#64, d_date#65, d_month_seq#66, d_week_seq#67, d_quarter_seq#68, d_year#..., Partition Cols: []]
         :     :                                                        +- ^(10) InputIteratorTransformer[cc_call_center_sk#91L, cc_name#97]
         :     :                                                           +- BroadcastQueryStage 2
         :     :                                                              +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#399]
         :     :                                                                 +- ^(3) FilterExecTransformer (isnotnull(cc_call_center_sk#91L) AND isnotnull(cc_name#97))
         :     :                                                                    +- ^(3) NativeScan hive dwd_insys_test.call_center [cc_call_center_sk#91L, cc_name#97], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#91L, cc_call_center_id#92, cc_rec_start_date#93, cc_rec_end_date#94, cc_closed..., Partition Cols: []]
         :     +- BroadcastQueryStage 24
         :        +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] + 1)),false), [id=#2832]
         :           +- ^(16) ProjectExecTransformer [i_category#411, i_brand#407, cc_name#489, sum_sales#0 AS sum_sales#139, rn#518]
         :              +- ^(16) WindowExecTransformer [rank(d_year#461, d_moy#463) windowspecdefinition(i_category#411, i_brand#407, cc_name#489, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#518], [i_category#411, i_brand#407, cc_name#489], [d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST]
         :                 +- ^(16) SortExecTransformer [i_category#411 ASC NULLS FIRST, i_brand#407 ASC NULLS FIRST, cc_name#489 ASC NULLS FIRST, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST], false, 0
         :                    +- ^(16) InputIteratorTransformer[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum_sales#0]
         :                       +- AQEShuffleRead coalesced
         :                          +- ShuffleQueryStage 21
         :                             +- ColumnarExchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, 200), ENSURE_REQUIREMENTS, [i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum_sales#0], [id=#2620], [id=#2620], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum_sales:DecimalType(17,2))
         :                                +- VeloxAppendBatches 3276
         :                                   +- ^(14) ProjectExecTransformer [hash(i_category#411, i_brand#407, cc_name#489, 42) AS hash_partition_key#876, i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, MakeDecimal(sum(UnscaledValue(cs_sales_price#441))#122L,17,2) AS sum_sales#0]
         :                                      +- ^(14) HashAggregateTransformer(keys=[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463], functions=[sum(UnscaledValue(cs_sales_price#441))])
         :                                         +- ^(14) InputIteratorTransformer[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum#642L]
         :                                            +- ShuffleQueryStage 17
         :                                               +- ReusedExchange [i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum#642L], ColumnarExchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, 200), ENSURE_REQUIREMENTS, [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum#640L], [id=#1838], [id=#1838], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum:LongType)
         +- BroadcastQueryStage 27
            +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] - 1)),false), [id=#3024]
               +- BroadcastQueryStage 26
                  +- ReusedExchange [i_category#531, i_brand#527, cc_name#609, sum_sales#147, rn#638], ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] + 1)),false), [id=#2832]
+- == Initial Plan ==
   TakeOrderedAndProject(limit=100, orderBy=[CheckOverflow((promote_precision(cast(sum_sales#0 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(22,6), true) ASC NULLS FIRST,cc_name#97 ASC NULLS FIRST], output=[i_category#19,i_brand#15,cc_name#97,d_year#69,d_moy#71,avg_monthly_sales#1,sum_sales#0,psum#3,nsum#4])
   +- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, avg_monthly_sales#1, sum_sales#0, sum_sales#139 AS psum#3, sum_sales#147 AS nsum#4]
      +- ShuffledHashJoin [i_category#19, i_brand#15, cc_name#97, rn#2], [i_category#531, i_brand#527, cc_name#609, (rn#638 - 1)], Inner, BuildRight
         :- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum_sales#0, avg_monthly_sales#1, rn#2, sum_sales#139]
         :  +- ShuffledHashJoin [i_category#19, i_brand#15, cc_name#97, rn#2], [i_category#411, i_brand#407, cc_name#489, (rn#518 + 1)], Inner, BuildRight
         :     :- Exchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, rn#2, 200), ENSURE_REQUIREMENTS, [id=#265]
         :     :  +- Project [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum_sales#0, avg_monthly_sales#1, rn#2]
         :     :     +- Filter ((isnotnull(avg_monthly_sales#1) AND (avg_monthly_sales#1 > 0.000000)) AND (CheckOverflow((promote_precision(abs(CheckOverflow((promote_precision(cast(sum_sales#0 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(22,6), true), false)) / promote_precision(cast(avg_monthly_sales#1 as decimal(22,6)))), DecimalType(38,16), true) > 0.1000000000000000))
         :     :        +- Window [avg(_w0#133) windowspecdefinition(i_category#19, i_brand#15, cc_name#97, d_year#69, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_monthly_sales#1], [i_category#19, i_brand#15, cc_name#97, d_year#69]
         :     :           +- Filter (isnotnull(d_year#69) AND (d_year#69 = 1999))
         :     :              +- Window [rank(d_year#69, d_moy#71) windowspecdefinition(i_category#19, i_brand#15, cc_name#97, d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#2], [i_category#19, i_brand#15, cc_name#97], [d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST]
         :     :                 +- Sort [i_category#19 ASC NULLS FIRST, i_brand#15 ASC NULLS FIRST, cc_name#97 ASC NULLS FIRST, d_year#69 ASC NULLS FIRST, d_moy#71 ASC NULLS FIRST], false, 0
         :     :                    +- Exchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, 200), ENSURE_REQUIREMENTS, [id=#236]
         :     :                       +- HashAggregate(keys=[i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71], functions=[sum(UnscaledValue(cs_sales_price#49))])
         :     :                          +- Exchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, 200), ENSURE_REQUIREMENTS, [id=#233]
         :     :                             +- HashAggregate(keys=[i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71], functions=[partial_sum(UnscaledValue(cs_sales_price#49))])
         :     :                                +- Project [i_brand#15, i_category#19, cs_sales_price#49, d_year#69, d_moy#71, cc_name#97]
         :     :                                   +- BroadcastHashJoin [cs_call_center_sk#39L], [cc_call_center_sk#91L], Inner, BuildRight, false
         :     :                                      :- Project [i_brand#15, i_category#19, cs_call_center_sk#39L, cs_sales_price#49, d_year#69, d_moy#71]
         :     :                                      :  +- BroadcastHashJoin [cs_sold_date_sk#62L], [d_date_sk#63L], Inner, BuildRight, false
         :     :                                      :     :- Project [i_brand#15, i_category#19, cs_call_center_sk#39L, cs_sales_price#49, cs_sold_date_sk#62L]
         :     :                                      :     :  +- BroadcastHashJoin [i_item_sk#7L], [cs_item_sk#43L], Inner, BuildLeft, false
         :     :                                      :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#220]
         :     :                                      :     :     :  +- Filter ((isnotnull(i_item_sk#7L) AND isnotnull(i_category#19)) AND isnotnull(i_brand#15))
         :     :                                      :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#7L, i_brand#15, i_category#19], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#7L, i_item_id#8, i_rec_start_date#9, i_rec_end_date#10, i_item_desc#11, i_current_pric..., Partition Cols: []]
         :     :                                      :     :     +- Filter (isnotnull(cs_item_sk#43L) AND isnotnull(cs_call_center_sk#39L))
         :     :                                      :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#39L, cs_item_sk#43L, cs_sales_price#49, cs_sold_date_sk#62L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#29L, cs_ship_date_sk#30L, cs_bill_customer_sk#31L, cs_bill_cdemo_sk#32L, cs_bill..., Partition Cols: [cs_sold_date_sk#62L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#62L)]
         :     :                                      :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#224]
         :     :                                      :        +- Filter ((((d_year#69 = 1999) OR ((d_year#69 = 1998) AND (d_moy#71 = 12))) OR ((d_year#69 = 2000) AND (d_moy#71 = 1))) AND isnotnull(d_date_sk#63L))
         :     :                                      :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#63L, d_year#69, d_moy#71], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#63L, d_date_id#64, d_date#65, d_month_seq#66, d_week_seq#67, d_quarter_seq#68, d_year#..., Partition Cols: []]
         :     :                                      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#228]
         :     :                                         +- Filter (isnotnull(cc_call_center_sk#91L) AND isnotnull(cc_name#97))
         :     :                                            +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#91L, cc_name#97], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#91L, cc_call_center_id#92, cc_rec_start_date#93, cc_rec_end_date#94, cc_closed..., Partition Cols: []]
         :     +- Exchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, (rn#518 + 1), 200), ENSURE_REQUIREMENTS, [id=#266]
         :        +- Project [i_category#411, i_brand#407, cc_name#489, sum_sales#0 AS sum_sales#139, rn#518]
         :           +- Window [rank(d_year#461, d_moy#463) windowspecdefinition(i_category#411, i_brand#407, cc_name#489, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#518], [i_category#411, i_brand#407, cc_name#489], [d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST]
         :              +- Sort [i_category#411 ASC NULLS FIRST, i_brand#407 ASC NULLS FIRST, cc_name#489 ASC NULLS FIRST, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST], false, 0
         :                 +- Exchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, 200), ENSURE_REQUIREMENTS, [id=#259]
         :                    +- HashAggregate(keys=[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463], functions=[sum(UnscaledValue(cs_sales_price#441))])
         :                       +- Exchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, 200), ENSURE_REQUIREMENTS, [id=#256]
         :                          +- HashAggregate(keys=[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463], functions=[partial_sum(UnscaledValue(cs_sales_price#441))])
         :                             +- Project [i_brand#407, i_category#411, cs_sales_price#441, d_year#461, d_moy#463, cc_name#489]
         :                                +- BroadcastHashJoin [cs_call_center_sk#431L], [cc_call_center_sk#483L], Inner, BuildRight, false
         :                                   :- Project [i_brand#407, i_category#411, cs_call_center_sk#431L, cs_sales_price#441, d_year#461, d_moy#463]
         :                                   :  +- BroadcastHashJoin [cs_sold_date_sk#454L], [d_date_sk#455L], Inner, BuildRight, false
         :                                   :     :- Project [i_brand#407, i_category#411, cs_call_center_sk#431L, cs_sales_price#441, cs_sold_date_sk#454L]
         :                                   :     :  +- BroadcastHashJoin [i_item_sk#399L], [cs_item_sk#435L], Inner, BuildLeft, false
         :                                   :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#243]
         :                                   :     :     :  +- Filter ((isnotnull(i_item_sk#399L) AND isnotnull(i_category#411)) AND isnotnull(i_brand#407))
         :                                   :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#399L, i_brand#407, i_category#411], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#399L, i_item_id#400, i_rec_start_date#401, i_rec_end_date#402, i_item_desc#403, i_curr..., Partition Cols: []]
         :                                   :     :     +- Filter (isnotnull(cs_item_sk#435L) AND isnotnull(cs_call_center_sk#431L))
         :                                   :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#431L, cs_item_sk#435L, cs_sales_price#441, cs_sold_date_sk#454L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#421L, cs_ship_date_sk#422L, cs_bill_customer_sk#423L, cs_bill_cdemo_sk#424L, cs_..., Partition Cols: [cs_sold_date_sk#454L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#454L)]
         :                                   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#247]
         :                                   :        +- Filter ((((d_year#461 = 1999) OR ((d_year#461 = 1998) AND (d_moy#463 = 12))) OR ((d_year#461 = 2000) AND (d_moy#463 = 1))) AND isnotnull(d_date_sk#455L))
         :                                   :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#455L, d_year#461, d_moy#463], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#455L, d_date_id#456, d_date#457, d_month_seq#458, d_week_seq#459, d_quarter_seq#460, d..., Partition Cols: []]
         :                                   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#251]
         :                                      +- Filter (isnotnull(cc_call_center_sk#483L) AND isnotnull(cc_name#489))
         :                                         +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#483L, cc_name#489], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#483L, cc_call_center_id#484, cc_rec_start_date#485, cc_rec_end_date#486, cc_cl..., Partition Cols: []]
         +- Exchange hashpartitioning(i_category#531, i_brand#527, cc_name#609, (rn#638 - 1), 200), ENSURE_REQUIREMENTS, [id=#291]
            +- Project [i_category#531, i_brand#527, cc_name#609, sum_sales#0 AS sum_sales#147, rn#638]
               +- Window [rank(d_year#581, d_moy#583) windowspecdefinition(i_category#531, i_brand#527, cc_name#609, d_year#581 ASC NULLS FIRST, d_moy#583 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#638], [i_category#531, i_brand#527, cc_name#609], [d_year#581 ASC NULLS FIRST, d_moy#583 ASC NULLS FIRST]
                  +- Sort [i_category#531 ASC NULLS FIRST, i_brand#527 ASC NULLS FIRST, cc_name#609 ASC NULLS FIRST, d_year#581 ASC NULLS FIRST, d_moy#583 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(i_category#531, i_brand#527, cc_name#609, 200), ENSURE_REQUIREMENTS, [id=#285]
                        +- HashAggregate(keys=[i_category#531, i_brand#527, cc_name#609, d_year#581, d_moy#583], functions=[sum(UnscaledValue(cs_sales_price#561))])
                           +- Exchange hashpartitioning(i_category#531, i_brand#527, cc_name#609, d_year#581, d_moy#583, 200), ENSURE_REQUIREMENTS, [id=#282]
                              +- HashAggregate(keys=[i_category#531, i_brand#527, cc_name#609, d_year#581, d_moy#583], functions=[partial_sum(UnscaledValue(cs_sales_price#561))])
                                 +- Project [i_brand#527, i_category#531, cs_sales_price#561, d_year#581, d_moy#583, cc_name#609]
                                    +- BroadcastHashJoin [cs_call_center_sk#551L], [cc_call_center_sk#603L], Inner, BuildRight, false
                                       :- Project [i_brand#527, i_category#531, cs_call_center_sk#551L, cs_sales_price#561, d_year#581, d_moy#583]
                                       :  +- BroadcastHashJoin [cs_sold_date_sk#574L], [d_date_sk#575L], Inner, BuildRight, false
                                       :     :- Project [i_brand#527, i_category#531, cs_call_center_sk#551L, cs_sales_price#561, cs_sold_date_sk#574L]
                                       :     :  +- BroadcastHashJoin [i_item_sk#519L], [cs_item_sk#555L], Inner, BuildLeft, false
                                       :     :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#269]
                                       :     :     :  +- Filter ((isnotnull(i_item_sk#519L) AND isnotnull(i_category#531)) AND isnotnull(i_brand#527))
                                       :     :     :     +- Scan hive dwd_insys_test.item [i_item_sk#519L, i_brand#527, i_category#531], HiveTableRelation [`dwd_insys_test`.`item`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [i_item_sk#519L, i_item_id#520, i_rec_start_date#521, i_rec_end_date#522, i_item_desc#523, i_curr..., Partition Cols: []]
                                       :     :     +- Filter (isnotnull(cs_item_sk#555L) AND isnotnull(cs_call_center_sk#551L))
                                       :     :        +- Scan hive dwd_insys_test.catalog_sales [cs_call_center_sk#551L, cs_item_sk#555L, cs_sales_price#561, cs_sold_date_sk#574L], HiveTableRelation [`dwd_insys_test`.`catalog_sales`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cs_sold_time_sk#541L, cs_ship_date_sk#542L, cs_bill_customer_sk#543L, cs_bill_cdemo_sk#544L, cs_..., Partition Cols: [cs_sold_date_sk#574L], Pruned Partitions: [(cs_sold_date_sk=2450815), (cs_sold_date_sk=2450816), (cs_sold_date_sk=2450817), (cs_sold_date_s...], [isnotnull(cs_sold_date_sk#574L)]
                                       :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#273]
                                       :        +- Filter ((((d_year#581 = 1999) OR ((d_year#581 = 1998) AND (d_moy#583 = 12))) OR ((d_year#581 = 2000) AND (d_moy#583 = 1))) AND isnotnull(d_date_sk#575L))
                                       :           +- Scan hive dwd_insys_test.date_dim [d_date_sk#575L, d_year#581, d_moy#583], HiveTableRelation [`dwd_insys_test`.`date_dim`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [d_date_sk#575L, d_date_id#576, d_date#577, d_month_seq#578, d_week_seq#579, d_quarter_seq#580, d..., Partition Cols: []]
                                       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#277]
                                          +- Filter (isnotnull(cc_call_center_sk#603L) AND isnotnull(cc_name#609))
                                             +- Scan hive dwd_insys_test.call_center [cc_call_center_sk#603L, cc_name#609], HiveTableRelation [`dwd_insys_test`.`call_center`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, Data Cols: [cc_call_center_sk#603L, cc_call_center_id#604, cc_rec_start_date#605, cc_rec_end_date#606, cc_cl..., Partition Cols: []]
wenfang6 commented 1 month ago

@zhztheplayer I think stage 24 failed:

+- BroadcastQueryStage 24
         :        +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] + 1)),false), [id=#2832]
         :           +- ^(16) ProjectExecTransformer [i_category#411, i_brand#407, cc_name#489, sum_sales#0 AS sum_sales#139, rn#518]
         :              +- ^(16) WindowExecTransformer [rank(d_year#461, d_moy#463) windowspecdefinition(i_category#411, i_brand#407, cc_name#489, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#518], [i_category#411, i_brand#407, cc_name#489], [d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST]
         :                 +- ^(16) SortExecTransformer [i_category#411 ASC NULLS FIRST, i_brand#407 ASC NULLS FIRST, cc_name#489 ASC NULLS FIRST, d_year#461 ASC NULLS FIRST, d_moy#463 ASC NULLS FIRST], false, 0
         :                    +- ^(16) InputIteratorTransformer[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum_sales#0]
         :                       +- AQEShuffleRead coalesced
         :                          +- ShuffleQueryStage 21
         :                             +- ColumnarExchange hashpartitioning(i_category#411, i_brand#407, cc_name#489, 200), ENSURE_REQUIREMENTS, [i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum_sales#0], [id=#2620], [id=#2620], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum_sales:DecimalType(17,2))
         :                                +- VeloxAppendBatches 3276
         :                                   +- ^(14) ProjectExecTransformer [hash(i_category#411, i_brand#407, cc_name#489, 42) AS hash_partition_key#876, i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, MakeDecimal(sum(UnscaledValue(cs_sales_price#441))#122L,17,2) AS sum_sales#0]
         :                                      +- ^(14) HashAggregateTransformer(keys=[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463], functions=[sum(UnscaledValue(cs_sales_price#441))])
         :                                         +- ^(14) InputIteratorTransformer[i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum#642L]
         :                                            +- ShuffleQueryStage 17
         :                                               +- ReusedExchange [i_category#411, i_brand#407, cc_name#489, d_year#461, d_moy#463, sum#642L], ColumnarExchange hashpartitioning(i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, 200), ENSURE_REQUIREMENTS, [i_category#19, i_brand#15, cc_name#97, d_year#69, d_moy#71, sum#640L], [id=#1838], [id=#1838], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:IntegerType, d_moy:IntegerType, sum:LongType)
zhztheplayer commented 1 month ago

It looks like a regular exchange is expected in the ReusedExchange, but there is a broadcast exchange instead.

Which Gluten version are you using?
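A toy model of why this is suspicious: an exchange should only be replaced by a reuse of an exchange of the same kind, because a shuffle consumer reads partitioned data while a broadcast consumer reads a single broadcast relation. All names here (`Exchange`, `Reused`, `reuse_exchanges`, `kind_mismatches`) are hypothetical, not Spark or Gluten classes. To my understanding, Spark's reuse rules look exchanges up by their canonicalized plan, which already differs between a shuffle exchange and a broadcast exchange; the sketch makes that explicit by putting the kind into the cache key.

```python
from dataclasses import dataclass
from typing import List, Sequence, Union


@dataclass(frozen=True)
class Exchange:
    """Hypothetical stand-in for a physical exchange node."""
    kind: str       # "shuffle" or "broadcast"
    child_sig: str  # stand-in for the canonicalized child plan


@dataclass(frozen=True)
class Reused:
    """Hypothetical stand-in for a ReusedExchange pointing at an earlier exchange."""
    target: Exchange


PlanItem = Union[Exchange, Reused]


def reuse_exchanges(exchanges: Sequence[Exchange]) -> List[PlanItem]:
    """Replace later duplicates with Reused wrappers.

    The cache key includes the exchange kind, so a shuffle is never
    satisfied by a broadcast (or vice versa) even when the children match.
    """
    cache = {}
    out: List[PlanItem] = []
    for ex in exchanges:
        key = (ex.kind, ex.child_sig)
        if key in cache:
            out.append(Reused(cache[key]))
        else:
            cache[key] = ex
            out.append(ex)
    return out


def kind_mismatches(expected_kinds: Sequence[str], nodes: Sequence[PlanItem]) -> List[int]:
    """Return positions where a Reused wrapper points at the wrong kind of exchange."""
    return [
        i
        for i, (want, node) in enumerate(zip(expected_kinds, nodes))
        if isinstance(node, Reused) and node.target.kind != want
    ]


# The same child feeds a shuffle, a broadcast, and a second shuffle.
# The second shuffle becomes a reuse of the first shuffle, never of the broadcast.
plan = [Exchange("shuffle", "agg"), Exchange("broadcast", "agg"), Exchange("shuffle", "agg")]
reused = reuse_exchanges(plan)
```

A reuse rule keyed only on the child (dropping `kind` from the key) would hand the broadcast to a position that expects a shuffle, which is the kind of mismatch `kind_mismatches` reports.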

wenfang6 commented 1 month ago

@zhztheplayer I pulled it from the main branch last month (Gluten 1.2.0-SNAPSHOT).

wenfang6 commented 3 weeks ago

@zhztheplayer Last Friday I pulled and built the latest Gluten and tested again, but the issue is still there.

zhztheplayer commented 3 weeks ago

@wenfang6 How are you using Spark 3.2.1 with Gluten?

I ran the same query with Spark 3.2.2, which is the default supported 3.2 version, and no error occurred.

The following is the explained query plan:

+- == Final Plan ==
   VeloxColumnarToRowExec
   +- TakeOrderedAndProjectExecTransformer (limit=100, orderBy=[_pre_3#3924 ASC NULLS FIRST,cc_name#2900 ASC NULLS FIRST], output=[i_category#2102,i_brand#2098,cc_name#2900,d_year#2516L,d_moy#2518L,avg_monthly_sales#3204,sum_sales#3203,psum#3206,nsum#3207])
      +- ^(18) ProjectExecTransformer [i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, avg_monthly_sales#3204, sum_sales#3203, sum_sales#3227 AS psum#3206, sum_sales#3235 AS nsum#3207, CheckOverflow((promote_precision(cast(sum_sales#3203 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#3204 as decimal(22,6)))), DecimalType(22,6), true) AS _pre_3#3924]
         +- ^(18) BroadcastHashJoinExecTransformer [i_category#2102, i_brand#2098, cc_name#2900, rn#3205], [i_category#3619, i_brand#3615, cc_name#3697, (rn#3726 - 1)], Inner, BuildRight, false
            :- ^(18) ProjectExecTransformer [i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum_sales#3203, avg_monthly_sales#3204, rn#3205, sum_sales#3227]
            :  +- ^(18) BroadcastHashJoinExecTransformer [i_category#2102, i_brand#2098, cc_name#2900, rn#3205], [i_category#3499, i_brand#3495, cc_name#3577, (rn#3606 + 1)], Inner, BuildRight, false
            :     :- ^(18) ProjectExecTransformer [i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum_sales#3203, avg_monthly_sales#3204, rn#3205]
            :     :  +- ^(18) FilterExecTransformer ((isnotnull(avg_monthly_sales#3204) AND (avg_monthly_sales#3204 > 0.000000)) AND (CheckOverflow((promote_precision(abs(CheckOverflow((promote_precision(cast(sum_sales#3203 as decimal(22,6))) - promote_precision(cast(avg_monthly_sales#3204 as decimal(22,6)))), DecimalType(22,6), true), false)) / promote_precision(cast(avg_monthly_sales#3204 as decimal(22,6)))), DecimalType(38,16), true) > 0.1000000000000000))
            :     :     +- ^(18) WindowExecTransformer [avg(_w0#3221) windowspecdefinition(i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS avg_monthly_sales#3204], [i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L]
            :     :        +- ^(18) FilterExecTransformer (isnotnull(d_year#2516L) AND (d_year#2516L = 1999))
            :     :           +- ^(18) WindowExecTransformer [rank(d_year#2516L, d_moy#2518L) windowspecdefinition(i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L ASC NULLS FIRST, d_moy#2518L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#3205], [i_category#2102, i_brand#2098, cc_name#2900], [d_year#2516L ASC NULLS FIRST, d_moy#2518L ASC NULLS FIRST]
            :     :              +- ^(18) SortExecTransformer [i_category#2102 ASC NULLS FIRST, i_brand#2098 ASC NULLS FIRST, cc_name#2900 ASC NULLS FIRST, d_year#2516L ASC NULLS FIRST, d_moy#2518L ASC NULLS FIRST], false, 0
            :     :                 +- ^(18) InputIteratorTransformer[i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum_sales#3203, _w0#3221]
            :     :                    +- ShuffleQueryStage 20
            :     :                       +- ColumnarExchange hashpartitioning(i_category#2102, i_brand#2098, cc_name#2900, 100), ENSURE_REQUIREMENTS, [i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum_sales#3203, _w0#3221], [plan_id=4533], [shuffle_writer_type=hash], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:LongType, d_moy:LongType, sum_sales:DecimalType(17,2), _w0:DecimalType(17,2)), [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:LongType, d_moy:LongType, sum_sales:DecimalType(17,2), _w0:DecimalType(17,2))
            :     :                          +- VeloxResizeBatches 1024, 2147483647
            :     :                             +- ^(13) ProjectExecTransformer [hash(i_category#2102, i_brand#2098, cc_name#2900, 42) AS hash_partition_key#3857, i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, MakeDecimal(sum(UnscaledValue(cs_sales_price#2750))#3210L,17,2) AS sum_sales#3203, MakeDecimal(sum(UnscaledValue(cs_sales_price#2750))#3210L,17,2) AS _w0#3221]
            :     :                                +- ^(13) HashAggregateTransformer(keys=[i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L], functions=[sum(UnscaledValue(cs_sales_price#2750))], isStreamingAgg=false, output=[i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum(UnscaledValue(cs_sales_price#2750))#3210L])
            :     :                                   +- ^(13) InputIteratorTransformer[i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum#3728L]
            :     :                                      +- AQEShuffleRead coalesced
            :     :                                         +- ShuffleQueryStage 15
            :     :                                            +- ColumnarExchange hashpartitioning(i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, 100), ENSURE_REQUIREMENTS, [i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum#3728L], [plan_id=3929], [shuffle_writer_type=hash], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:LongType, d_moy:LongType, sum:LongType), [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:LongType, d_moy:LongType, sum:LongType)
            :     :                                               +- VeloxResizeBatches 1024, 2147483647
            :     :                                                  +- ^(10) ProjectExecTransformer [hash(i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, 42) AS hash_partition_key#3823, i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum#3728L]
            :     :                                                     +- ^(10) FlushableHashAggregateTransformer(keys=[i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L], functions=[partial_sum(_pre_0#3820L)], isStreamingAgg=false, output=[i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum#3728L])
            :     :                                                        +- ^(10) ProjectExecTransformer [i_brand#2098, i_category#2102, cs_sales_price#2750, d_year#2516L, d_moy#2518L, cc_name#2900, UnscaledValue(cs_sales_price#2750) AS _pre_0#3820L]
            :     :                                                           +- ^(10) BroadcastHashJoinExecTransformer [cs_call_center_sk#2740L], [cc_call_center_sk#2894L], Inner, BuildRight, false
            :     :                                                              :- ^(10) ProjectExecTransformer [i_brand#2098, i_category#2102, cs_call_center_sk#2740L, cs_sales_price#2750, d_year#2516L, d_moy#2518L]
            :     :                                                              :  +- ^(10) BroadcastHashJoinExecTransformer [cs_sold_date_sk#2729L], [d_date_sk#2510L], Inner, BuildRight, false
            :     :                                                              :     :- ^(10) ProjectExecTransformer [i_brand#2098, i_category#2102, cs_sold_date_sk#2729L, cs_call_center_sk#2740L, cs_sales_price#2750]
            :     :                                                              :     :  +- ^(10) BroadcastHashJoinExecTransformer [i_item_sk#2090L], [cs_item_sk#2744L], Inner, BuildLeft, false
            :     :                                                              :     :     :- ^(10) InputIteratorTransformer[i_item_sk#2090L, i_brand#2098, i_category#2102]
            :     :                                                              :     :     :  +- BroadcastQueryStage 0
            :     :                                                              :     :     :     +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2809]
            :     :                                                              :     :     :        +- ^(1) FilterExecTransformer ((isnotnull(i_item_sk#2090L) AND isnotnull(i_category#2102)) AND isnotnull(i_brand#2098))
            :     :                                                              :     :     :           +- ^(1) NativeFileScan parquet default.item[i_item_sk#2090L,i_brand#2098,i_category#2102] Batched: true, DataFilters: [isnotnull(i_item_sk#2090L), isnotnull(i_category#2102), isnotnull(i_brand#2098)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tpcds-generated-1.0/item], PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk), IsNotNull(i_category), IsNotNull(i_brand)], ReadSchema: struct<i_item_sk:bigint,i_brand:string,i_category:string>
            :     :                                                              :     :     +- ^(10) FilterExecTransformer ((isnotnull(cs_item_sk#2744L) AND isnotnull(cs_sold_date_sk#2729L)) AND isnotnull(cs_call_center_sk#2740L))
            :     :                                                              :     :        +- ^(10) NativeFileScan parquet default.catalog_sales[cs_sold_date_sk#2729L,cs_call_center_sk#2740L,cs_item_sk#2744L,cs_sales_price#2750] Batched: true, DataFilters: [isnotnull(cs_item_sk#2744L), isnotnull(cs_sold_date_sk#2729L), isnotnull(cs_call_center_sk#2740L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tpcds-generated-1.0/catalog_sales], PartitionFilters: [], PushedFilters: [IsNotNull(cs_item_sk), IsNotNull(cs_sold_date_sk), IsNotNull(cs_call_center_sk)], ReadSchema: struct<cs_sold_date_sk:bigint,cs_call_center_sk:bigint,cs_item_sk:bigint,cs_sales_price:decimal(7...
            :     :                                                              :     +- ^(10) InputIteratorTransformer[d_date_sk#2510L, d_year#2516L, d_moy#2518L]
            :     :                                                              :        +- BroadcastQueryStage 1
            :     :                                                              :           +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2841]
            :     :                                                              :              +- ^(2) FilterExecTransformer ((((d_year#2516L = 1999) OR ((d_year#2516L = 1998) AND (d_moy#2518L = 12))) OR ((d_year#2516L = 2000) AND (d_moy#2518L = 1))) AND isnotnull(d_date_sk#2510L))
            :     :                                                              :                 +- ^(2) NativeFileScan parquet default.date_dim[d_date_sk#2510L,d_year#2516L,d_moy#2518L] Batched: true, DataFilters: [(((d_year#2516L = 1999) OR ((d_year#2516L = 1998) AND (d_moy#2518L = 12))) OR ((d_year#2516L = 2..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tpcds-generated-1.0/date_dim], PartitionFilters: [], PushedFilters: [Or(Or(EqualTo(d_year,1999),And(EqualTo(d_year,1998),EqualTo(d_moy,12))),And(EqualTo(d_year,2000)..., ReadSchema: struct<d_date_sk:bigint,d_year:bigint,d_moy:bigint>
            :     :                                                              +- ^(10) InputIteratorTransformer[cc_call_center_sk#2894L, cc_name#2900]
            :     :                                                                 +- BroadcastQueryStage 2
            :     :                                                                    +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=2873]
            :     :                                                                       +- ^(3) FilterExecTransformer (isnotnull(cc_call_center_sk#2894L) AND isnotnull(cc_name#2900))
            :     :                                                                          +- ^(3) NativeFileScan parquet default.call_center[cc_call_center_sk#2894L,cc_name#2900] Batched: true, DataFilters: [isnotnull(cc_call_center_sk#2894L), isnotnull(cc_name#2900)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tpcds-generated-1.0/call_center], PartitionFilters: [], PushedFilters: [IsNotNull(cc_call_center_sk), IsNotNull(cc_name)], ReadSchema: struct<cc_call_center_sk:bigint,cc_name:string>
            :     +- ^(18) InputIteratorTransformer[i_category#3499, i_brand#3495, cc_name#3577, sum_sales#3227, rn#3606]
            :        +- BroadcastQueryStage 24
            :           +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] + 1)),false), [plan_id=4910]
            :              +- ^(16) ProjectExecTransformer [i_category#3499, i_brand#3495, cc_name#3577, sum_sales#3203 AS sum_sales#3227, rn#3606]
            :                 +- ^(16) WindowExecTransformer [rank(d_year#3549L, d_moy#3551L) windowspecdefinition(i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L ASC NULLS FIRST, d_moy#3551L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#3606], [i_category#3499, i_brand#3495, cc_name#3577], [d_year#3549L ASC NULLS FIRST, d_moy#3551L ASC NULLS FIRST]
            :                    +- ^(16) SortExecTransformer [i_category#3499 ASC NULLS FIRST, i_brand#3495 ASC NULLS FIRST, cc_name#3577 ASC NULLS FIRST, d_year#3549L ASC NULLS FIRST, d_moy#3551L ASC NULLS FIRST], false, 0
            :                       +- ^(16) InputIteratorTransformer[i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L, d_moy#3551L, sum_sales#3203]
            :                          +- AQEShuffleRead coalesced
            :                             +- ShuffleQueryStage 21
            :                                +- ColumnarExchange hashpartitioning(i_category#3499, i_brand#3495, cc_name#3577, 100), ENSURE_REQUIREMENTS, [i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L, d_moy#3551L, sum_sales#3203], [plan_id=4609], [shuffle_writer_type=hash], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:LongType, d_moy:LongType, sum_sales:DecimalType(17,2)), [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:LongType, d_moy:LongType, sum_sales:DecimalType(17,2))
            :                                   +- VeloxResizeBatches 1024, 2147483647
            :                                      +- ^(14) ProjectExecTransformer [hash(i_category#3499, i_brand#3495, cc_name#3577, 42) AS hash_partition_key#3866, i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L, d_moy#3551L, MakeDecimal(sum(UnscaledValue(cs_sales_price#3530))#3210L,17,2) AS sum_sales#3203]
            :                                         +- ^(14) HashAggregateTransformer(keys=[i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L, d_moy#3551L], functions=[sum(UnscaledValue(cs_sales_price#3530))], isStreamingAgg=false, output=[i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L, d_moy#3551L, sum(UnscaledValue(cs_sales_price#3530))#3210L])
            :                                            +- ^(14) InputIteratorTransformer[i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L, d_moy#3551L, sum#3730L]
            :                                               +- AQEShuffleRead coalesced
            :                                                  +- ShuffleQueryStage 17
            :                                                     +- ReusedExchange [i_category#3499, i_brand#3495, cc_name#3577, d_year#3549L, d_moy#3551L, sum#3730L], ColumnarExchange hashpartitioning(i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, 100), ENSURE_REQUIREMENTS, [i_category#2102, i_brand#2098, cc_name#2900, d_year#2516L, d_moy#2518L, sum#3728L], [plan_id=3929], [shuffle_writer_type=hash], [OUTPUT] List(i_category:StringType, i_brand:StringType, cc_name:StringType, d_year:LongType, d_moy:LongType, sum:LongType)
            +- ^(18) InputIteratorTransformer[i_category#3619, i_brand#3615, cc_name#3697, sum_sales#3235, rn#3726]
               +- BroadcastQueryStage 26
                  +- ReusedExchange [i_category#3619, i_brand#3615, cc_name#3697, sum_sales#3235, rn#3726], ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] + 1)),false), [plan_id=4910]
wenfang6 commented 3 weeks ago

@zhztheplayer Our environment uses Spark 3.2.1, and I have done some compatibility work. That seems to be an issue with our Spark version 3.2.1. I'm a bit confused right now. Which direction should I investigate?

zhztheplayer commented 3 weeks ago

That seems to be an issue with our Spark version 3.2.1

It's possible, but I'm not sure; I only tested with 3.2.2.

I think the issue actually happens in this stage of your query plan:

BroadcastQueryStage 27
          +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] - 1)),false), [id=#3024]
             +- BroadcastQueryStage 26
                +- ReusedExchange [i_category#531, i_brand#527, cc_name#609, sum_sales#147, rn#638], ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true], input[1, string, true], input[2, string, true], (input[4, int, false] + 1)),false), [id=#2832]

The reason is that a broadcast exchange cannot be the child of another broadcast exchange. This limitation comes from vanilla Spark.
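The limitation can be expressed as a simple tree check. This is a simplified model, not Spark's own validation code: `Node`, `find_nested_broadcasts`, and the set of wrapper nodes treated as transparent are assumptions. It also models `ReusedExchange` as if the reused exchange were its child, whereas in Spark `ReusedExchangeExec` is a leaf node that only references the original exchange.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

BROADCAST_EXCHANGES = {"BroadcastExchange", "ColumnarBroadcastExchange"}
# Wrapper nodes we look through. Simplification: ReusedExchange is modeled as
# having the reused exchange as its child, although in Spark it is a leaf.
TRANSPARENT = {"BroadcastQueryStage", "ReusedExchange"}


@dataclass
class Node:
    """Hypothetical, minimal physical-plan node."""
    name: str
    children: List["Node"] = field(default_factory=list)


def first_real_node(node: Node) -> Node:
    """Skip transparent wrappers and return the first node underneath them."""
    while node.name in TRANSPARENT and node.children:
        node = node.children[0]
    return node


def find_nested_broadcasts(root: Node, path: Tuple[str, ...] = ()) -> List[Tuple[str, ...]]:
    """Return the path to every broadcast exchange that sits on another broadcast exchange."""
    here = path + (root.name,)
    found = []
    if root.name in BROADCAST_EXCHANGES:
        for child in root.children:
            if first_real_node(child).name in BROADCAST_EXCHANGES:
                found.append(here)
    for child in root.children:
        found.extend(find_nested_broadcasts(child, here))
    return found


# Shape of "BroadcastQueryStage 27" quoted above: a broadcast over a broadcast.
bad = Node("BroadcastQueryStage", [
    Node("ColumnarBroadcastExchange", [
        Node("BroadcastQueryStage", [
            Node("ReusedExchange", [
                Node("ColumnarBroadcastExchange", [Node("ProjectExecTransformer")]),
            ]),
        ]),
    ]),
])

# Shape of "BroadcastQueryStage 26" in the Spark 3.2.2 plan: the stage is reused directly.
good = Node("BroadcastQueryStage", [
    Node("ReusedExchange", [
        Node("ColumnarBroadcastExchange", [Node("ProjectExecTransformer")]),
    ]),
])
```

In the 3.2.2 plan the join consumes the reused broadcast stage directly, so the check finds nothing; in the failing plan an extra `ColumnarBroadcastExchange` is stacked on top of the reused one.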

Perhaps you can set a breakpoint where the two ColumnarBroadcastExchanges are created and debug the program to see why two exchanges are needed rather than one. The query planner should eventually produce a plan with only one exchange here.
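Before setting breakpoints, it may help to find every such spot in a large plan. The helper below is hypothetical (`node_name` and `nested_broadcast_lines` are not part of Spark or Gluten): it scans the text of a physical plan, for example copied from the explain output or the Spark UI, and reports broadcast exchanges whose direct child is a `BroadcastQueryStage` or a `ReusedExchange` of a broadcast exchange. It assumes Spark's tree-string layout, where the line right after a unary node such as a broadcast exchange is that node's child.

```python
import re
from typing import List, Optional

# Optional tree marker ("+- " or ":- "), optional whole-stage id like "^(16) ",
# then the operator name. Lines without an operator (e.g. "== Final Plan ==") give None.
NODE_RE = re.compile(r"(?:^|[+:]- )\s*(?:\^\(\d+\) )?([A-Za-z]\w*)")
BROADCAST_EXCHANGES = ("BroadcastExchange", "ColumnarBroadcastExchange")


def node_name(line: str) -> Optional[str]:
    """Extract the operator name from one line of a plan tree string."""
    m = NODE_RE.search(line)
    return m.group(1) if m else None


def nested_broadcast_lines(plan_text: str) -> List[int]:
    """Return 1-based line numbers of broadcast exchanges whose direct child is a broadcast stage."""
    lines = [(n, l) for n, l in enumerate(plan_text.splitlines(), start=1) if l.strip()]
    hits = []
    # A unary node's child is printed on the next non-empty line.
    for (n, line), (_, child) in zip(lines, lines[1:]):
        if node_name(line) not in BROADCAST_EXCHANGES:
            continue
        child_name = node_name(child)
        if child_name == "BroadcastQueryStage" or (
            child_name == "ReusedExchange" and "BroadcastExchange" in child
        ):
            hits.append(n)
    return hits


# Abbreviated from the "BroadcastQueryStage 27" snippet above.
SAMPLE = """\
BroadcastQueryStage 27
+- ColumnarBroadcastExchange HashedRelationBroadcastMode(...), [id=#3024]
   +- BroadcastQueryStage 26
      +- ReusedExchange [...], ColumnarBroadcastExchange HashedRelationBroadcastMode(...), [id=#2832]
"""
```

Each reported line points at an extra broadcast exchange; the `[id=#...]` on that line identifies which exchange creation to break on.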

wenfang6 commented 3 weeks ago

@zhztheplayer Thank you very much. I will try it.