apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Convert indexed ARRAY_AGG to NTH_VALUE #9213

Open mustafasrepo opened 4 months ago

mustafasrepo commented 4 months ago

Is your feature request related to a problem or challenge?

The query below

SELECT a, ARRAY_AGG(c ORDER BY c)[1] AS result
FROM multiple_ordered_table
GROUP BY a;

and

SELECT a, NTH_VALUE(c, 1 ORDER BY c) AS result
FROM multiple_ordered_table
GROUP BY a;

produce the same results. However, the first query generates the following plan:

"ProjectionExec: expr=[a@0 as a, (ARRAY_AGG(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1).[1] as result]",
"  AggregateExec: mode=Single, gby=[a@0 as a], aggr=[ARRAY_AGG(multiple_ordered_table.c)], ordering_mode=Sorted",
"    CsvExec: file_groups={1 group: [[CSV_PATH]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true",

whereas the second query generates the following plan:

"ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]",
"  AggregateExec: mode=Single, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted",
"    CsvExec: file_groups={1 group: [[CSV_PATH]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true",

Describe the solution you'd like

We can rewrite the first query as the second one, which executes faster and uses less memory because it no longer needs to keep all of a group's values in the ARRAY_AGG state.
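To illustrate the memory argument, here is a minimal Rust sketch. It is a toy model and not DataFusion code: the `ArrayAggThenIndex` and `NthValueSorted` types are hypothetical names invented for this example, and it assumes the rows of a group arrive already sorted by c (as with ordering_mode=Sorted in the plans above).

```rust
// Toy model, NOT DataFusion code: two ways to get the n-th value (1-based)
// of a group whose rows arrive already sorted by `c`.

/// Mimics ARRAY_AGG(c ORDER BY c)[n]: buffers every value, then indexes.
struct ArrayAggThenIndex {
    values: Vec<i64>,
}

impl ArrayAggThenIndex {
    fn new() -> Self {
        Self { values: Vec::new() }
    }

    fn update(&mut self, v: i64) {
        // State grows with the number of rows in the group.
        self.values.push(v);
    }

    fn evaluate(&self, n: usize) -> Option<i64> {
        // Indexes are 1-based here; n = 0 yields None.
        n.checked_sub(1).and_then(|i| self.values.get(i).copied())
    }

    fn buffered(&self) -> usize {
        self.values.len()
    }
}

/// Mimics NTH_VALUE(c, n ORDER BY c) on sorted input: keeps a single value.
struct NthValueSorted {
    n: usize,
    seen: usize,
    result: Option<i64>,
}

impl NthValueSorted {
    fn new(n: usize) -> Self {
        Self { n, seen: 0, result: None }
    }

    fn update(&mut self, v: i64) {
        // State is constant-size: a counter plus at most one value.
        self.seen += 1;
        if self.seen == self.n {
            self.result = Some(v);
        }
    }

    fn evaluate(&self) -> Option<i64> {
        self.result
    }
}

fn main() {
    let sorted_c: Vec<i64> = (0..1000).collect();
    let mut array_agg = ArrayAggThenIndex::new();
    let mut nth_value = NthValueSorted::new(1);
    for &v in &sorted_c {
        array_agg.update(v);
        nth_value.update(v);
    }
    // Both strategies agree on the answer; only the state size differs.
    assert_eq!(array_agg.evaluate(1), nth_value.evaluate());
    println!(
        "array_agg buffered {} values; nth_value kept at most one",
        array_agg.buffered()
    );
}
```

The real accumulators also have to handle unsorted input, NULLs and merging of partial states, which this sketch leaves out.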

Describe alternatives you've considered

No response

Additional context

No response

Lordworms commented 4 months ago

I can do this one

Lordworms commented 4 months ago

Hello @mustafasrepo, could you give me the actual table creation SQL for this issue?

mustafasrepo commented 4 months ago

Sure,

CREATE EXTERNAL TABLE multiple_ordered_table (
  a0 INTEGER,
  a INTEGER,
  b INTEGER,
  c INTEGER,
  d INTEGER
)
STORED AS CSV
WITH HEADER ROW
WITH ORDER (a ASC, b ASC)
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv';

You can create the table used in the queries with the snippet above.

Lordworms commented 4 months ago

take

alamb commented 4 months ago

As @Lordworms points out, maybe we can try to implement this feature in some way that is general and not special cased in the optimizer -- aka https://github.com/apache/arrow-datafusion/issues/9289

This would look like

  1. Making ARRAY_AGG an AggregateUDF (which probably would mean making a datafusion-aggregates crate)
  2. Adding an API to do the rewrite

This would certainly take more work and thus more time than just implementing a special case for the BuiltInAggregateFunction, so I don't think it is necessary
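For reference, the special-case rewrite could look roughly like the Rust sketch below. This is only an illustration under stated assumptions: the `Expr` enum and `rewrite_indexed_array_agg` are hypothetical stand-ins made up for this example, not DataFusion's actual expression types or optimizer API, and it only rewrites a positive literal index.

```rust
// Hypothetical mini expression tree; these are NOT DataFusion's types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    /// ARRAY_AGG(arg ORDER BY order_by)
    ArrayAgg { arg: Box<Expr>, order_by: Vec<String> },
    /// NTH_VALUE(arg, n ORDER BY order_by)
    NthValue { arg: Box<Expr>, n: i64, order_by: Vec<String> },
    /// array[index] with a literal, 1-based index
    Index { array: Box<Expr>, index: i64 },
}

/// Rewrites ARRAY_AGG(x ORDER BY ...)[n] into NTH_VALUE(x, n ORDER BY ...).
/// Every other expression is returned unchanged in this sketch.
fn rewrite_indexed_array_agg(expr: Expr) -> Expr {
    match expr {
        Expr::Index { array, index } => match *array {
            // Only a positive literal index is rewritten (an assumption of
            // this sketch; other index values are left alone).
            Expr::ArrayAgg { arg, order_by } if index >= 1 => Expr::NthValue {
                arg,
                n: index,
                order_by,
            },
            // Not an indexed ARRAY_AGG: keep the index node and recurse
            // into the indexed expression.
            other => Expr::Index {
                array: Box::new(rewrite_indexed_array_agg(other)),
                index,
            },
        },
        other => other,
    }
}

fn main() {
    // ARRAY_AGG(c ORDER BY c)[1]
    let indexed = Expr::Index {
        array: Box::new(Expr::ArrayAgg {
            arg: Box::new(Expr::Column("c".to_string())),
            order_by: vec!["c".to_string()],
        }),
        index: 1,
    };
    let rewritten = rewrite_indexed_array_agg(indexed);
    println!("{:?}", rewritten);
}
```

A general API would let each aggregate function supply a rewrite like this itself instead of having the optimizer match on one specific built-in.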

However, if we think this is a reasonable approach I can file some tickets with the basic ideas sketched out (I didn't want to sketch out too many things at once and we already have a bunch of work related to pulling out scalar UDFs)

Lordworms commented 4 months ago

Yes, I truly want to add a general approach instead of writing some "if else", but I agree that we currently have more tickets than that to deal with, so I could do this one later and work on some current tickets first...