apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Improve `SingleDistinctToGroupBy` to get the same plan as the `group by` query #11360

Open jayzhan211 opened 2 months ago

jayzhan211 commented 2 months ago

Is your feature request related to a problem or challenge?

While working on #11299, I ran into the issue that the single distinct plan differs from the group by plan: https://github.com/apache/datafusion/pull/11299/files#r1667248774. I worked around it by handling the different values I get in update_batch, but I don't think that addresses the root cause.

SingleDistinctToGroupBy converts a distinct aggregate into a group by expression. Ideally the optimized plan would be the same as the group by version, but the following plan is not what I expect.

statement ok
create table t(a int) as values (1);

query TT
explain select array_agg(distinct a) from t where a > 3;
----
logical_plan
01)Projection: ARRAY_AGG(alias1) AS ARRAY_AGG(DISTINCT t.a)
02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]]
03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]]
04)------Filter: t.a > Int32(3)
05)--------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(alias1)@0 as ARRAY_AGG(DISTINCT t.a)]
02)--AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(alias1)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=4
08)--------------AggregateExec: mode=Partial, gby=[a@0 as alias1], aggr=[]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------FilterExec: a@0 > 3
12)----------------------MemoryExec: partitions=1, partition_sizes=[1]

query TT
explain select array_agg(a) from t where a > 3 group by a;
----
logical_plan
01)Projection: ARRAY_AGG(t.a)
02)--Aggregate: groupBy=[[t.a]], aggr=[[ARRAY_AGG(t.a)]]
03)----Filter: t.a > Int32(3)
04)------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[ARRAY_AGG(t.a)@1 as ARRAY_AGG(t.a)]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[ARRAY_AGG(t.a)]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 > 3
09)----------------MemoryExec: partitions=1, partition_sizes=[1]

Describe the solution you'd like

Rewrite SingleDistinctToGroupBy so that the optimized plan is the same as the group by version.

02)--Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(alias1)]] // outer
03)----Aggregate: groupBy=[[t.a AS alias1]], aggr=[[]] // inner

I think it is possible to have just one Aggregate if the outer group by expr is empty and the inner aggregate expr is empty.
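The condition above can be sketched on a toy plan model. This is only an illustration: the `Aggregate` class and the `can_collapse` helper are hypothetical names, not DataFusion's `LogicalPlan` API, and the sketch checks the condition without saying what the merged plan should be.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Aggregate:
    """Toy stand-in for a logical Aggregate node (hypothetical, not DataFusion's type)."""
    group_by: List[str]
    aggr: List[str]
    child: Optional["Aggregate"] = None


def can_collapse(outer: Aggregate) -> bool:
    """True when the outer node has no group by exprs and its child is an
    Aggregate with no aggregate exprs (i.e. the child only deduplicates)."""
    inner = outer.child
    return inner is not None and not outer.group_by and not inner.aggr


# The two nodes from the plan above.
outer = Aggregate(
    group_by=[],
    aggr=["ARRAY_AGG(alias1)"],
    child=Aggregate(group_by=["t.a AS alias1"], aggr=[]),
)
print(can_collapse(outer))
```

Any real implementation would also have to decide how the single remaining Aggregate expresses the distinct semantics, which is the open question in this issue.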

Describe alternatives you've considered

Do nothing, but document why we can't produce the same plan.

Additional context

No response

jayzhan211 commented 1 month ago

For a query like select bit_xor(distinct a % 2) from t, DataFusion converts the distinct into a group by expression, but DuckDB does not.

datafusion test

```
// datafusion
statement ok
create table t(a int) as values (1), (1), (2), (2), (2), (3);

query I
select bit_xor(distinct a % 2) from t;
----
1

query TT
explain select bit_xor(distinct a % 2) from t;
----
logical_plan
01)Projection: bit_xor(alias1) AS bit_xor(DISTINCT t.a % Int64(2))
02)--Aggregate: groupBy=[[]], aggr=[[bit_xor(alias1)]]
03)----Aggregate: groupBy=[[CAST(t.a AS Int64) % Int64(2) AS alias1]], aggr=[[]]
04)------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[bit_xor(alias1)@0 as bit_xor(DISTINCT t.a % Int64(2))]
02)--AggregateExec: mode=Final, gby=[], aggr=[bit_xor(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[bit_xor(alias1)]
05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([alias1@0], 4), input_partitions=4
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------AggregateExec: mode=Partial, gby=[CAST(a@0 AS Int64) % 2 as alias1], aggr=[]
10)------------------MemoryExec: partitions=1, partition_sizes=[1]

query I
select bit_xor(a % 2) from t group by a;
----
0
0
1

query I
select bit_xor(a % 2) from t group by (a % 2);
----
1
0
```

duckdb test

```
D create table t(a int);
D insert into t values (1);
D insert into t values (1);
D insert into t values (2);
D insert into t values (2);
D insert into t values (2);
D insert into t values (3);
D select bit_xor(distinct a % 2) from t;
┌───────────────────────────┐
│ bit_xor(DISTINCT (a % 2)) │
│           int32           │
├───────────────────────────┤
│                         1 │
└───────────────────────────┘
D select bit_xor(a % 2) from t group by a;
┌──────────────────┐
│ bit_xor((a % 2)) │
│      int32       │
├──────────────────┤
│                0 │
│                0 │
│                1 │
└──────────────────┘
D select bit_xor(a % 2) from t group by (a % 2);
┌──────────────────┐
│ bit_xor((a % 2)) │
│      int32       │
├──────────────────┤
│                1 │
│                0 │
└──────────────────┘
D explain select bit_xor(distinct a % 2) from t;
┌─────────────────────────────┐
│┌───────────────────────────┐│
││       Physical Plan       ││
│└───────────────────────────┘│
└─────────────────────────────┘
┌───────────────────────────┐
│    UNGROUPED_AGGREGATE    │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│    bit_xor(DISTINCT #0)   │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         PROJECTION        │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│          (a % 2)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│          SEQ_SCAN         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             t             │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│             a             │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│           EC: 6           │
└───────────────────────────┘
```
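To make the difference between the three queries concrete, here is a small stand-alone sketch that recomputes the results on the same six rows. It is plain Python, not DataFusion or DuckDB code; the helper names `bit_xor` and `grouped` are made up for the illustration, and an empty input yields 0 here rather than SQL NULL.

```python
from collections import defaultdict
from functools import reduce
from operator import xor

rows = [1, 1, 2, 2, 2, 3]  # values of t.a from the tests above


def bit_xor(values):
    """XOR-fold the values (0 for empty input in this sketch)."""
    return reduce(xor, values, 0)


# bit_xor(distinct a % 2): deduplicate the argument, then aggregate once.
distinct_result = bit_xor({a % 2 for a in rows})


def grouped(key):
    """bit_xor(a % 2) computed per group, with the group key given by `key`."""
    groups = defaultdict(list)
    for a in rows:
        groups[key(a)].append(a % 2)
    return {k: bit_xor(v) for k, v in groups.items()}


by_a = grouped(lambda a: a)            # group by a
by_a_mod_2 = grouped(lambda a: a % 2)  # group by (a % 2)

print(distinct_result, by_a, by_a_mod_2)
```

Grouping by `a % 2` and aggregating within each group is not the same computation as deduplicating `a % 2` and aggregating once, which is why the inner Aggregate in the rewritten plan carries no aggregate expressions.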

I think we can:

  1. Stop converting queries like select bit_xor(distinct a % 2) from t to a group by expression and keep them as distinct aggregates; convert single-expression queries like select bit_xor(distinct a) to a single AggregateExpr.
  2. Only convert single-expression queries like select bit_xor(distinct a) to a single AggregateExpr, and keep the others as they are.
  3. Do nothing.

jayzhan211 commented 1 month ago

What is the rationale of single distinct to group by? Take count for example, I think distinct accumulator could be way more efficient than normal accumulator with group by 🤔

alamb commented 1 month ago

> What is the rationale of single distinct to group by? Take count for example, I think distinct accumulator could be way more efficient than normal accumulator with group by 🤔

Before we had specialized and optimized accumulators (e.g. the groups accumulators), I think the hash group by was more efficient than using special aggregators. Now that we have special accumulators for distinct aggregates, it may not be needed.

We could run some benchmarks to check.

It also lets you do things like reuse the grouping when the same argument is shared between accumulators (min(distinct x) and max(distinct x) can be rewritten to deduplicate x once).
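A minimal sketch of that sharing argument, in plain Python rather than DataFusion's accumulator API (the variable names are illustrative only):

```python
x = [3, 1, 3, 2, 1]

# Separate distinct accumulators: each one keeps its own set of seen values,
# so x is deduplicated twice.
seen_for_min = set(x)
seen_for_max = set(x)
separate = (min(seen_for_min), max(seen_for_max))

# Group by rewrite: deduplicate x once, then feed both aggregates from it.
deduped = set(x)
shared = (min(deduped), max(deduped))

print(separate, shared)
```

Both approaches give the same answer; the difference is only how many times the distinct set is built and stored.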

jayzhan211 commented 1 month ago

--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃   rm-group ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     0.51ms │     0.47ms │ +1.09x faster │
│ QQuery 1     │    39.48ms │    40.42ms │     no change │
│ QQuery 2     │    72.55ms │    74.66ms │     no change │
│ QQuery 3     │    60.83ms │    64.15ms │  1.05x slower │
│ QQuery 4     │   411.03ms │   780.34ms │  1.90x slower │
│ QQuery 5     │   689.68ms │   848.49ms │  1.23x slower │
│ QQuery 6     │    36.17ms │    39.55ms │  1.09x slower │
│ QQuery 7     │    38.18ms │    36.60ms │     no change │
│ QQuery 8     │   752.03ms │   520.39ms │ +1.45x faster │
│ QQuery 9     │   646.74ms │   650.58ms │     no change │
│ QQuery 10    │   192.66ms │   194.48ms │     no change │
│ QQuery 11    │   217.84ms │   222.11ms │     no change │
│ QQuery 12    │   721.83ms │   725.29ms │     no change │
│ QQuery 13    │  1531.53ms │  2302.53ms │  1.50x slower │
│ QQuery 14    │   998.00ms │   997.10ms │     no change │
│ QQuery 15    │   487.51ms │   494.70ms │     no change │
│ QQuery 16    │  2073.38ms │  1678.31ms │ +1.24x faster │
│ QQuery 17    │  2035.91ms │  1731.46ms │ +1.18x faster │
│ QQuery 18    │  5263.26ms │  4561.70ms │ +1.15x faster │
│ QQuery 19    │    58.90ms │    55.16ms │ +1.07x faster │
│ QQuery 20    │  1520.68ms │  1473.57ms │     no change │
│ QQuery 21    │  1732.55ms │  1742.90ms │     no change │
│ QQuery 22    │  4368.40ms │  4373.69ms │     no change │
│ QQuery 23    │  8494.32ms │  8264.09ms │     no change │
│ QQuery 24    │   489.16ms │   491.06ms │     no change │
│ QQuery 25    │   496.44ms │   493.40ms │     no change │
│ QQuery 26    │   548.20ms │   554.44ms │     no change │
│ QQuery 27    │  1300.33ms │  1310.05ms │     no change │
│ QQuery 28    │ 10281.79ms │ 10086.87ms │     no change │
│ QQuery 29    │   420.42ms │   419.32ms │     no change │
│ QQuery 30    │   877.19ms │   850.86ms │     no change │
│ QQuery 31    │  1010.85ms │   991.67ms │     no change │
│ QQuery 32    │  9338.74ms │  8808.60ms │ +1.06x faster │
│ QQuery 33    │  3646.16ms │  4057.37ms │  1.11x slower │
│ QQuery 34    │  3540.59ms │  4003.55ms │  1.13x slower │
│ QQuery 35    │  1050.15ms │  1151.70ms │  1.10x slower │
│ QQuery 36    │   144.86ms │   146.93ms │     no change │
│ QQuery 37    │   102.85ms │   100.53ms │     no change │
│ QQuery 38    │   107.28ms │   107.49ms │     no change │
│ QQuery 39    │   389.38ms │   396.20ms │     no change │
│ QQuery 40    │    35.23ms │    36.68ms │     no change │
│ QQuery 41    │    33.02ms │    33.52ms │     no change │
│ QQuery 42    │    41.73ms │    45.64ms │  1.09x slower │
└──────────────┴────────────┴────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary       ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)       │ 66298.33ms │
│ Total Time (rm-group)   │ 65958.65ms │
│ Average Time (main)     │  1541.82ms │
│ Average Time (rm-group) │  1533.92ms │
│ Queries Faster          │          7 │
│ Queries Slower          │          9 │
│ Queries with No Change  │         27 │
└─────────────────────────┴────────────┘

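There is no clear advantage either way: 7 queries got faster, 9 got slower, and the total time is nearly the same (66298.33ms on main vs. 65958.65ms on rm-group).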