apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0
1.15k stars 418 forks source link

[CH] Result diff when set `streaming_aggregate_enabled` true #7311

Open KevinyhZou opened 4 days ago

KevinyhZou commented 4 days ago

Backend

CH (ClickHouse)

Bug description

The diff problem is found at case dsgluten_2413_0_1

When set the config spark.gluten.sql.columnar.backend.ch.runtime_config.enable_streaming_aggregating false, the result is same as valina , as below

2024-08-26,0,AD,208, all, all, 29,22, 1.3181818,,,,

when set this config as true, the result is different from valina

2024-08-26,0,AD,208,all,all, 29, 23, 1.260869 ,,,, 

Spark version

Spark-3.3.x

Spark configurations

No response

System information

No response

Relevant logs

No response

KevinyhZou commented 4 days ago

@lgbo-ustc

lgbo-ustc commented 4 days ago

This a problem related to count(distinct). At fist, we give the physical plan for the sql as below.

+- CHNativeColumnarToRow
   +- ^(4) ProjectExecTransformer [2024-08-26 AS sday#8, cast(hour#43 as bigint) AS hour#77L, coalesce(country#44, all) AS country#2, coalesce(cast(msg_type#45L as string), all) AS msg_type#3, coalesce(cast(content_type#46L as string), all) AS content_type#4, coalesce(is_visitor#47, all) AS is_visitor#5, sum(user_push_msg_send_count#1L)#35L AS push_send_count#6L, count(uid#21L)#36L AS push_send_uv#7L, (cast(sum(user_push_msg_send_count#1L)#35L as double) / cast(count(uid#21L)#36L as double)) AS avg_push_send_count#9]
      +- ^(4) HashAggregateTransformer(keys=[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L], functions=[sum(user_push_msg_send_count#1L), count(distinct uid#21L)], isStreamingAgg=false)
         +- ^(4) InputIteratorTransformer[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, sum#80L, count#83L]
            +- ColumnarExchange hashpartitioning(hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, 500), ENSURE_REQUIREMENTS, [plan_id=271], [shuffle_writer_type=hash], [OUTPUT] List(hour:StringType, country:StringType, msg_type:LongType, content_type:LongType, is_visitor:StringType, spark_grouping_id:LongType, sum:LongType, count:LongType)
               +- ^(3) HashAggregateTransformer(keys=[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L], functions=[merge_sum(user_push_msg_send_count#1L), partial_count(distinct uid#21L)], isStreamingAgg=false)
                  +- ^(3) HashAggregateTransformer(keys=[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, uid#21L], functions=[merge_sum(user_push_msg_send_count#1L)], isStreamingAgg=false)
                     +- ^(3) InputIteratorTransformer[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, uid#21L, sum#80L]
                        +- ColumnarExchange hashpartitioning(hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, uid#21L, 500), ENSURE_REQUIREMENTS, [plan_id=265], [shuffle_writer_type=hash], [OUTPUT] List(hour:StringType, country:StringType, msg_type:LongType, content_type:LongType, is_visitor:StringType, spark_grouping_id:LongType, uid:LongType, sum:LongType)
                           +- ^(2) HashAggregateTransformer(keys=[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, uid#21L], functions=[partial_sum(user_push_msg_send_count#1L)], isStreamingAgg=false)
                              +- ^(2) FilterExecTransformer ((coalesce(country#44, all) = all) OR RLIKE(coalesce(country#44, all), ^[A-Z]{2}$))
                                 +- ^(2) ExpandExecTransformer [[uid#21L, user_push_msg_send_count#1L, hour#33, null, null, null, null, 15], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, null, null, null, 7], [uid#21L, user_push_msg_send_count#1L, hour#33, null, msg_type#19L, null, null, 11], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, msg_type#19L, null, null, 3], [uid#21L, user_push_msg_send_count#1L, hour#33, null, null, content_type#20L, null, 13], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, null, content_type#20L, null, 5], [uid#21L, user_push_msg_send_count#1L, hour#33, null, msg_type#19L, content_type#20L, null, 9], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, msg_type#19L, content_type#20L, null, 1], [uid#21L, user_push_msg_send_count#1L, hour#33, null, null, null, is_visitor#41, 14], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, null, null, is_visitor#41, 6], [uid#21L, user_push_msg_send_count#1L, hour#33, null, msg_type#19L, null, is_visitor#41, 10], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, msg_type#19L, null, is_visitor#41, 2], [uid#21L, user_push_msg_send_count#1L, hour#33, null, null, content_type#20L, is_visitor#41, 12], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, null, content_type#20L, is_visitor#41, 4], [uid#21L, user_push_msg_send_count#1L, hour#33, null, msg_type#19L, content_type#20L, is_visitor#41, 8], [uid#21L, user_push_msg_send_count#1L, hour#33, country#22, msg_type#19L, content_type#20L, is_visitor#41, 0]], [uid#21L, user_push_msg_send_count#1L, hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L]
                                    +- ^(2) ProjectExecTransformer [uid#21L, sum(user_push_msg_send_count_01#30)#34L AS user_push_msg_send_count#1L, hour#33, country#22, msg_type#19L, content_type#20L, _groupingexpression#78 AS is_visitor#41]
                                       +- ^(2) HashAggregateTransformer(keys=[day#32, uid#21L, hour#33, country#22, msg_type#19L, content_type#20L, _groupingexpression#78], functions=[sum(user_push_msg_send_count_01#30)], isStreamingAgg=false)
                                          +- ^(2) InputIteratorTransformer[day#32, uid#21L, hour#33, country#22, msg_type#19L, content_type#20L, _groupingexpression#78, sum#86L]
                                             +- ColumnarExchange hashpartitioning(day#32, uid#21L, hour#33, country#22, msg_type#19L, content_type#20L, _groupingexpression#78, 500), ENSURE_REQUIREMENTS, [plan_id=256], [shuffle_writer_type=hash], [OUTPUT] Vector(day:StringType, uid:LongType, hour:StringType, country:StringType, msg_type:LongType, content_type:LongType, _groupingexpression:StringType, sum:LongType)
                                                +- ^(1) HashAggregateTransformer(keys=[day#32, uid#21L, hour#33, country#22, msg_type#19L, content_type#20L, _groupingexpression#78], functions=[partial_sum(user_push_msg_send_count_01#30)], isStreamingAgg=false)
                                                   +- ^(1) ProjectExecTransformer [msg_type#19L, content_type#20L, uid#21L, country#22, user_push_msg_send_count_01#30, day#32, hour#33, if ((is_visitor#28 = 1)) 访客 else 注册用户 AS _groupingexpression#78]
                                                      +- ^(1) FilterExecTransformer (isnotnull(extratype#31L) AND (extratype#31L = 0))
                                                         +- ^(1) NativeFileScan

How does spark implete count(distinct) ?

We give a simple sql as following

select n_regionkey, count(distinct(n_nationkey)) from tpch_pq.nation group by n_regionkey;

Its physical plan is

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[n_regionkey#2L], functions=[count(distinct n_nationkey#0L)])
   +- Exchange hashpartitioning(n_regionkey#2L, 5), ENSURE_REQUIREMENTS, [plan_id=1405]
      +- HashAggregate(keys=[n_regionkey#2L], functions=[partial_count(distinct n_nationkey#0L)])
         +- HashAggregate(keys=[n_regionkey#2L, n_nationkey#0L], functions=[])
            +- Exchange hashpartitioning(n_regionkey#2L, n_nationkey#0L, 5), ENSURE_REQUIREMENTS, [plan_id=1401]
               +- HashAggregate(keys=[n_regionkey#2L, n_nationkey#0L], functions=[])
                  +- FileScan parquet tpch_pq.nation[n_nationkey#0L,n_regionkey#2L]

It has two aggregation steps. The 1st step uses n_nationkey as one grouping key, this makes sure that n_nationkey is unique under each n_regionkey. The 2nd step apply count(uid) on each n_regionkey. Without using CountDistinct, these to steps could achieve the same result.

How does the problem come?

This this case, spark also use the above algorithm to achieve count(distinct(uid)). Let's see the two last aggregate operations.

     +- ^(4) HashAggregateTransformer(keys=[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L], functions=[sum(user_push_msg_send_count#1L), count(distinct uid#21L)], isStreamingAgg=false)
         +- ^(4) InputIteratorTransformer[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, sum#80L, count#83L]
            +- ColumnarExchange hashpartitioning(hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L, 500), ENSURE_REQUIREMENTS, [plan_id=271], [shuffle_writer_type=hash], [OUTPUT] List(hour:StringType, country:StringType, msg_type:LongType, content_type:LongType, is_visitor:StringType, spark_grouping_id:LongType, sum:LongType, count:LongType)
               +- ^(3) HashAggregateTransformer(keys=[hour#43, country#44, msg_type#45L, content_type#46L, is_visitor#47, spark_grouping_id#42L], functions=[merge_sum(user_push_msg_send_count#1L), partial_count(distinct uid#21L)], isStreamingAgg=false)

When we enable streaming aggregating, the 1st aggregate operation could generate multi records for the same grouping keys, and then the 2nd aggregate operation just accumulate the result of count(distinct). This could result in wrong result.

In vanilla or we disable streaming aggregating, the 1st aggregate operation generates one record for each group.

lgbo-ustc commented 4 days ago

Disabling streaming aggregating cannot guarantee this problem will not occur. If the memory usage is over limit, it will spill current result to downstream, still could generate multi records for the same group in the partial aggregating stages.

lgbo-ustc commented 4 days ago

We need to introduce grace hash aggregate for partial aggregating stage.

  1. Make GraceMergingAggregatedTransform support generating intermedia aggregating result. This modification should not be too large
  2. Recognize the pattern that it's a aggregation involves distinct. e.g. sum(distinct(x)), count(distinct). In these cases, use GraceMergingAggregatedTransform instead of StreamingAggregatingTransform for the partial aggregating stages.
lgbo-ustc commented 3 days ago

Some example queries

select n_regionkey, count(distinct(n_name)), sum(distinct(n_nationkey)) from tpch_pq.nation group by n_regionkey with cube;
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[n_regionkey#45L, spark_grouping_id#44L], functions=[count(spark_catalog.tpch_pq.nation.n_name#47), sum(spark_catalog.tpch_pq.nation.n_nationkey#48L)])
   +- Exchange hashpartitioning(n_regionkey#45L, spark_grouping_id#44L, 5), ENSURE_REQUIREMENTS, [plan_id=273]
      +- HashAggregate(keys=[n_regionkey#45L, spark_grouping_id#44L], functions=[partial_count(spark_catalog.tpch_pq.nation.n_name#47) FILTER (WHERE (gid#46 = 1)), partial_sum(spark_catalog.tpch_pq.nation.n_nationkey#48L) FILTER (WHERE (gid#46 = 2))])
         +- HashAggregate(keys=[n_regionkey#45L, spark_grouping_id#44L, spark_catalog.tpch_pq.nation.n_name#47, spark_catalog.tpch_pq.nation.n_nationkey#48L, gid#46], functions=[])
            +- Exchange hashpartitioning(n_regionkey#45L, spark_grouping_id#44L, spark_catalog.tpch_pq.nation.n_name#47, spark_catalog.tpch_pq.nation.n_nationkey#48L, gid#46, 5), ENSURE_REQUIREMENTS, [plan_id=269]
               +- HashAggregate(keys=[n_regionkey#45L, spark_grouping_id#44L, spark_catalog.tpch_pq.nation.n_name#47, spark_catalog.tpch_pq.nation.n_nationkey#48L, gid#46], functions=[])
                  +- Expand [[n_regionkey#45L, spark_grouping_id#44L, n_name#1, null, 1], [n_regionkey#45L, spark_grouping_id#44L, null, n_nationkey#0L, 2]], [n_regionkey#45L, spark_grouping_id#44L, spark_catalog.tpch_pq.nation.n_name#47, spark_catalog.tpch_pq.nation.n_nationkey#48L, gid#46]
                     +- Expand [[n_nationkey#0L, n_name#1, n_regionkey#2L, 0], [n_nationkey#0L, n_name#1, null, 1]], [n_nationkey#0L, n_name#1, n_regionkey#45L, spark_grouping_id#44L]
                        +- FileScan parquet tpch_pq.nation[n_nationkey#0L,n_name#1,n_regionkey#2L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/workspace/docker/local_gluten/tpch_pq_data/nat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<n_nationkey:bigint,n_name:string,n_regionkey:bigint>
select n_regionkey, count(distinct(n_name)), sum(distinct(n_nationkey)) from tpch_pq.nation group by n_regionkey
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[n_regionkey#2L], functions=[count(spark_catalog.tpch_pq.nation.n_name#65), sum(spark_catalog.tpch_pq.nation.n_nationkey#66L)])
   +- Exchange hashpartitioning(n_regionkey#2L, 5), ENSURE_REQUIREMENTS, [plan_id=310]
      +- HashAggregate(keys=[n_regionkey#2L], functions=[partial_count(spark_catalog.tpch_pq.nation.n_name#65) FILTER (WHERE (gid#64 = 1)), partial_sum(spark_catalog.tpch_pq.nation.n_nationkey#66L) FILTER (WHERE (gid#64 = 2))])
         +- HashAggregate(keys=[n_regionkey#2L, spark_catalog.tpch_pq.nation.n_name#65, spark_catalog.tpch_pq.nation.n_nationkey#66L, gid#64], functions=[])
            +- Exchange hashpartitioning(n_regionkey#2L, spark_catalog.tpch_pq.nation.n_name#65, spark_catalog.tpch_pq.nation.n_nationkey#66L, gid#64, 5), ENSURE_REQUIREMENTS, [plan_id=306]
               +- HashAggregate(keys=[n_regionkey#2L, spark_catalog.tpch_pq.nation.n_name#65, spark_catalog.tpch_pq.nation.n_nationkey#66L, gid#64], functions=[])
                  +- Expand [[n_regionkey#2L, n_name#1, null, 1], [n_regionkey#2L, null, n_nationkey#0L, 2]], [n_regionkey#2L, spark_catalog.tpch_pq.nation.n_name#65, spark_catalog.tpch_pq.nation.n_nationkey#66L, gid#64]
                     +- FileScan parquet tpch_pq.nation[n_nationkey#0L,n_name#1,n_regionkey#2L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/workspace/docker/local_gluten/tpch_pq_data/nat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<n_nationkey:bigint,n_name:string,n_regionkey:bigint>
select n_regionkey, count(distinct(n_name))  from tpch_pq.nation group by n_regionkey
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[n_regionkey#2L], functions=[count(distinct n_name#1)])
   +- Exchange hashpartitioning(n_regionkey#2L, 5), ENSURE_REQUIREMENTS, [plan_id=342]
      +- HashAggregate(keys=[n_regionkey#2L], functions=[partial_count(distinct n_name#1)])
         +- HashAggregate(keys=[n_regionkey#2L, n_name#1], functions=[])
            +- Exchange hashpartitioning(n_regionkey#2L, n_name#1, 5), ENSURE_REQUIREMENTS, [plan_id=338]
               +- HashAggregate(keys=[n_regionkey#2L, n_name#1], functions=[])
                  +- FileScan parquet tpch_pq.nation[n_name#1,n_regionkey#2L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/workspace/docker/local_gluten/tpch_pq_data/nat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<n_name:string,n_regionkey:bigint>