ibis-project / ibis-substrait

Ibis Substrait Compiler
Apache License 2.0

fix: workaround for aggregates in datafusion #1120

Closed tokoko closed 2 months ago

tokoko commented 2 months ago

Currently, plans generated by ibis-substrait that contain an aggregate can't be run on datafusion. After a bit of trial and error, I used datafusion's internal sql-to-substrait converter to see what kind of plans datafusion itself generates. It seems that datafusion always expects a projection node to follow an aggregation node.

This PR introduces a dummy projection node that basically remaps aggregate output columns and does nothing. Targeting a specific consumer in a compiler sounds bad, but since it's an easy "fix" and still generates a valid plan for other consumers, I thought a temporary workaround was worth it.
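
To make the idea of a no-op projection concrete, here is a minimal sketch. It models relations as plain Python dicts, a simplified stand-in for Substrait messages rather than the protobuf objects ibis-substrait actually builds. The helper name `wrap_in_noop_project` and the dict keys are hypothetical. The one fact it relies on is that a Substrait project outputs its input fields followed by its new expressions, so an emit mapping is needed to keep only the re-projected columns.

```python
def wrap_in_noop_project(aggregate_rel, n_output_fields):
    """Wrap an aggregate in a projection that re-emits its columns unchanged.

    Hypothetical helper operating on a simplified dict model of a plan.
    """
    # One direct field reference per aggregate output column.
    expressions = [{"field_ref": i} for i in range(n_output_fields)]
    return {
        "project": {
            "input": aggregate_rel,
            "expressions": expressions,
            # A project outputs its input fields followed by the new
            # expressions, so the emit keeps only the second half.
            "emit": list(range(n_output_fields, 2 * n_output_fields)),
        }
    }


# Toy aggregate: one grouping key plus one measure, so two output fields.
aggregate = {
    "aggregate": {
        "input": {"read": {"table": "orders"}},
        "groupings": ["city"],
        "measures": ["count(order_id)"],
    }
}

plan = wrap_in_noop_project(aggregate, n_output_fields=2)
```

The resulting plan has the same output columns as the bare aggregate, which is why other consumers should still accept it.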

gforsyth commented 2 months ago

Hey @tokoko -- is it clear whether or not Datafusion considers their behavior a bug or not?

I'm willing to merge this as a temporary fix if:

  1. There's an upstream issue that we're tracking and can link to this PR
  2. We have a test that doesn't add the dummy projection that currently xfails on Datafusion so we know when we can remove this behavior.

tokoko commented 2 months ago

There's an upstream issue that we're tracking and can link to this PR

I haven't looked on the datafusion side yet, tbh. The closest issue that I think is related is this one in substrait. @EpsilonPrime can probably shed more light.

We have a test that doesn't add the dummy projection that currently xfails on Datafusion so we know when we can remove this behavior.

Makes sense, but how would one do that? The compiler itself either adds the projection or it doesn't, right? We would have to pass some configuration to the translate function to enable/disable the workaround if we want to do that. Are you fine with that approach?

gforsyth commented 2 months ago

Are you fine with that approach?

Maybe instead we just have a hand-crafted (simple) substrait plan that aggs and doesn't project that we execute against datafusion?

EpsilonPrime commented 2 months ago

There is a bug filed for Datafusion's project behavior. I'm not aware of one for the aggregate behavior. It's odd that remapping would be required given that aggregates only output the new fields. I suppose there could be an issue with multiple grouping sets as that does add an additional column.

EpsilonPrime commented 2 months ago

Oh, I know what this is. Datafusion only allows references to be aggregated. If you have an expression with a calculation such as X+1, that needs to be projected first and then aggregated (I refer to this as an interior calculation in the tests). If you have an expression such as sum(X)/sum(Y), the two fields need to be aggregated first and then projected and divided later (I call this an exterior calculation). And you can mix the two to make it even more fun. Here are my tests:

https://github.com/voltrondata/spark-substrait-gateway/blob/main/src/gateway/tests/test_dataframe_api.py#L2578

And here is the implementation (bound up in the Spark to Substrait conversion):

https://github.com/voltrondata/spark-substrait-gateway/pull/75
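
As an illustration of the interior/exterior distinction described above, here is a small pure-Python sketch. It is not taken from the linked tests or implementation. The `project` and `aggregate_sum` helpers are hypothetical stand-ins for the project and aggregate relations, and the sample rows are made up for the example.

```python
rows = [{"x": 1, "y": 2}, {"x": 3, "y": 4}, {"x": 5, "y": 6}]


def project(rows, **exprs):
    """Toy projection: compute each named expression for every row."""
    return [{name: fn(row) for name, fn in exprs.items()} for row in rows]


def aggregate_sum(rows, *columns):
    """Toy aggregate: sums plain column references only, never expressions."""
    return {col: sum(row[col] for row in rows) for col in columns}


# Interior calculation, sum(x + 1): project the expression, then aggregate.
projected = project(rows, x_plus_1=lambda r: r["x"] + 1)
interior = aggregate_sum(projected, "x_plus_1")["x_plus_1"]

# Exterior calculation, sum(x) / sum(y): aggregate both columns, then
# do the division in a step that follows the aggregate.
sums = aggregate_sum(rows, "x", "y")
exterior = sums["x"] / sums["y"]
```

In both cases the aggregate itself only ever sees bare column references, which is the restriction being described.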

tokoko commented 2 months ago

Datafusion only allows references to be aggregated. If you have an expression with a calculation such as X+1 that needs to be projected first and then aggregated

Sounds similar, but I'm not entirely sure this is the cause. Looking into datafusion-produced plans, I saw that it needed an extra project node after an aggregate, not before. Plus, if an aggregate itself was the last node in the query, everything seemed to work fine, but once you added another actual projection after it, things seemed to break down.

I'll take another look and test this out.

EpsilonPrime commented 2 months ago

A known bug in both Datafusion and DuckDB is the way that projects output fields. They don't include the input fields (only the new projections). That's described in this issue.

There are a few other cases where Datafusion doesn't appear to honor emits (such as on join relations). I've heard there is a desire to add the validator to Datafusion so the generated plans can be validated.

tokoko commented 2 months ago

I looked closer and it seems like this is a separate issue. Datafusion simply can't execute plans that have an aggregate as the last node. Doing any sort of no-op after an aggregate fixes the issue. For example, a substrait plan generated from this ibis expression: table.group_by("city").aggregate(sales=filter_table["order_id"].count()) throws an error, while both of the following work:

table.group_by("city").aggregate(sales=filter_table["order_id"].count()).filter(ibis.literal(1) == ibis.literal(1))

table.group_by("city").aggregate(sales=filter_table["order_id"].count()).select("city", "sales")

tokoko commented 2 months ago

Since this is a simple "fix" from the ibis expression side, I'll simply amend the existing tests with the workaround (.filter(ibis.literal(True))) instead of changing the compiler, and add a failing test to alert us whenever it's fixed on the datafusion side. I'll also open a new ticket on their side. Closing this. Thanks for the discussion.
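
A rough sketch of the "failing test that alerts us" pattern, using only the standard library's `unittest.expectedFailure`. The `execute_on_consumer` function is a hypothetical stand-in that mimics the behavior described in this thread (rejecting a plan whose last node is an aggregate). It is not datafusion's API, and the dict-based plans are a simplified model rather than real Substrait.

```python
import unittest


def execute_on_consumer(plan):
    """Hypothetical stand-in for a consumer that rejects a trailing aggregate."""
    if "aggregate" in plan:
        raise RuntimeError("plan ends in an aggregate")
    return "ok"


# Simplified dict model of an aggregate relation (assumption, not Substrait).
AGGREGATE = {"aggregate": {"input": {"read": "orders"}, "groupings": ["city"]}}


class TestTrailingAggregate(unittest.TestCase):
    @unittest.expectedFailure
    def test_bare_aggregate(self):
        # Expected to fail today; once the consumer is fixed this is
        # reported as an unexpected success, which fails the run.
        self.assertEqual(execute_on_consumer(AGGREGATE), "ok")

    def test_aggregate_with_noop_filter(self):
        # The workaround: any no-op relation placed after the aggregate.
        plan = {"filter": {"input": AGGREGATE, "condition": True}}
        self.assertEqual(execute_on_consumer(plan), "ok")
```

Run with `python -m unittest`. The pytest equivalent would be an xfail marker with strict mode, so that an unexpected pass is treated as a failure.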

tokoko commented 3 weeks ago

FYI: this was fixed by 12875 in datafusion. I'll get rid of the empty filter workaround after the fix is incorporated in the next release.