substrait-io / substrait

A cross platform way to express data transformation, relational algebra, standardized record expression and plans.
https://substrait.io
Apache License 2.0

feat: add null input handling options for `any_value` #652

Closed Blizzara closed 1 week ago

Blizzara commented 2 weeks ago

This adds an "ignore_nulls" option for any_value that can be used when converting e.g. Spark's first()/first_value()/any_value().
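To make the proposed option concrete, here is a minimal sketch of the intended null-handling semantics. This is illustrative Python, not the Substrait extension definition; the function and parameter names are assumptions modeled on Spark's first()/any_value() with ignoreNulls.

```python
# Hypothetical sketch of any_value's null-handling options.
# None stands in for SQL NULL; names are illustrative, not normative.
def any_value(values, ignore_nulls=True):
    """Return an arbitrary input value (here: the first encountered).

    With ignore_nulls=True, nulls are skipped; with ignore_nulls=False,
    a null may be returned if it is encountered first.
    """
    for v in values:
        if v is not None or not ignore_nulls:
            return v
    return None  # empty input, or all nulls with ignore_nulls=True

print(any_value([None, 3, 7]))                      # 3
print(any_value([None, 3, 7], ignore_nulls=False))  # None
```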

Blizzara commented 2 weeks ago

One thing I'm not sure about: is it better to have these as both window and aggregate functions, or could we remove the window function version and just replace it with this?

Blizzara commented 2 weeks ago

Okay, I revamped the PR a bit: it now moves first_value and last_value from window functions into aggregate functions, and adds the null-handling options.

This makes the most sense to me, but lmk what you think!

westonpace commented 2 weeks ago

I don't love it but I won't necessarily vote against it. It isn't supported by some significant engines (e.g. Postgres, SQL Server, Snowflake). However, it does appear to be supported by DataFusion and DuckDB, so there is some representation. Databricks has first/last, but it marks them as "order is non-deterministic" and it's not clear that Databricks supports "ORDER BY" in an aggregate expression.

My main problem is that first(x order by x) is the same as min(x) and first(x order by y) can be obtained by arg_min(x, y). I think min / arg_min are better since they don't require an order by statement and so they are more easily implemented by engines (yes, any engine can optimize first into min / arg_min but that's just introducing extra steps for no real gain). I think the more common result would be an engine falling back to its order by implementation and introducing an expensive sort into the query (arg_min and arg_max can be calculated without a sort). For example, this appears to be what DataFusion does today:

❯ EXPLAIN SELECT first_value(val ORDER BY val) FROM foo;
+---------------+------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                     |
+---------------+------------------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(foo.val) ORDER BY [foo.val ASC NULLS LAST]]] |
|               |   TableScan: foo projection=[val]                                                        |
| physical_plan | AggregateExec: mode=Single, gby=[], aggr=[FIRST_VALUE(foo.val)]                          |
|               |   SortExec: expr=[val@0 ASC NULLS LAST]                                                  |
|               |     MemoryExec: partitions=1, partition_sizes=[1]                                        |
|               |                                                                                          |
+---------------+------------------------------------------------------------------------------------------+

That plan is more expensive than SELECT min(val) FROM foo even though the two queries are equivalent. That being said, I do think we should finish up the arg_min / arg_max PR (https://github.com/substrait-io/substrait/pull/326).
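The equivalence argued above can be sketched in a few lines. This is illustrative Python, not engine code: first(x ORDER BY x) is just min(x), and first(x ORDER BY y) is arg_min(x, y), both computable in a single streaming pass with no sort.

```python
# One-pass sketch of the equivalence (illustrative, not engine code):
#   first(x ORDER BY y)  ==  arg_min(x, y)
#   first(x ORDER BY x)  ==  min(x)
def arg_min(pairs):
    """Return the x whose paired y is smallest, in a single pass."""
    best_x, best_y = None, None
    for x, y in pairs:
        if best_y is None or y < best_y:
            best_x, best_y = x, y
    return best_x

rows = [(10, 3), (20, 1), (30, 2)]                 # (x, y)
assert arg_min(rows) == 20                         # first(x ORDER BY y)
assert arg_min((x, x) for x, _ in rows) == 10      # first(x ORDER BY x) == min(x)
```

An engine implementing arg_min this way avoids the SortExec that DataFusion's plan above introduces for first_value.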

Blizzara commented 2 weeks ago

I don't love it but I won't necessarily vote against it. It isn't supported by some significant engines (e.g. Postgres, SQL Server, Snowflake). However, it does appear to be supported by DataFusion and DuckDB, so there is some representation. Databricks has first/last, but it marks them as "order is non-deterministic" and it's not clear that Databricks supports "ORDER BY" in an aggregate expression.

By Databricks, do you mean Spark? Turns out Spark also has any_value, and the two are interchangeable: the implementation of any_value is just first. Substrait already has any_value, so one option is to just use that.

My main problem is that first(x order by x) is the same as min(x) and first(x order by y) can be obtained by arg_min(x, y).

FWIW, I don't think those are the main uses for first/last; rather, I think the need is more for the "any_value" concept. Now that I think of it, I'm not sure why Spark even has last, but maybe there's a reason.

The main reason I started this PR was to support Spark's distinct/dropDuplicates. Those get rewritten by the optimizer into a first aggregate: https://github.com/apache/spark/blob/df13ca05c475e98bf5c218a4503513065611a47f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2262
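The rewrite linked above can be sketched as follows. This is an illustrative Python model of the idea, not the Spark optimizer code: deduplication on key columns becomes a group-by over the keys with first() applied to the remaining columns.

```python
# Hypothetical model of the Spark optimizer rewrite: dropDuplicates on
# key columns == group by the keys, taking first() of the rest.
def drop_duplicates(rows, key):
    """Keep the first row seen for each key (group by key, first(row))."""
    seen = {}
    for row in rows:
        seen.setdefault(key(row), row)  # first() per group
    return list(seen.values())

rows = [("a", 1), ("a", 2), ("b", 3)]
print(drop_duplicates(rows, key=lambda r: r[0]))  # [('a', 1), ('b', 3)]
```

This is why supporting a first-style aggregate (or any_value) in Substrait matters for round-tripping Spark plans that contain distinct/dropDuplicates.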

We do have some uses of last as well, but it's possible those are within windows; I don't have an easy way to check.

In the end, if it's preferable, I think I can actually turn Spark's first() aggregate into Substrait's any_value() and then that into DataFusion's first_value(). Then, if I do end up needing last() as an aggregate, I can add it as a Spark-specific mapping or something.

Would that be better? Then I could change this PR to instead add the null-handling options into any_value, since those would be nice to have still.

EpsilonPrime commented 2 weeks ago

An alternative to first is to use a fetch relation, but modifying a complicated subquery to introduce one gets messy. last does weird me out, and it is available in fewer places; it was probably implemented for completeness rather than actual use.

westonpace commented 2 weeks ago

Would that be better? Then I could change this PR to instead add the null-handling options into any_value, since those would be nice to have still.

Yes, I'd prefer that. Sorry for the churn. Agree the null handling is good.

Blizzara commented 2 weeks ago

Yes, I'd prefer that. Sorry for the churn. Agree the null handling is good.

Done! All good, makes sense to be careful when adding stuff into the standard (or standard extensions)!