apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[VL] Result mismatch found in FlushableAgg #6630

Open jiangjiangtian opened 1 month ago

jiangjiangtian commented 1 month ago

Backend

VL (Velox)

Bug description

I have a SQL query that I run in both Gluten and vanilla Spark; its shape is as follows:

select count(*) from ((
    select *
    from test1
    where xxx
  )a
  left join
  (
    select col_a, col_b, col_c, col_d, col_e
    from test2
    where xxx
    group by col_a
            ,col_b
            ,col_c
            ,col_d
            ,col_e
  )b
ON a.col1 = b.col1);
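For intuition on why leaked duplicates change count(*): subquery b groups by all of its selected columns, so its join key should appear at most once per group, and every extra copy of a key on the right side of the left join multiplies the matching left rows. A toy Python illustration (made-up keys, not the real tables or Gluten code):

```python
from collections import Counter

# Hypothetical join keys standing in for a.col1 / b.col1.
left = [1, 1, 2, 3]        # a.col1 values
right_ok = [1, 2]          # b's keys after a correct aggregation (unique)
right_dup = [1, 1, 2]      # same keys with one duplicate leaked

def left_join_count(left_keys, right_keys):
    # count(*) over "left LEFT JOIN right ON key": each left row yields
    # one output row per right match, or a single null-extended row.
    matches = Counter(right_keys)
    return sum(max(1, matches[k]) for k in left_keys)

print(left_join_count(left, right_ok))   # 4 rows
print(left_join_count(left, right_dup))  # 6 rows: the duplicate inflates the count
```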

I get a different number of rows. Looking at the Spark UI, I found that the cause is that the row counts of the second subquery don't match.

Vanilla Spark: [screenshot]

Gluten: [screenshots]

Actually, I found that some rows are duplicated. But when I run just the second subquery on its own, I get the right result.

[screenshots]

We can see the plans are different: in this case the second hash aggregation is a regular one.

Besides, when I set spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation to false, I get the right result.

[screenshots]

So I think there might be a bug in the flushable hash aggregation or in the plan conversion, but I couldn't reduce it to a small SQL query that demonstrates the bug. Sorry for not having a small example.
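For background, here is a sketch of the suspected mechanism (a toy simulation, not the actual Velox implementation): a flushable partial aggregation is allowed to emit the same group key more than once, because it flushes its accumulated groups under memory pressure instead of holding them all. That is safe only if a full (non-flushable) aggregation downstream merges the flushes; if the post-shuffle aggregation can also flush, duplicates can survive, which matches the duplicated rows observed here. The flush threshold below is made up:

```python
def flushable_distinct(rows, flush_threshold=2):
    """Distinct (group-by-all-columns) that flushes its hash table early,
    so the same key may be emitted once per flush."""
    seen, out = set(), []
    for row in rows:
        seen.add(row)
        if len(seen) >= flush_threshold:
            out.extend(sorted(seen))  # flush accumulated groups early
            seen.clear()
    out.extend(sorted(seen))          # emit the remainder at the end
    return out

def full_distinct(rows):
    """Regular final aggregation: emits each key exactly once."""
    return list(dict.fromkeys(rows))

data = ["a", "b", "a", "c", "a"]
partial = flushable_distinct(data)        # "a" appears more than once
final_ok = full_distinct(partial)         # full final agg removes the duplicates
final_bad = flushable_distinct(partial)   # flushable final agg can keep them
```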

Spark version

3.0

Spark configurations

No response

System information

Velox System Info v0.0.2
Commit: 96712646c63bf4305cca4eaa7dfd26c2179547b1
CMake Version: 3.17.5
System: Linux-3.10.0-862.mt20190308.130.el7.x86_64
Arch: x86_64
CPU Name: Model name: Intel(R) Xeon(R) Platinum 8255C CPU @ 2.50GHz
C++ Compiler: /opt/rh/devtoolset-10/root/usr/bin/c++
C++ Compiler Version: 10.2.1
C Compiler: /opt/rh/devtoolset-10/root/usr/bin/cc
C Compiler Version: 10.2.1
CMake Prefix Path: /usr/local;/usr;/;/usr;/usr/local;/usr/X11R6;/usr/pkg;/opt

Relevant logs

No response

Yohahaha commented 1 month ago

Could you post the Gluten version/commit?

A FlushableAgg with a non-zero "number of flushed rows" metric may produce wrong results.

jiangjiangtian commented 1 month ago

> Could you post the Gluten version/commit?
>
> A FlushableAgg with a non-zero "number of flushed rows" metric may produce wrong results.

Thanks for the reply. My Gluten version is v1.2.0-rc1.

Yohahaha commented 1 month ago

@jiangjiangtian could you try with spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false?

jiangjiangtian commented 1 month ago

> @jiangjiangtian could you try with spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false?

Yes, I have tried that. The result is correct.

jiangjiangtian commented 1 month ago

@Yohahaha I added two screenshots above; you can see that when I set spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation to false, both hash aggregations are regular. So I think we might need to always make the second hash aggregation a full aggregation. What do you think?

jiangjiangtian commented 1 month ago

@kecookier

kecookier commented 1 month ago

@zhztheplayer Can you help take a look at this problem?

Yohahaha commented 1 month ago

> @Yohahaha I added two screenshots above; you can see that when I set spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation to false, both hash aggregations are regular. So I think we might need to always make the second hash aggregation a full aggregation. What do you think?

I see the new description.

I have submitted a similar issue: https://github.com/apache/incubator-gluten/issues/4421.

FelixYBW commented 1 month ago

#4421 is closed. Is the issue fixed?

kecookier commented 1 month ago

> #4421 is closed. Is the issue fixed?

@jiangjiangtian Please check it.

jiangjiangtian commented 1 month ago

> #4421 is closed. Is the issue fixed?
>
> @jiangjiangtian Please check it.

OK, I will check it.

kecookier commented 1 month ago

@FelixYBW @Yohahaha I'm sure it's not caused by https://github.com/apache/incubator-gluten/issues/4421. We tested with 1.2.0-rc1, which already contains that fix.

Yohahaha commented 1 month ago

> @FelixYBW @Yohahaha I'm sure it's not caused by #4421. We tested with 1.2.0-rc1, which already contains that fix.

I mean that #4421 is similar to, but not the same as, the current issue.

FelixYBW commented 1 month ago

@kecookier we will take a look

kecookier commented 1 month ago

SQL explain.

== Parsed Logical Plan ==
'InsertIntoStatement 'UnresolvedRelation [table_dst], Map(partition_date -> None), true, false
+- 'Aggregate ['a.col_main_noi__id, 'a.partition_date, 'b.aaa_col_user_id, 'b.ccc_name], ['a.col_main_noi__id AS main_noi__id#103, 'max('a.noi__name) AS noi__name#104, 'max('noi__close_status) AS noi__status#105, 'b.aaa_col_user_id, 'b.ccc_name, 'if(('sum('c.col_bal) > 0), 1, 0) AS is_has_col_bal#106, 'sum('coalesce('c.col_bal, 0)) AS aaa_col_bal#107, 'max('b.ccc_bu_code) AS ccc_bu_code#108, 'max('b.ccc_bu_name) AS ccc_bu_name#109, 'a.partition_date]
   +- 'Join LeftOuter, ('c.col_user_id = 'b.aaa_col_user_id)
      :- 'Join LeftOuter, ('a.noi__id = 'b.dp_noi__id)
      :  :- 'SubqueryAlias a
      :  :  +- 'Project ['noi__id, 'col_main_noi__id, 'noi__name, 'noi__close_status, 'partition_date]
      :  :     +- 'Filter ((('partition_date = 2024-07-28) AND ('partition_chain = dp)) AND (NOT ('noi__close_status = 1) AND (('col_noi__cate1_id = 226) OR ('col_noi__cate2_id = 380))))
      :  :        +- 'UnresolvedRelation [tmp_db, table_c]
      :  +- 'SubqueryAlias b
      :     +- 'Aggregate ['aaa_col_user_id, 'ccc_name, 'ccc_bu_code, 'ccc_bu_name, 'dp_noi__id], ['aaa_col_user_id, 'ccc_name, 'ccc_bu_code, 'ccc_bu_name, 'dp_noi__id]
      :        +- 'Filter (('partition_date = 2024-07-28) AND 'ccc_bu_code IN (2,18,28,41,51,56,59,66))
      :           +- 'UnresolvedRelation [tmp_db, table_b]
      +- 'SubqueryAlias c
         +- 'Aggregate ['col_user_id], ['col_user_id, 'sum('col_bal) AS col_bal#100, 'sum('cash_col_bal) AS cash_col_bal#101, 'sum('redpack_col_bal) AS redpack_col_bal#102]
            +- 'Filter (('partition_date = 2024-07-28) AND 'pool_type IN (0,6))
               +- 'UnresolvedRelation [tmp_db, table_a]

== Analyzed Logical Plan ==

InsertIntoHiveTable ebt_12.table_dst `ebt_12`.`table_dst`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(partition_date -> None), true, false, [main_noi__id, noi__name, noi__status, col_user_id, ccc_name, is_has_col_bal, aaa_col_bal, ccc_bu_code, ccc_bu_name, partition_date]
+- Project [cast(main_noi__id#103L as bigint) AS main_noi__id#403L, cast(noi__name#104 as string) AS noi__name#404, cast(noi__status#105L as int) AS noi__status#405, cast(aaa_col_user_id#205L as bigint) AS col_user_id#406L, ccc_name#206, cast(is_has_col_bal#106 as int) AS is_has_col_bal#407, cast(aaa_col_bal#107 as double) AS aaa_col_bal#408, cast(ccc_bu_code#108 as bigint) AS ccc_bu_code#409L, cast(ccc_bu_name#109 as string) AS ccc_bu_name#410, cast(partition_date#203 as string) AS partition_date#411]
   +- Aggregate [col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], [col_main_noi__id#113L AS main_noi__id#103L, max(noi__name#114) AS noi__name#104, max(noi__close_status#120L) AS noi__status#105L, aaa_col_user_id#205L, ccc_name#206, if ((sum(col_bal#100) > cast(0 as double))) 1 else 0 AS is_has_col_bal#106, sum(coalesce(col_bal#100, cast(0 as double))) AS aaa_col_bal#107, max(ccc_bu_code#207) AS ccc_bu_code#108, max(ccc_bu_name#208) AS ccc_bu_name#109, partition_date#203]
      +- Join LeftOuter, (col_user_id#222L = aaa_col_user_id#205L)
         :- Join LeftOuter, (noi__id#111L = dp_noi__id#209L)
         :  :- SubqueryAlias a
         :  :  +- Project [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
         :  :     +- Filter (((partition_date#203 = 2024-07-28) AND (partition_chain#204 = dp)) AND (NOT (noi__close_status#120L = cast(1 as bigint)) AND ((col_noi__cate1_id#146L = cast(226 as bigint)) OR (col_noi__cate2_id#144L = cast(380 as bigint)))))
         :  :        +- SubqueryAlias spark_catalog.tmp_db.table_c
         :  :           +- Relation tmp_db.table_c[chain#110L,noi__id#111L,xmd_main_noi__id#112L,col_main_noi__id#113L,noi__name#114,noi__phone#115,address#116,barea_id#117L,brand_id#118L,brand_name#119,noi__close_status#120L,latitude#121L,longitude#122L,nation_code#123L,org_id#124L,org_name#125,org_rank#126,org_type#127,main_org_id#128L,main_org_name#129,col_city_id#130L,city_name#131,col_main_city_id#132L,main_city_name#133,... 71 more fields] orc
         :  +- SubqueryAlias b
         :     +- Aggregate [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
         :        +- Filter ((partition_date#221 = 2024-07-28) AND cast(ccc_bu_code#207 as string) IN (cast(2 as string),cast(18 as string),cast(28 as string),cast(41 as string),cast(51 as string),cast(56 as string),cast(59 as string),cast(66 as string)))
         :           +- SubqueryAlias spark_catalog.tmp_db.table_b
         :              +- Relation tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,col_noi__id#210L,coop_date#211,customer_name#212,src_id#213L,src_name#214,operate_id#215L,operate_name#216,respons_bd_id#217L,respons_charge_name#218,ccc_status#219,last_promote_time#220,partition_date#221] orc
         +- SubqueryAlias c
            +- Aggregate [col_user_id#222L], [col_user_id#222L, sum(col_bal#224) AS col_bal#100, sum(cash_col_bal#225) AS cash_col_bal#101, sum(redpack_col_bal#226) AS redpack_col_bal#102]
               +- Filter ((partition_date#246 = 2024-07-28) AND cast(pool_type#223L as bigint) IN (cast(0 as bigint),cast(6 as bigint)))
                  +- SubqueryAlias spark_catalog.tmp_db.table_a
                     +- Relation tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,cash_col_bal#225,redpack_col_bal#226,alliance_redpack_col_bal#227,normal_redpack_col_bal#228,free_room_col_bal#229,refund_frozen_cash#230,refund_frozen_redpack#231,recharge_frozen_cash#232,charge_amt#233,incloud_refund_recharge_amt#234,cash_recharge_amt#235,incloud_cash_recharge_amt#236,redpack_recharge_amt#237,incloud_redpack_recharge_amt#238,fst_cash_recharge_time#239,fst_cash_recharge_amt#240,fst_redpack_recharge_time#241,fst_redpack_recharge_amt#242,accu_charge_cnt#243L,accu_cash_recharge_cnt#244L,acc_redpack_recharge_cnt#245L,partition_date#246] orc

== Optimized Logical Plan ==
InsertIntoHiveTable ebt_12.table_dst `ebt_12`.`table_dst`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(partition_date -> None), true, false, [main_noi__id, noi__name, noi__status, col_user_id, ccc_name, is_has_col_bal, aaa_col_bal, ccc_bu_code, ccc_bu_name, partition_date]
+- Aggregate [col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], [col_main_noi__id#113L AS main_noi__id#403L, max(noi__name#114) AS noi__name#404, cast(max(noi__close_status#120L) as int) AS noi__status#405, aaa_col_user_id#205L AS col_user_id#406L, ccc_name#206, if ((sum(col_bal#100) > 0.0)) 1 else 0 AS is_has_col_bal#407, sum(coalesce(col_bal#100, 0.0)) AS aaa_col_bal#408, cast(max(ccc_bu_code#207) as bigint) AS ccc_bu_code#409L, max(ccc_bu_name#208) AS ccc_bu_name#410, partition_date#203 AS partition_date#411]
   +- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, col_bal#100]
      +- Join LeftOuter, (col_user_id#222L = aaa_col_user_id#205L)
         :- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
         :  +- Join LeftOuter, (noi__id#111L = dp_noi__id#209L)
         :     :- Project [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
         :     :  +- Filter ((((((isnotnull(noi__close_status#120L) AND isnotnull(partition_date#203)) AND isnotnull(partition_chain#204)) AND (partition_date#203 = 2024-07-28)) AND (partition_chain#204 = dp)) AND NOT (noi__close_status#120L = 1)) AND ((col_noi__cate1_id#146L = 226) OR (col_noi__cate2_id#144L = 380)))
         :     :     +- Relation tmp_db.table_c[chain#110L,noi__id#111L,xmd_main_noi__id#112L,col_main_noi__id#113L,noi__name#114,noi__phone#115,address#116,barea_id#117L,brand_id#118L,brand_name#119,noi__close_status#120L,latitude#121L,longitude#122L,nation_code#123L,org_id#124L,org_name#125,org_rank#126,org_type#127,main_org_id#128L,main_org_name#129,col_city_id#130L,city_name#131,col_main_city_id#132L,main_city_name#133,... 71 more fields] orc
         :     +- Aggregate [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
         :        +- Project [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
         :           +- Filter (((isnotnull(partition_date#221) AND (partition_date#221 = 2024-07-28)) AND ccc_bu_code#207 IN (2,18,28,41,51,56,59,66)) AND isnotnull(dp_noi__id#209L))
         :              +- Relation tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,col_noi__id#210L,coop_date#211,customer_name#212,src_id#213L,src_name#214,operate_id#215L,operate_name#216,respons_bd_id#217L,respons_charge_name#218,ccc_status#219,last_promote_time#220,partition_date#221] orc
         +- Aggregate [col_user_id#222L], [col_user_id#222L, sum(col_bal#224) AS col_bal#100]
            +- Project [col_user_id#222L, col_bal#224]
               +- Filter (((isnotnull(partition_date#246) AND (partition_date#246 = 2024-07-28)) AND pool_type#223L IN (0,6)) AND isnotnull(col_user_id#222L))
                  +- Relation tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,cash_col_bal#225,redpack_col_bal#226,alliance_redpack_col_bal#227,normal_redpack_col_bal#228,free_room_col_bal#229,refund_frozen_cash#230,refund_frozen_redpack#231,recharge_frozen_cash#232,charge_amt#233,incloud_refund_recharge_amt#234,cash_recharge_amt#235,incloud_cash_recharge_amt#236,redpack_recharge_amt#237,incloud_redpack_recharge_amt#238,fst_cash_recharge_time#239,fst_cash_recharge_amt#240,fst_redpack_recharge_time#241,fst_redpack_recharge_amt#242,accu_charge_cnt#243L,accu_cash_recharge_cnt#244L,acc_redpack_recharge_cnt#245L,partition_date#246] orc

== Physical Plan ==
Execute InsertIntoHiveTable ebt_12.table_dst `ebt_12`.`table_dst`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(partition_date -> None), true, false, [main_noi__id, noi__name, noi__status, col_user_id, ccc_name, is_has_col_bal, aaa_col_bal, ccc_bu_code, ccc_bu_name, partition_date]
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      VeloxColumnarToRowExec
      +- ^(6) ProjectExecTransformer [col_main_noi__id#113L AS main_noi__id#403L, max(noi__name#114)#250 AS noi__name#404, cast(max(noi__close_status#120L)#251L as int) AS noi__status#405, aaa_col_user_id#205L AS col_user_id#406L, ccc_name#206, if ((sum(col_bal#100)#252 > 0.0)) 1 else 0 AS is_has_col_bal#407, sum(coalesce(col_bal#100, 0.0))#255 AS aaa_col_bal#408, cast(max(ccc_bu_code#207)#253 as bigint) AS ccc_bu_code#409L, max(ccc_bu_name#208)#254 AS ccc_bu_name#410, partition_date#203 AS partition_date#411]
         +- ^(6) HashAggregateTransformer(keys=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[max(noi__name#114), max(noi__close_status#120L), sum(col_bal#100), sum(coalesce(col_bal#100, 0.0)), max(ccc_bu_code#207), max(ccc_bu_name#208)], isStreamingAgg=false, output=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, max(noi__name#114)#250, max(noi__close_status#120L)#251L, sum(col_bal#100)#252, sum(coalesce(col_bal#100, 0.0))#255, max(ccc_bu_code#207)#253, max(ccc_bu_name#208)#254])
            +- ^(6) HashAggregateTransformer(keys=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[partial_max(noi__name#114), partial_max(noi__close_status#120L), partial_sum(col_bal#100), partial_sum(_pre_0#433), partial_max(ccc_bu_code#207), partial_max(ccc_bu_name#208)], isStreamingAgg=false, output=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, max#418, max#419L, sum#420, sum#421, max#422, max#423])
               +- ^(6) ProjectExecTransformer [ccc_bu_name#208, noi__name#114, ccc_bu_code#207, noi__close_status#120L, col_main_noi__id#113L, partition_date#203, ccc_name#206, aaa_col_user_id#205L, col_bal#100, coalesce(col_bal#100, 0.0) AS _pre_0#433]
                  +- ^(6) ShuffledHashJoinExecTransformer [aaa_col_user_id#205L], [col_user_id#222L], LeftOuter, BuildRight
                     :- ^(6) InputIteratorTransformer[col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
                     :  +- CustomShuffleReader coalesced
                     :     +- ShuffleQueryStage 4
                     :        +- ColumnarExchange hashpartitioning(aaa_col_user_id#205L, 2000), ENSURE_REQUIREMENTS, [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208], [id=#1555], [id=#1555], [OUTPUT] List(col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType, aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType), [OUTPUT] List(col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType, aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType)
                     :           +- VeloxAppendBatches 3276
                     :              +- ^(5) ProjectExecTransformer [hash(aaa_col_user_id#205L, 42) AS hash_partition_key#432, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
                     :                 +- ^(5) ShuffledHashJoinExecTransformer [noi__id#111L], [dp_noi__id#209L], LeftOuter, BuildRight
                     :                    :- ^(5) InputIteratorTransformer[noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
                     :                    :  +- CustomShuffleReader coalesced
                     :                    :     +- ShuffleQueryStage 0
                     :                    :        +- ColumnarExchange hashpartitioning(noi__id#111L, 2000), ENSURE_REQUIREMENTS, [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203], [id=#1118], [id=#1118], [OUTPUT] List(noi__id:LongType, col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType), [OUTPUT] List(noi__id:LongType, col_main_noi__id:LongType, noi__name:StringType, noi__close_status:LongType, partition_date:StringType)
                     :                    :           +- VeloxAppendBatches 3276
                     :                    :              +- ^(1) ProjectExecTransformer [hash(noi__id#111L, 42) AS hash_partition_key#427, noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
                     :                    :                 +- ^(1) FilterExecTransformer ((isnotnull(noi__close_status#120L) AND NOT (noi__close_status#120L = 1)) AND ((col_noi__cate1_id#146L = 226) OR (col_noi__cate2_id#144L = 380)))
                     :                    :                    +- ^(1) NativeFileScan orc tmp_db.table_c[noi__id#111L,col_main_noi__id#113L,noi__name#114,noi__close_status#120L,col_noi__cate2_id#144L,col_noi__cate1_id#146L,partition_date#203,partition_chain#204] Batched: true, DataFilters: [isnotnull(noi__close_status#120L), NOT (noi__close_status#120L = 1), ((col_noi__cate1_id#146L = 226)..., Format: ORC, Location: InMemoryFileIndex(1 paths)[viewfs://hadoop-xxx, PartitionFilters: [isnotnull(partition_date#203), isnotnull(partition_chain#204), (partition_date#203 = 2024-07-28)..., PushedFilters: [IsNotNull(noi__close_status), Not(EqualTo(noi__close_status,1)), Or(EqualTo(col_noi__cate1_id,226),E..., ReadSchema: struct<noi__id:bigint,col_main_noi__id:bigint,noi__name:string,noi__close_status:bigint,col_noi__cate2_i...
                     :                    +- ^(5) InputIteratorTransformer[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                       +- CustomShuffleReader coalesced
                     :                          +- ShuffleQueryStage 3
                     :                             +- ColumnarExchange hashpartitioning(dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [id=#1373], [id=#1373], [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType), [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType)
                     :                                +- VeloxAppendBatches 3276
                     :                                   +- ^(4) ProjectExecTransformer [hash(dp_noi__id#209L, 42) AS hash_partition_key#431, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                      +- ^(4) FlushableHashAggregateTransformer(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], isStreamingAgg=false, output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                     :                                         +- ^(4) InputIteratorTransformer[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                            +- CustomShuffleReader coalesced
                     :                                               +- ShuffleQueryStage 1
                     :                                                  +- ColumnarExchange hashpartitioning(aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], [id=#1169], [id=#1169], [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType), [OUTPUT] List(aaa_col_user_id:LongType, ccc_name:StringType, ccc_bu_code:StringType, ccc_bu_name:StringType, dp_noi__id:LongType)
                     :                                                     +- VeloxAppendBatches 3276
                     :                                                        +- ^(2) ProjectExecTransformer [hash(aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L, 42) AS hash_partition_key#428, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                                           +- ^(2) FlushableHashAggregateTransformer(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], isStreamingAgg=false, output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                     :                                                              +- ^(2) ProjectExecTransformer [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                     :                                                                 +- ^(2) FilterExecTransformer (ccc_bu_code#207 IN (2,18,28,41,51,56,59,66) AND isnotnull(dp_noi__id#209L))
                     :                                                                    +- ^(2) NativeFileScan orc tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,partition_date#221] Batched: true, DataFilters: [ccc_bu_code#207 IN (2,18,28,41,51,56,59,66), isnotnull(dp_noi__id#209L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#221), (partition_date#221 = 2024-07-28)], PushedFilters: [In(ccc_bu_code, [2,18,28,41,51,56,59,66]), IsNotNull(dp_noi__id)], ReadSchema: struct<aaa_col_user_id:bigint,ccc_name:string,ccc_bu_code:string,ccc_bu_name:string,dp...
                     +- ^(6) HashAggregateTransformer(keys=[col_user_id#222L], functions=[sum(col_bal#224)], isStreamingAgg=false, output=[col_user_id#222L, col_bal#100])
                        +- ^(6) InputIteratorTransformer[col_user_id#222L, sum#425]
                           +- CustomShuffleReader coalesced
                              +- ShuffleQueryStage 2
                                 +- ColumnarExchange hashpartitioning(col_user_id#222L, 2000), ENSURE_REQUIREMENTS, [col_user_id#222L, sum#425], [id=#1235], [id=#1235], [OUTPUT] List(col_user_id:LongType, sum:DoubleType), [OUTPUT] List(col_user_id:LongType, sum:DoubleType)
                                    +- VeloxAppendBatches 3276
                                       +- ^(3) ProjectExecTransformer [hash(col_user_id#222L, 42) AS hash_partition_key#430, col_user_id#222L, sum#425]
                                          +- ^(3) FlushableHashAggregateTransformer(keys=[col_user_id#222L], functions=[partial_sum(col_bal#224)], isStreamingAgg=false, output=[col_user_id#222L, sum#425])
                                             +- ^(3) ProjectExecTransformer [col_user_id#222L, col_bal#224]
                                                +- ^(3) FilterExecTransformer (pool_type#223L IN (0,6) AND isnotnull(col_user_id#222L))
                                                   +- ^(3) NativeFileScan orc tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,partition_date#246] Batched: true, DataFilters: [pool_type#223L IN (0,6), isnotnull(col_user_id#222L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#246), (partition_date#246 = 2024-07-28)], PushedFilters: [In(pool_type, [0,6]), IsNotNull(col_user_id)], ReadSchema: struct<col_user_id:bigint,pool_type:bigint,col_bal:double>
   +- == Initial Plan ==
      SortAggregate(key=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[max(noi__name#114), max(noi__close_status#120L), sum(col_bal#100), sum(coalesce(col_bal#100, 0.0)), max(ccc_bu_code#207), max(ccc_bu_name#208)], output=[main_noi__id#403L, noi__name#404, noi__status#405, col_user_id#406L, ccc_name#206, is_has_col_bal#407, aaa_col_bal#408, ccc_bu_code#409L, ccc_bu_name#410, partition_date#411])
      +- SortAggregate(key=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206], functions=[partial_max(noi__name#114), partial_max(noi__close_status#120L), partial_sum(col_bal#100), partial_sum(coalesce(col_bal#100, 0.0)), partial_max(ccc_bu_code#207), partial_max(ccc_bu_name#208)], output=[col_main_noi__id#113L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, max#418, max#419L, sum#420, sum#421, max#422, max#423])
         +- Sort [col_main_noi__id#113L ASC NULLS FIRST, partition_date#203 ASC NULLS FIRST, aaa_col_user_id#205L ASC NULLS FIRST, ccc_name#206 ASC NULLS FIRST], false, 0
            +- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, col_bal#100]
               +- SortMergeJoin [aaa_col_user_id#205L], [col_user_id#222L], LeftOuter
                  :- Sort [aaa_col_user_id#205L ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(aaa_col_user_id#205L, 2000), ENSURE_REQUIREMENTS, [id=#1039]
                  :     +- Project [col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203, aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208]
                  :        +- SortMergeJoin [noi__id#111L], [dp_noi__id#209L], LeftOuter
                  :           :- Sort [noi__id#111L ASC NULLS FIRST], false, 0
                  :           :  +- Exchange hashpartitioning(noi__id#111L, 2000), ENSURE_REQUIREMENTS, [id=#1029]
                  :           :     +- Project [noi__id#111L, col_main_noi__id#113L, noi__name#114, noi__close_status#120L, partition_date#203]
                  :           :        +- Filter ((isnotnull(noi__close_status#120L) AND NOT (noi__close_status#120L = 1)) AND ((col_noi__cate1_id#146L = 226) OR (col_noi__cate2_id#144L = 380)))
                  :           :           +- FileScan orc tmp_db.table_c[noi__id#111L,col_main_noi__id#113L,noi__name#114,noi__close_status#120L,col_noi__cate2_id#144L,col_noi__cate1_id#146L,partition_date#203,partition_chain#204] Batched: true, DataFilters: [isnotnull(noi__close_status#120L), NOT (noi__close_status#120L = 1), ((col_noi__cate1_id#146L = 226)..., Format: ORC, Location: InMemoryFileIndex[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#203), isnotnull(partition_chain#204), (partition_date#203 = 2024-07-28)..., PushedFilters: [IsNotNull(noi__close_status), Not(EqualTo(noi__close_status,1)), Or(EqualTo(col_noi__cate1_id,226),E..., ReadSchema: struct<noi__id:bigint,col_main_noi__id:bigint,noi__name:string,noi__close_status:bigint,col_noi__cate2_i...
                  :           +- Sort [dp_noi__id#209L ASC NULLS FIRST], false, 0
                  :              +- Exchange hashpartitioning(dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [id=#1030]
                  :                 +- HashAggregate(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                  :                    +- Exchange hashpartitioning(aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L, 2000), ENSURE_REQUIREMENTS, [id=#1025]
                  :                       +- HashAggregate(keys=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L], functions=[], output=[aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L])
                  :                          +- Project [aaa_col_user_id#205L, ccc_name#206, ccc_bu_code#207, ccc_bu_name#208, dp_noi__id#209L]
                  :                             +- Filter (ccc_bu_code#207 IN (2,18,28,41,51,56,59,66) AND isnotnull(dp_noi__id#209L))
                  :                                +- FileScan orc tmp_db.table_b[aaa_col_user_id#205L,ccc_name#206,ccc_bu_code#207,ccc_bu_name#208,dp_noi__id#209L,partition_date#221] Batched: true, DataFilters: [ccc_bu_code#207 IN (2,18,28,41,51,56,59,66), isnotnull(dp_noi__id#209L)], Format: ORC, Location: InMemoryFileIndex[viewfs://hadoop-xxx/..., PartitionFilters: [isnotnull(partition_date#221), (partition_date#221 = 2024-07-28)], PushedFilters: [In(ccc_bu_code, [2,18,28,41,51,56,59,66]), IsNotNull(dp_noi__id)], ReadSchema: struct<aaa_col_user_id:bigint,ccc_name:string,ccc_bu_code:string,ccc_bu_name:string,dp...
                  +- Sort [col_user_id#222L ASC NULLS FIRST], false, 0
                     +- HashAggregate(keys=[col_user_id#222L], functions=[sum(col_bal#224)], output=[col_user_id#222L, col_bal#100])
                        +- Exchange hashpartitioning(col_user_id#222L, 2000), ENSURE_REQUIREMENTS, [id=#1035]
                           +- HashAggregate(keys=[col_user_id#222L], functions=[partial_sum(col_bal#224)], output=[col_user_id#222L, sum#425])
                              +- Project [col_user_id#222L, col_bal#224]
                                 +- Filter (pool_type#223L IN (0,6) AND isnotnull(col_user_id#222L))
                                    +- FileScan orc tmp_db.table_a[col_user_id#222L,pool_type#223L,col_bal#224,partition_date#246] Batched: true, DataFilters: [pool_type#223L IN (0,6), isnotnull(col_user_id#222L)], Format: ORC, Location: InMemoryFileIndex[viewfs://hadoop-xxx/_..., PartitionFilters: [isnotnull(partition_date#246), (partition_date#246 = 2024-07-28)], PushedFilters: [In(pool_type, [0,6]), IsNotNull(col_user_id)], ReadSchema: struct<col_user_id:bigint,pool_type:bigint,col_bal:double>
FelixYBW commented 1 month ago

> SQL explain.

@PHILO-HE Can you take a look?

zhztheplayer commented 1 month ago

I used Gluten + Spark 3.4 to run a simple distinct query, and it doesn't seem to reproduce the issue.

[screenshot]

It is probably related to Spark 3.0 / 3.1, which still have CustomShuffleReaderExec in the code, though I am not sure. We should investigate further.

zhztheplayer commented 1 month ago

I managed to construct a more similar case but still could not reproduce the issue.

# Generate partitioned data:
tools/gluten-it/sbin/gluten-it.sh data-gen-only --local-cluster --auto-cluster-resource -s=100.0 --gen-partitioned-data
# Open a Spark shell:
tools/gluten-it/sbin/gluten-it.sh spark-shell --local-cluster --auto-cluster-resource -s=100.0 --data-gen=skip

// In the opened Spark shell, run:
spark.sql("set spark.sql.adaptive.coalescePartitions.minPartitionSize=500m").show() // force AQEShuffleReadExec
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1").show() // disable broadcast hash join
val df = spark.sql("select * from (select distinct l_orderkey, l_partkey from lineitem) a inner join (select l_orderkey from lineitem limit 10) b on a.l_orderkey = b.l_orderkey limit 10") // run query
df.collect() // execute
df.explain() // explain

And the explained plan looks fine:

[screenshot]

In the debugger, AQEShuffleReadExec has the correct outputPartitioning:

[screenshot]

NEUpanning commented 1 month ago

To reproduce this issue, ensure that the outputPartitioning of AQEShuffleReadExec is UnknownPartitioning. This means the child's (AQEShuffleReadExec's) output is NOT partitioned by the aggregation keys. Under these conditions, the final aggregation will be transformed into a FlushableHashAggregate.
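That condition can be made concrete with a toy simulation (not Spark code): when rows arrive hash-partitioned by the grouping keys, every occurrence of a key lands in one partition, so one round of per-partition dedup is globally correct; under UnknownPartitioning (simulated below with round-robin placement), the same key can sit in several partitions, and a single local flushable-style pass leaves global duplicates unless a full final aggregation follows.

```python
def partition_by_hash(rows, n):
    # Hash partitioning: all copies of a key go to the same partition.
    parts = [[] for _ in range(n)]
    for r in rows:
        parts[hash(r) % n].append(r)
    return parts

def partition_round_robin(rows, n):
    # Stand-in for UnknownPartitioning: a key may span partitions.
    parts = [[] for _ in range(n)]
    for i, r in enumerate(rows):
        parts[i % n].append(r)
    return parts

def per_partition_distinct(parts):
    # One local (per-partition) dedup pass, with no final merge.
    out = []
    for p in parts:
        out.extend(dict.fromkeys(p))
    return out

rows = ["k1", "k2", "k1", "k3", "k2", "k1"]
hashed = per_partition_distinct(partition_by_hash(rows, 2))       # globally distinct
unknown = per_partition_distinct(partition_round_robin(rows, 2))  # keeps duplicates
```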