apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Probable bug in TopKAggregate #12748

Open avantgardnerio opened 2 hours ago

avantgardnerio commented 2 hours ago

Describe the bug

@dispanser pointed out a probable bug in TopKAggregate:

https://github.com/apache/datafusion/blob/77f330c6a2b26f2d1d4d4bf11d456fad466316b4/datafusion/physical-optimizer/src/topk_aggregation.rs#L102

  1. We should probably call the check cardinality_reducing and allow anything that is !cardinality_reducing.
  2. FilterExec is cardinality reducing, and should not be whitelisted here.
  3. Things like CoalescePartitionsExec should be whitelisted, because they don't change cardinality.
  4. In theory, something like a left outer join can only increase cardinality, so it should be allowed as well.

I would suggest we add a cardinality_reducing() or not_cardinality_reducing() method to the ExecutionPlan trait, so we don't need to maintain this downcast list.
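To make the suggestion concrete, here is a minimal sketch of what such a trait method might look like. This is illustrative only: the trait and structs below are stripped-down stand-ins, not the real DataFusion ExecutionPlan API, and the method name cardinality_reducing is the hypothetical one proposed above.

```rust
// Hypothetical sketch, NOT the actual DataFusion API: a default trait
// method replaces the hard-coded downcast whitelist in the optimizer.

trait ExecutionPlan {
    fn name(&self) -> &str;

    /// Returns true if this operator may emit fewer rows than it receives.
    /// Conservative default: assume it can, so only operators that
    /// explicitly opt out are safe for TopK limit pushdown.
    fn cardinality_reducing(&self) -> bool {
        true
    }
}

struct CoalescePartitionsExec;
impl ExecutionPlan for CoalescePartitionsExec {
    fn name(&self) -> &str { "CoalescePartitionsExec" }
    // Merging partitions never drops rows.
    fn cardinality_reducing(&self) -> bool { false }
}

struct FilterExec;
impl ExecutionPlan for FilterExec {
    fn name(&self) -> &str { "FilterExec" }
    // Inherits the conservative default: a filter can drop rows.
}

/// What the optimizer would ask instead of downcasting to known types.
fn limit_pushdown_safe(plan: &dyn ExecutionPlan) -> bool {
    !plan.cardinality_reducing()
}

fn main() {
    assert!(limit_pushdown_safe(&CoalescePartitionsExec));
    assert!(!limit_pushdown_safe(&FilterExec));
    println!("ok");
}
```

The conservative default (returning true) means any new operator is excluded from the pushdown until someone explicitly declares it safe, which is the opposite failure mode of the current whitelist.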

i.e.

                    GlobalLimitExec: skip=0, fetch=100, metrics=[]
                      SortExec: TopK(fetch=100), expr=[$d.max_ts@1 DESC NULLS LAST], preserve_partitioning=[false], metrics=[]
                        AggregateExec: mode=Final, gby=[$d.traceID@0 as $d.traceID], aggr=[$d.max_ts], lim=[100], metrics=[]
                          CoalescePartitionsExec, metrics=[]
                            AggregateExec: mode=Partial, gby=[$d.traceID@0 as $d.traceID], aggr=[$d.max_ts], lim=[100], metrics=[]

is fine, because CoalescePartitionsExec preserves cardinality: the 100 rows from the partial AggregateExec are passed all the way up to GlobalLimitExec.

However:

                    GlobalLimitExec: skip=0, fetch=100, metrics=[]
                      SortExec: TopK(fetch=100), expr=[$d.max_ts@1 DESC NULLS LAST], preserve_partitioning=[false], metrics=[]
                        AggregateExec: mode=Final, gby=[$d.traceID@0 as $d.traceID], aggr=[$d.max_ts], lim=[100], metrics=[]
                          FilterExec, expr=[id % 2 == 0]
                            AggregateExec: mode=Partial, gby=[$d.traceID@0 as $d.traceID], aggr=[$d.max_ts], lim=[100], metrics=[]

is not, because the partial AggregateExec could return 100 rows, FilterExec could reduce that to 50, and GlobalLimitExec would then not receive the 100 rows it was guaranteed in the original plan.
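The row-loss scenario above can be simulated with plain Vecs (no DataFusion involved; the operator names in the comments just map back to the plan):

```rust
// Toy simulation of the broken plan: the partial aggregate's lim=[100]
// caps its output at 100 rows, the filter then drops half of them, and
// the final limit can no longer produce the 100 rows it was promised.

fn main() {
    let ids: Vec<u64> = (0..1000).collect();

    // AggregateExec: mode=Partial, lim=[100] -- early termination at 100 groups
    let partial: Vec<u64> = ids.into_iter().take(100).collect();

    // FilterExec: expr=[id % 2 == 0] -- cardinality reducing!
    let filtered: Vec<u64> = partial.into_iter().filter(|id| id % 2 == 0).collect();

    // GlobalLimitExec: skip=0, fetch=100 -- only 50 rows survive
    let result: Vec<u64> = filtered.into_iter().take(100).collect();

    assert_eq!(result.len(), 50); // 50 < 100: rows were lost
    println!("rows returned: {}", result.len());
}
```

Without the lim=[100] on the partial aggregate (i.e. without the TopK optimization), the filter would run over all 1000 rows and the limit would still find 100 matches, which is why this only shows up when the limit is pushed below a cardinality-reducing operator.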

To Reproduce

We need a new test.

Expected behavior

Don't lose rows.

Additional context

No response

alamb commented 2 hours ago

I agree; this likely needs a test demonstrating the incorrect behavior to be actionable.