apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Implement a way to preserve partitioning through `UnionExec` without losing ordering #10314

Open alamb opened 2 months ago

alamb commented 2 months ago

Is your feature request related to a problem or challenge?

The EnforceDistribution physical optimizer pass in DataFusion in some cases will introduce InterleaveExec to increase partitioning when data passes through a UnionExec:

https://github.com/apache/datafusion/blob/22311835bc1b4bd83b50e1c3875b0e725622b872/datafusion/core/src/physical_optimizer/enforce_distribution.rs#L1196-L1226

Here is what InterleaveExec does: https://github.com/apache/datafusion/blob/4edbdd7d09d97f361748c086afbd7b3dda972f76/datafusion/physical-plan/src/union.rs#L286-L317

However, this has the potential downside of destroying any pre-existing ordering, and preserving that ordering is sometimes preferable to increasing / improving partitioning (e.g. see https://github.com/apache/datafusion/issues/10257 and the datafusion.optimizer.prefer_existing_sort setting).
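
To make the ordering loss concrete, here is a minimal sketch in plain Rust. It is not DataFusion code: each input is modeled as an already-sorted `Vec<i64>`, and a round-robin combine stands in for "emit whichever input has data ready". The helper names `interleave_round_robin` and `is_sorted_asc` are hypothetical.

```rust
/// Combine several streams by taking one element from each in turn.
/// Hypothetical stand-in for an arrival-order interleave; it makes no
/// attempt to compare values across inputs.
fn interleave_round_robin(inputs: &[Vec<i64>]) -> Vec<i64> {
    let longest = inputs.iter().map(|v| v.len()).max().unwrap_or(0);
    let mut out = Vec::new();
    for i in 0..longest {
        for input in inputs {
            if let Some(v) = input.get(i) {
                out.push(*v);
            }
        }
    }
    out
}

/// True if the slice is in non-decreasing order.
fn is_sorted_asc(values: &[i64]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}
```

Two inputs that are each sorted, such as `[1, 2, 3]` and `[10, 20, 30]`, produce an output that is no longer sorted, which is why a sort has to be re-added downstream if the ordering is needed.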

Describe the solution you'd like

I would like there to be some way to preserve the partitioning after a UnionExec without losing the ordering information, and then to remove the prefer_existing_union flag.

Describe alternatives you've considered

One possibility is to add a preserve_order flag to InterleaveExec the same way as RepartitionExec has a preserve_order flag: https://github.com/apache/datafusion/blob/4edbdd7d09d97f361748c086afbd7b3dda972f76/datafusion/physical-plan/src/repartition/mod.rs#L328-L417
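
As a rough illustration of what an order-preserving interleave could compute, here is a sketch in plain Rust. It is an assumption about the desired semantics, not the DataFusion implementation: `inputs[k][p]` models sorted partition `p` of union input `k`, and output partition `p` is a k-way sorted merge of partition `p` from every input. The names `merge_sorted` and `interleave_preserving_order` are hypothetical, and hash partitioning of the keys is ignored.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted streams into one sorted stream (k-way merge).
fn merge_sorted(streams: &[Vec<i64>]) -> Vec<i64> {
    // Heap entries are (value, stream index, position); Reverse makes the
    // max-heap behave as a min-heap so the smallest value pops first.
    let mut heap = BinaryHeap::new();
    for (s, stream) in streams.iter().enumerate() {
        if let Some(&v) = stream.first() {
            heap.push(Reverse((v, s, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(streams.iter().map(Vec::len).sum::<usize>());
    while let Some(Reverse((v, s, pos))) = heap.pop() {
        out.push(v);
        // Refill the heap from the stream that just produced a value.
        if let Some(&next) = streams[s].get(pos + 1) {
            heap.push(Reverse((next, s, pos + 1)));
        }
    }
    out
}

/// Output partition `p` is the sorted merge of partition `p` of every input,
/// so the partition count is kept and each output partition stays sorted.
fn interleave_preserving_order(inputs: &[Vec<Vec<i64>>]) -> Vec<Vec<i64>> {
    let n_partitions = inputs.iter().map(Vec::len).max().unwrap_or(0);
    (0..n_partitions)
        .map(|p| {
            let streams: Vec<Vec<i64>> = inputs
                .iter()
                .filter_map(|input| input.get(p).cloned())
                .collect();
            merge_sorted(&streams)
        })
        .collect()
}
```

The trade-off is the same one RepartitionExec faces with preserve_order: a merge has to wait for the next value from every input before emitting, whereas an arrival-order combine can emit as soon as any input is ready.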

Additional context

We encountered this while working on https://github.com/apache/datafusion/pull/10259. @mustafasrepo and @phillipleblanc pointed out that the config flag prefer_existing_union was effectively the same as prefer_existing_sort.

xinlifoobar commented 1 month ago

Hi @alamb, I am trying to work on this.

I am not very familiar with InterleaveExec in the optimizer. As an initial thought, InterleaveExec acts as a repartition with an equal number of input and output partitions, so a natural idea is to reuse streaming_merge with respect to the input size. Wdyt?

alamb commented 1 month ago

Hi @xinlifoobar -- this sounds like it is on the right track

xinlifoobar commented 1 month ago

Hi @alamb, I found another interesting case while testing. I am not sure about this one: do you think InterleaveExec could be applied here, given that both inputs have the same ORDER BY?

 explain select count(*) from ((select distinct c1, c2 from t3 order by c1 ) union all (select distinct c1, c2 from t4 order by c1)) group by cube(c1,c2);
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                   |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: COUNT(*)                                                                                                                                                   |
|               |   Aggregate: groupBy=[[CUBE (t3.c1, t3.c2)]], aggr=[[COUNT(Int64(1)) AS COUNT(*)]]                                                                                     |
|               |     Union                                                                                                                                                              |
|               |       Sort: t3.c1 ASC NULLS LAST                                                                                                                                       |
|               |         Aggregate: groupBy=[[t3.c1, t3.c2]], aggr=[[]]                                                                                                                 |
|               |           TableScan: t3 projection=[c1, c2]                                                                                                                            |
|               |       Sort: t4.c1 ASC NULLS LAST                                                                                                                                       |
|               |         Aggregate: groupBy=[[t4.c1, t4.c2]], aggr=[[]]                                                                                                                 |
|               |           TableScan: t4 projection=[c1, c2]                                                                                                                            |
| physical_plan | ProjectionExec: expr=[COUNT(*)@2 as COUNT(*)]                                                                                                                          |
|               |   AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, c2@1 as c2], aggr=[COUNT(*)], ordering_mode=PartiallySorted([0])                                              |
|               |     SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                 |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                      |
|               |         RepartitionExec: partitioning=Hash([c1@0, c2@1], 14), input_partitions=14                                                                                      |
|               |           RepartitionExec: partitioning=RoundRobinBatch(14), input_partitions=2                                                                                        |
|               |             AggregateExec: mode=Partial, gby=[(c1@0 as c1, c2@1 as c2), (NULL as c1, c2@1 as c2), (c1@0 as c1, NULL as c2), (NULL as c1, NULL as c2)], aggr=[COUNT(*)] |
|               |               UnionExec                                                                                                                                                |
|               |                 CoalescePartitionsExec                                                                                                                                 |
|               |                   AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, c2@1 as c2], aggr=[]                                                                          |
|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                        |
|               |                       RepartitionExec: partitioning=Hash([c1@0, c2@1], 14), input_partitions=1                                                                         |
|               |                         AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as c2], aggr=[]                                                                             |
|               |                           MemoryExec: partitions=1, partition_sizes=[0]                                                                                                |
|               |                 CoalescePartitionsExec                                                                                                                                 |
|               |                   AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1, c2@1 as c2], aggr=[]                                                                          |
|               |                     CoalesceBatchesExec: target_batch_size=8192                                                                                                        |
|               |                       RepartitionExec: partitioning=Hash([c1@0, c2@1], 14), input_partitions=1                                                                         |
|               |                         AggregateExec: mode=Partial, gby=[c1@0 as c1, c2@1 as c2], aggr=[]                                                                             |
|               |                           MemoryExec: partitions=1, partition_sizes=[0]                                                                                                |
|               |                                                                                                                                                                        |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched. 

With InterleaveExec:

 ProjectionExec:
   AggregateExec:
     InterleaveExec:
       SortExec:
         AggregateExec:
       SortExec:
         AggregateExec: