apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[CH] Support `WindowGroupLimit` for `row_number`, `rank` and `dense_rank` #7087

Closed lgbo-ustc closed 4 days ago

lgbo-ustc commented 1 week ago

Description

Part of #6067.

For TPC-DS q44, the physical plan is:

CHNativeColumnarToRow
+- TakeOrderedAndProjectExecTransformer (limit=100, orderBy=[rnk#74 ASC NULLS FIRST], output=[rnk#74,best_performing#80,worst_performing#81])
   +- ^(16) ProjectExecTransformer [rnk#74, i_product_name#135 AS best_performing#80, i_product_name#180 AS worst_performing#81]
      +- ^(16) CHSortMergeJoinExecTransformer [item_sk#75L], [i_item_sk#159L], Inner, false
         :- ^(16) SortExecTransformer [item_sk#75L ASC NULLS FIRST], false, 0
         :  +- ^(16) InputIteratorTransformer[rnk#74, item_sk#75L, i_product_name#135]
         :     +- ColumnarExchange hashpartitioning(item_sk#75L, 5), ENSURE_REQUIREMENTS, [plan_id=1465], [shuffle_writer_type=hash], [OUTPUT] List(rnk:IntegerType, item_sk:LongType, i_product_name:StringType)
         :        +- ^(14) ProjectExecTransformer [rnk#74, item_sk#75L, i_product_name#135]
         :           +- ^(14) CHSortMergeJoinExecTransformer [item_sk#70L], [i_item_sk#114L], Inner, false
         :              :- ^(14) SortExecTransformer [item_sk#70L ASC NULLS FIRST], false, 0
         :              :  +- ^(14) InputIteratorTransformer[item_sk#70L, rnk#74, item_sk#75L]
         :              :     +- ColumnarExchange hashpartitioning(item_sk#70L, 5), ENSURE_REQUIREMENTS, [plan_id=1407], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rnk:IntegerType, item_sk:LongType)
         :              :        +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74, item_sk#75L]
         :              :           +- ^(12) CHSortMergeJoinExecTransformer [rnk#74], [rnk#79], Inner, false
         :              :              :- ^(12) SortExecTransformer [rnk#74 ASC NULLS FIRST], false, 0
         :              :              :  +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74]
         :              :              :     +- ^(12) FilterExecTransformer ((rnk#74 < 11) AND isnotnull(item_sk#70L))
         :              :              :        +- ^(12) WindowExecTransformer [rank(rank_col#71) windowspecdefinition(rank_col#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#74], [rank_col#71 ASC NULLS FIRST]
         :              :              :           +- ^(12) InputIteratorTransformer[item_sk#70L, rank_col#71]
         :              :              :              +- RowToCHNativeColumnar
         :              :              :                 +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Final
         :              :              :                    +- CHNativeColumnarToRow
         :              :              :                       +- ^(8) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
         :              :              :                          +- ^(8) InputIteratorTransformer[item_sk#70L, rank_col#71]
         :              :              :                             +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1210], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :              :              :                                +- RowToCHNativeColumnar
         :              :              :                                   +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Partial
         :              :              :                                      +- CHNativeColumnarToRow
         :              :              :                                         +- ^(7) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
         :              :              :                                            +- ^(7) FilterExecTransformer (isnotnull(rank_col#71) AND (cast(rank_col#71 as decimal(13,7)) > (0.9 * Subquery scalar-subquery#73, [id=#294])))
         :              :              :                                               :  +- Subquery scalar-subquery#73, [id=#294]
         :              :              :                                               :     +- CHNativeColumnarToRow
         :              :              :                                               :        +- ^(3) ProjectExecTransformer [cast((avg(UnscaledValue(ss_net_profit#209))#185 / 100.0) as decimal(11,6)) AS rank_col#72]
         :              :              :                                               :           +- ^(3) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[avg(UnscaledValue(ss_net_profit#209))], isStreamingAgg=false)
         :              :              :                                               :              +- ^(3) InputIteratorTransformer[ss_store_sk#195L, sum#265, count#266L]
         :              :              :                                               :                 +- ColumnarExchange hashpartitioning(ss_store_sk#195L, 5), ENSURE_REQUIREMENTS, [plan_id=287], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_store_sk:LongType, sum:DoubleType, count:LongType)
         :              :              :                                               :                    +- ^(2) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[partial_avg(_pre_0#267L)], isStreamingAgg=false)
         :              :              :                                               :                       +- ^(2) ProjectExecTransformer [ss_store_sk#195L, ss_net_profit#209, UnscaledValue(ss_net_profit#209) AS _pre_0#267L]
         :              :              :                                               :                          +- ^(2) FilterExecTransformer ((isnotnull(ss_store_sk#195L) AND (ss_store_sk#195L = cast(2 as bigint))) AND isnull(ss_hdemo_sk#193L))
         :              :              :                                               :                             +- ^(2) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_hdemo_sk#193L,ss_store_sk#195L,ss_net_profit#209] Batched: true, DataFilters: [isnotnull(ss_store_sk#195L), isnull(ss_hdemo_sk#193L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), IsNull(ss_hdemo_sk)], ReadSchema: struct<ss_hdemo_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :              :              :                                               +- ^(7) ProjectExecTransformer [ss_item_sk#91L AS item_sk#70L, cast((avg(UnscaledValue(ss_net_profit#113))#183 / 100.0) as decimal(11,6)) AS rank_col#71]
         :              :              :                                                  +- ^(7) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[avg(UnscaledValue(ss_net_profit#113))], isStreamingAgg=false)
         :              :              :                                                     +- ^(7) InputIteratorTransformer[ss_item_sk#91L, sum#257, count#258L]
         :              :              :                                                        +- ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :              :              :                                                           +- ^(6) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[partial_avg(_pre_2#277L)], isStreamingAgg=false)
         :              :              :                                                              +- ^(6) ProjectExecTransformer [ss_item_sk#91L, ss_net_profit#113, UnscaledValue(ss_net_profit#113) AS _pre_2#277L]
         :              :              :                                                                 +- ^(6) FilterExecTransformer (isnotnull(ss_store_sk#99L) AND (ss_store_sk#99L = cast(2 as bigint)))
         :              :              :                                                                    +- ^(6) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_item_sk#91L,ss_store_sk#99L,ss_net_profit#113] Batched: true, DataFilters: [isnotnull(ss_store_sk#99L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk)], ReadSchema: struct<ss_item_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :              :              +- ^(12) SortExecTransformer [rnk#79 ASC NULLS FIRST], false, 0
         :              :                 +- ^(12) ProjectExecTransformer [item_sk#75L, rnk#79]
         :              :                    +- ^(12) FilterExecTransformer ((rnk#79 < 11) AND isnotnull(item_sk#75L))
         :              :                       +- ^(12) WindowExecTransformer [rank(rank_col#76) windowspecdefinition(rank_col#76 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#79], [rank_col#76 DESC NULLS LAST]
         :              :                          +- ^(12) InputIteratorTransformer[item_sk#75L, rank_col#76]
         :              :                             +- RowToCHNativeColumnar
         :              :                                +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Final
         :              :                                   +- CHNativeColumnarToRow
         :              :                                      +- ^(11) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
         :              :                                         +- ^(11) InputIteratorTransformer[item_sk#75L, rank_col#76]
         :              :                                            +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1374], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :              :                                               +- RowToCHNativeColumnar
         :              :                                                  +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Partial
         :              :                                                     +- CHNativeColumnarToRow
         :              :                                                        +- ^(10) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
         :              :                                                           +- ^(10) FilterExecTransformer (isnotnull(rank_col#76) AND (cast(rank_col#76 as decimal(13,7)) > (0.9 * ReusedSubquery Subquery scalar-subquery#73, [id=#294])))
         :              :                                                              :  +- ReusedSubquery Subquery scalar-subquery#73, [id=#294]
         :              :                                                              +- ^(10) ProjectExecTransformer [ss_item_sk#136L AS item_sk#75L, cast((avg(UnscaledValue(ss_net_profit#158))#184 / 100.0) as decimal(11,6)) AS rank_col#76]
         :              :                                                                 +- ^(10) HashAggregateTransformer(keys=[ss_item_sk#136L], functions=[avg(UnscaledValue(ss_net_profit#158))], isStreamingAgg=false)
         :              :                                                                    +- ^(10) InputIteratorTransformer[ss_item_sk#136L, sum#261, count#262L]
         :              :                                                                       +- ReusedExchange [ss_item_sk#136L, sum#261, count#262L], ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :              +- ^(14) SortExecTransformer [i_item_sk#114L ASC NULLS FIRST], false, 0
         :                 +- ^(14) InputIteratorTransformer[i_item_sk#114L, i_product_name#135]
         :                    +- ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)
         :                       +- ^(13) FilterExecTransformer isnotnull(i_item_sk#114L)
         :                          +- ^(13) NativeFileScan parquet spark_catalog.tpcds_pq100.item[i_item_sk#114L,i_product_name#135] Batched: true, DataFilters: [isnotnull(i_item_sk#114L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_product_name:string>
         +- ^(16) SortExecTransformer [i_item_sk#159L ASC NULLS FIRST], false, 0
            +- ^(16) InputIteratorTransformer[i_item_sk#159L, i_product_name#180]
               +- ReusedExchange [i_item_sk#159L, i_product_name#180], ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)

q67 and q70 also fall back on WindowGroupLimit.
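
For reference, the q44 sub-query shape that Spark 3.5 rewrites into the WindowGroupLimit(Partial) / Exchange / WindowGroupLimit(Final) pair above looks roughly like this (a simplified paraphrase, not the exact TPC-DS text; table and column names are taken from the plan):

import org.apache.spark.sql.SparkSession

object RankLimitPattern {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rank-limit-pattern").getOrCreate()
    // rank() over a sort order, followed by a `rnk < 11` filter. Spark 3.5's
    // optimizer may rewrite this shape into WindowGroupLimit(Partial) ->
    // Exchange -> WindowGroupLimit(Final) -> Window, which is the node the
    // CH backend currently falls back on.
    spark.sql(
      """SELECT item_sk, rnk
        |FROM (
        |  SELECT item_sk, rank() OVER (ORDER BY rank_col ASC) AS rnk
        |  FROM (
        |    SELECT ss_item_sk AS item_sk, avg(ss_net_profit) AS rank_col
        |    FROM tpcds_pq100.store_sales
        |    WHERE ss_store_sk = 2
        |    GROUP BY ss_item_sk
        |  ) v
        |) ranked
        |WHERE rnk < 11
        |""".stripMargin).explain()
  }
}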

lgbo-ustc commented 1 week ago

For comparison, the physical plan on Spark 3.3 (where no WindowGroupLimit node appears) is:

CHNativeColumnarToRow
+- TakeOrderedAndProjectExecTransformer (limit=100, orderBy=[rnk#5 ASC NULLS FIRST], output=[rnk#5,best_performing#11,worst_performing#12])
   +- ^(11) ProjectExecTransformer [rnk#5, i_product_name#66 AS best_performing#11, i_product_name#111 AS worst_performing#12]
      +- ^(11) CHBroadcastHashJoinExecTransformer [item_sk#6L], [i_item_sk#90L], Inner, BuildRight, false
         :- ^(11) ProjectExecTransformer [rnk#5, item_sk#6L, i_product_name#66]
         :  +- ^(11) CHBroadcastHashJoinExecTransformer [item_sk#1L], [i_item_sk#45L], Inner, BuildRight, false
         :     :- ^(11) ProjectExecTransformer [item_sk#1L, rnk#5, item_sk#6L]
         :     :  +- ^(11) CHSortMergeJoinExecTransformer [rnk#5], [rnk#10], Inner, false
         :     :     :- ^(11) SortExecTransformer [rnk#5 ASC NULLS FIRST], false, 0
         :     :     :  +- ^(11) ProjectExecTransformer [item_sk#1L, rnk#5]
         :     :     :     +- ^(11) FilterExecTransformer ((rnk#5 < 11) AND isnotnull(item_sk#1L))
         :     :     :        +- ^(11) WindowExecTransformer [rank(rank_col#2) windowspecdefinition(rank_col#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#5], [rank_col#2 ASC NULLS FIRST]
         :     :     :           +- ^(11) SortExecTransformer [rank_col#2 ASC NULLS FIRST], false, 0
         :     :     :              +- ^(11) InputIteratorTransformer[item_sk#1L, rank_col#2]
         :     :     :                 +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=896], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :     :     :                    +- ^(6) FilterExecTransformer (isnotnull(rank_col#2) AND (cast(rank_col#2 as decimal(13,7)) > CheckOverflow((promote_precision(cast(0.9 as decimal(11,6))) * promote_precision(Subquery scalar-subquery#4, [id=#222])), DecimalType(13,7))))
         :     :     :                       :  +- Subquery scalar-subquery#4, [id=#222]
         :     :     :                       :     +- CHNativeColumnarToRow
         :     :     :                       :        +- ^(2) ProjectExecTransformer [cast((avg(UnscaledValue(ss_net_profit#142))#116 / 100.0) as decimal(11,6)) AS rank_col#3]
         :     :     :                       :           +- ^(2) HashAggregateTransformer(keys=[ss_store_sk#128L], functions=[avg(UnscaledValue(ss_net_profit#142))], isStreamingAgg=false)
         :     :     :                       :              +- ^(2) InputIteratorTransformer[ss_store_sk#128L, sum#204, count#205L]
         :     :     :                       :                 +- ColumnarExchange hashpartitioning(ss_store_sk#128L, 5), ENSURE_REQUIREMENTS, [plan_id=215], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_store_sk:LongType, sum:DoubleType, count:LongType)
         :     :     :                       :                    +- ^(1) HashAggregateTransformer(keys=[ss_store_sk#128L], functions=[partial_avg(_pre_0#206L)], isStreamingAgg=false)
         :     :     :                       :                       +- ^(1) ProjectExecTransformer [ss_store_sk#128L, ss_net_profit#142, UnscaledValue(ss_net_profit#142) AS _pre_0#206L]
         :     :     :                       :                          +- ^(1) FilterExecTransformer ((isnotnull(ss_store_sk#128L) AND (ss_store_sk#128L = cast(2 as bigint))) AND isnull(ss_hdemo_sk#126L))
         :     :     :                       :                             +- ^(1) NativeFileScan parquet tpcds_pq100.store_sales[ss_hdemo_sk#126L,ss_store_sk#128L,ss_net_profit#142] Batched: true, DataFilters: [isnotnull(ss_store_sk#128L), isnull(ss_hdemo_sk#126L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), IsNull(ss_hdemo_sk)], ReadSchema: struct<ss_hdemo_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :     :     :                       +- ^(6) ProjectExecTransformer [ss_item_sk#22L AS item_sk#1L, cast((avg(UnscaledValue(ss_net_profit#44))#114 / 100.0) as decimal(11,6)) AS rank_col#2]
         :     :     :                          +- ^(6) HashAggregateTransformer(keys=[ss_item_sk#22L], functions=[avg(UnscaledValue(ss_net_profit#44))], isStreamingAgg=false)
         :     :     :                             +- ^(6) InputIteratorTransformer[ss_item_sk#22L, sum#196, count#197L]
         :     :     :                                +- ColumnarExchange hashpartitioning(ss_item_sk#22L, 5), ENSURE_REQUIREMENTS, [plan_id=889], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
         :     :     :                                   +- ^(5) HashAggregateTransformer(keys=[ss_item_sk#22L], functions=[partial_avg(_pre_2#216L)], isStreamingAgg=false)
         :     :     :                                      +- ^(5) ProjectExecTransformer [ss_item_sk#22L, ss_net_profit#44, UnscaledValue(ss_net_profit#44) AS _pre_2#216L]
         :     :     :                                         +- ^(5) FilterExecTransformer (isnotnull(ss_store_sk#30L) AND (ss_store_sk#30L = cast(2 as bigint)))
         :     :     :                                            +- ^(5) NativeFileScan parquet tpcds_pq100.store_sales[ss_item_sk#22L,ss_store_sk#30L,ss_net_profit#44] Batched: true, DataFilters: [isnotnull(ss_store_sk#30L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk)], ReadSchema: struct<ss_item_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
         :     :     +- ^(11) SortExecTransformer [rnk#10 ASC NULLS FIRST], false, 0
         :     :        +- ^(11) ProjectExecTransformer [item_sk#6L, rnk#10]
         :     :           +- ^(11) FilterExecTransformer ((rnk#10 < 11) AND isnotnull(item_sk#6L))
         :     :              +- ^(11) WindowExecTransformer [rank(rank_col#7) windowspecdefinition(rank_col#7 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#10], [rank_col#7 DESC NULLS LAST]
         :     :                 +- ^(11) SortExecTransformer [rank_col#7 DESC NULLS LAST], false, 0
         :     :                    +- ^(11) InputIteratorTransformer[item_sk#6L, rank_col#7]
         :     :                       +- ReusedExchange [item_sk#6L, rank_col#7], ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=896], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
         :     +- ^(11) InputIteratorTransformer[i_item_sk#45L, i_product_name#66]
         :        +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=923]
         :           +- ^(9) FilterExecTransformer isnotnull(i_item_sk#45L)
         :              +- ^(9) NativeFileScan parquet tpcds_pq100.item[i_item_sk#45L,i_product_name#66] Batched: true, DataFilters: [isnotnull(i_item_sk#45L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.3.2-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_product_name:string>
         +- ^(11) InputIteratorTransformer[i_item_sk#90L, i_product_name#111]
            +- ReusedExchange [i_item_sk#90L, i_product_name#111], ColumnarBroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=923]
lgbo-ustc commented 1 week ago

Two advantages of WindowGroupLimit:

  1. It helps avoid OOM.
  2. Much less data needs to be shuffled and processed in the later stages (see the sketch below).
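
A minimal sketch in Scala of point 2 (illustrative only, not Gluten or Spark code): on already-sorted input, the Partial stage can stop consuming rows as soon as the rank value passes the limit, so each task shuffles at most roughly `limit` rows (plus ties) and keeps only O(1) state instead of buffering its whole input.

// Illustrative only: a Partial-stage group limit for rank() over rows that
// are already sorted by the ordering key. Once the rank exceeds `limit`,
// the rest of the input is not consumed at all.
def partialRankLimit[T](sortedRows: Iterator[T], limit: Long)
                       (orderingKey: T => Any): Iterator[T] = {
  var rank = 0L
  var rowNumber = 0L
  var prevKey: Option[Any] = None
  sortedRows.takeWhile { row =>
    rowNumber += 1
    val key = orderingKey(row)
    if (!prevKey.contains(key)) rank = rowNumber // rank jumps to rowNumber on a new key
    prevKey = Some(key)
    rank <= limit                                // false once the limit is passed
  }
}

With limit = 10, as in the plan above, each Partial-stage task would emit at most 10 rows plus ties on rank_col, instead of its whole input.
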
lgbo-ustc commented 1 week ago

Based on the WindowTransform in CH, we would need to

  1. Adjust the window frame begin bound to a nearby offset instead of unbounded preceding, similar to #6214. This would greatly reduce the required memory.
  2. Figure out how to drop the following blocks directly once the rank value is larger than the limit. This would reduce the overhead of value insertion and the later filter.
lgbo-ustc commented 1 week ago

We cannot use WindowTransform to implement this; we have to write a new processor. It should be simpler than WindowTransform.
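
As a rough illustration of why a dedicated processor can be simpler, the per-row state needed for row_number, rank and dense_rank is just a few counters. The sketch below is hypothetical Scala pseudocode for that state machine (the real implementation would be a C++ processor in the CH backend); it assumes the input blocks arrive sorted by the window ordering key.

// Hypothetical sketch only; the real code would be a C++ processor in the
// CH backend. Tracks the rank state across sorted rows. Once the limit is
// crossed, accept() keeps returning false, so the rest of the current block
// and all following blocks can be dropped without inserting values or
// running a separate filter (point 2 in the earlier comment).
sealed trait RankFunction
case object RowNumber extends RankFunction
case object Rank extends RankFunction
case object DenseRank extends RankFunction

final class GroupLimitState(func: RankFunction, limit: Long) {
  private var rowNumber = 0L
  private var rank = 0L
  private var denseRank = 0L
  private var prevKey: Option[Any] = None
  private var exceeded = false

  // Returns true if the row should be kept.
  def accept(orderingKey: Any): Boolean = {
    if (exceeded) {
      false
    } else {
      rowNumber += 1
      if (!prevKey.contains(orderingKey)) { // ordering key changed
        rank = rowNumber
        denseRank += 1
      }
      prevKey = Some(orderingKey)
      val current = func match {
        case RowNumber => rowNumber
        case Rank      => rank
        case DenseRank => denseRank
      }
      exceeded = current > limit
      !exceeded
    }
  }

  def limitReached: Boolean = exceeded
}

Unlike WindowTransform, there are no frames or aggregate states to maintain here, which is why a dedicated processor should be both simpler and cheaper.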