apache / incubator-gluten

Gluten is a middle layer responsible for offloading JVM-based SQL engines' execution to native engines.
https://gluten.apache.org/
Apache License 2.0

[CH] Make aggregate output schemas same between spark plan and native plan #7778

Open lgbo-ustc opened 2 hours ago

lgbo-ustc commented 2 hours ago

Description

The output schema of some aggregate functions differs between the Spark plan and the native plan. For example, in the partial aggregation stage, avg has two output columns in the Spark plan, sum and count, but the native plan has only one, avg.

This causes some trouble. For example, the optimization in #7649 cannot be applied because of this mismatch.
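To illustrate the mismatch, here is a minimal sketch in plain Python (not Gluten or ClickHouse code) of a two-stage avg. The partial stage keeps a (sum, count) pair per group key. The Spark-style layout exposes that pair as two columns, while the native-style layout exposes it as one column. All function names are hypothetical, and modelling the native column as an opaque tuple is an assumption made only for illustration; the issue does not describe the native engine's actual state layout.

```python
from collections import defaultdict


def partial_avg(rows):
    """Partial stage: accumulate a (sum, count) pair per group key."""
    state = defaultdict(lambda: [0.0, 0])
    for key, value in rows:
        state[key][0] += float(value)
        state[key][1] += 1
    return {k: (s, c) for k, (s, c) in state.items()}


def merge_states(partials):
    """Final stage, step 1: merge the partial (sum, count) pairs per key."""
    merged = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, c) in partial.items():
            merged[key][0] += s
            merged[key][1] += c
    return {k: (s, c) for k, (s, c) in merged.items()}


def final_avg(merged):
    """Final stage, step 2: turn each merged pair into the average."""
    return {k: s / c for k, (s, c) in merged.items()}


def spark_style_rows(partial):
    """Spark-plan layout: key, sum, count (three columns)."""
    return [(k, s, c) for k, (s, c) in sorted(partial.items())]


def native_style_rows(partial):
    """Native-plan layout (assumed): key plus ONE column holding the state."""
    return [(k, (s, c)) for k, (s, c) in sorted(partial.items())]


# Two input partitions of (n_regionkey, n_nationkey) pairs (made-up values).
partition_a = [(0, 0), (0, 5), (1, 3)]
partition_b = [(0, 14), (1, 6)]

partial_a = partial_avg(partition_a)
partial_b = partial_avg(partition_b)
result = final_avg(merge_states([partial_a, partial_b]))
```

Both layouts carry the same information, so the final result is the same either way. The difference is only in how many columns the shuffle between the two stages declares, which is the schema mismatch this issue is about.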

lgbo-ustc commented 2 hours ago

Original Spark plan

0: jdbc:hive2://localhost:10000> explain select n_regionkey, avg(n_nationkey) from tpch_pq.nation group by n_regionkey;
+----------------------------------------------------+
|                        plan                        |
+----------------------------------------------------+
| == Physical Plan ==
CHNativeColumnarToRow
+- ^(4) HashAggregateTransformer(keys=[n_regionkey#2L], functions=[avg(cast(n_nationkey#0L as double))], isStreamingAgg=false)
   +- ^(4) InputIteratorTransformer[n_regionkey#2L, sum#32, count#33L]
      +- ColumnarExchange hashpartitioning(n_regionkey#2L, 5), ENSURE_REQUIREMENTS, [plan_id=214], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(n_regionkey:LongType, sum:DoubleType, count:LongType)
         +- ^(3) HashAggregateTransformer(keys=[n_regionkey#2L], functions=[partial_avg(_pre_1#34)], isStreamingAgg=false)
            +- ^(3) ProjectExecTransformer [n_nationkey#0L, n_regionkey#2L, cast(n_nationkey#0L as double) AS _pre_1#34]
               +- ^(3) NativeFileScan parquet tpch_pq.nation[n_nationkey#0L,n_regionkey#2L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home//workspace/docker/local_gluten/tpch_pq_data/nat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<n_nationkey:bigint,n_regionkey:bigint>

Modified Spark plan

0: jdbc:hive2://localhost:10000> explain select n_regionkey, avg(n_nationkey) from tpch_pq.nation group by n_regionkey;
+----------------------------------------------------+
|                        plan                        |
+----------------------------------------------------+
| == Physical Plan ==
CHNativeColumnarToRow
+- ^(15) HashAggregateTransformer(keys=[n_regionkey#7L], functions=[avg(cast(n_nationkey#5L as double))], isStreamingAgg=false)
   +- ^(15) InputIteratorTransformer[n_regionkey#7L, avg(_pre_2#97)#91]
      +- ColumnarExchange hashpartitioning(n_regionkey#7L, 5), ENSURE_REQUIREMENTS, [plan_id=620], [shuffle_writer_type=hash], [OUTPUT] Vector(n_regionkey:LongType, avg(_pre_2#97):DoubleType)
         +- ^(14) HashAggregateTransformer(keys=[n_regionkey#7L], functions=[partial_avg(_pre_2#97)], isStreamingAgg=false)
            +- ^(14) ProjectExecTransformer [n_nationkey#5L, n_regionkey#7L, cast(n_nationkey#5L as double) AS _pre_2#97]
               +- ^(14) NativeFileScan parquet tpch_pq.nation[n_nationkey#5L,n_regionkey#7L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home//workspace/docker/local_gluten/tpch_pq_data/nat..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<n_nationkey:bigint,n_regionkey:bigint>

 |
+----------------------------------------------------+
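
The schema difference between the two plans is visible in the `[OUTPUT]` part of each ColumnarExchange line. A minimal sketch for extracting and comparing those column lists from the `explain` text follows. The helper name `exchange_output_schema` is hypothetical, this is not part of Gluten, and the parsing assumes column entries are separated by `, ` and contain no comma themselves.

```python
import re


def exchange_output_schema(plan_line):
    """Return (name, type) pairs from the `[OUTPUT] Xxx(...)` suffix of a plan line."""
    match = re.search(r"\[OUTPUT\]\s+\w+\((.*)\)\s*$", plan_line)
    if match is None:
        return []
    columns = []
    for field in match.group(1).split(", "):
        # Split on the LAST colon so names such as `avg(_pre_2#97)` stay intact.
        name, _, type_name = field.rpartition(":")
        columns.append((name, type_name))
    return columns


# The [OUTPUT] suffixes copied from the two plans above.
original = "[OUTPUT] ArrayBuffer(n_regionkey:LongType, sum:DoubleType, count:LongType)"
modified = "[OUTPUT] Vector(n_regionkey:LongType, avg(_pre_2#97):DoubleType)"

original_schema = exchange_output_schema(original)
modified_schema = exchange_output_schema(modified)

print(original_schema)
print(modified_schema)
```

In the original plan the exchange declares three columns (the key, sum, and count), while in the modified plan it declares two (the key and a single avg column).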