apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Avoid inlining non deterministic CTE #10337

Open tgujar opened 2 months ago

tgujar commented 2 months ago

Describe the bug

Currently DataFusion inlines all CTEs, so a non-deterministic expression can be executed multiple times, producing different results.

To Reproduce

Consider the following query, which uses the aggregate_test_100 data from datafusion-examples. Here, column c11 is a Float64.

WITH cte as (
    SELECT sum(c4 * c11) as total 
    FROM aggregate_test_100 
    GROUP BY c1) 
SELECT total 
FROM cte 
WHERE total = (select max(total) from cte)

The optimized plan inlines the CTE and therefore executes it twice:

Projection: cte.total
  Inner Join: cte.total = __scalar_sq_1.MAX(cte.total)
    SubqueryAlias: cte
      Projection: SUM(aggregate_test_100.c4 * aggregate_test_100.c11) AS total
        Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[SUM(CAST(aggregate_test_100.c4 AS Float64) * aggregate_test_100.c11)]]
          TableScan: aggregate_test_100 projection=[c1, c4, c11]
    SubqueryAlias: __scalar_sq_1
      Aggregate: groupBy=[[]], aggr=[[MAX(cte.total)]]
        SubqueryAlias: cte
          Projection: SUM(aggregate_test_100.c4 * aggregate_test_100.c11) AS total
            Aggregate: groupBy=[[aggregate_test_100.c1]], aggr=[[SUM(CAST(aggregate_test_100.c4 AS Float64) * aggregate_test_100.c11)]]
              TableScan: aggregate_test_100 projection=[c1, c4, c11]

Expected behavior

Since floating-point summation here depends on the order in which rows are added, I believe it is incorrect to inline the CTE and execute it more than once.
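To make the ordering dependence concrete, here is a minimal Python sketch (not DataFusion code). It adds the same four Float64 values in two different orders with a plain left-to-right accumulator. The helper name `naive_sum` and the chosen values are assumptions picked for illustration; an explicit loop is used so the result does not depend on how a particular Python version implements the built-in `sum`.

```python
def naive_sum(xs):
    """Left-to-right float64 accumulation, with no compensation."""
    total = 0.0
    for x in xs:
        total += x
    return total


# The same multiset of values, visited in two different orders.
order_a = [1e16, 1.0, -1e16, 1.0]
order_b = [1e16, -1e16, 1.0, 1.0]

sum_a = naive_sum(order_a)  # the first 1.0 is absorbed by 1e16 due to rounding
sum_b = naive_sum(order_b)  # the large values cancel first, so both 1.0s survive

print(sum_a, sum_b)
```

If the two copies of an inlined CTE happen to see rows in different orders, an equality predicate such as `total = (select max(total) from cte)` can compare values that differ in this way.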

Additional context

Related issue, which discusses possible advantages of not inlining CTEs in some cases: https://github.com/apache/datafusion/issues/8777

tgujar commented 2 months ago

I can work on this if we can confirm that this is indeed a correctly reported bug. Let me know what you think, thanks!

jonahgao commented 2 months ago

I worry that judging whether a query is non-deterministic may not be easy. Perhaps we can first leave this judgment to the user, and only avoid inlining when the user specifies Materialized and there are multiple references.

DuckDB may have taken a similar approach.

v0.10.3-dev779 d26007417b

D with cte as (select random()) select * from cte union select * from cte;
┌────────────────────┐
│      random()      │
│       double       │
├────────────────────┤
│ 0.9430218460038304 │
│ 0.3114725165069103 │
└────────────────────┘
D with cte as materialized (select random()) select * from cte union select * from cte;
┌─────────────────────┐
│      random()       │
│       double        │
├─────────────────────┤
│ 0.13616445031948388 │
└─────────────────────┘

tgujar commented 1 month ago

I think this would push the responsibility onto the user to figure out what may be non-deterministic. I am not sure this would be a good approach.

jonahgao commented 1 month ago

Due to parallel execution, non-deterministic behavior occurs more frequently than in traditional databases. I'm not sure if it's appropriate to disable CTE inlining for all non-deterministic queries, as inlining has advantages in some scenarios.

Additionally, I believe that detecting non-deterministic behavior is more difficult than in traditional databases, because it depends on whether the input is multi-partitioned, on the configuration of target_partitions, and on whether a Repartition node has been added. Some aggregates, like MIN, may also be immune to unordered execution.
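As a rough illustration of the partitioning point, the Python sketch below simulates a two-phase aggregate: values are split round-robin into `k` partitions, each partition is reduced on its own, and the partial results are then combined. The round-robin split and the names `partitioned_sum` and `partitioned_min` are assumptions made for this sketch; they stand in for the partition count and are not DataFusion's actual repartitioning logic.

```python
def naive_sum(xs):
    """Left-to-right float64 accumulation, with no compensation."""
    total = 0.0
    for x in xs:
        total += x
    return total


def partitioned_sum(values, k):
    """Sum each of k round-robin partitions, then sum the partial results."""
    partials = [naive_sum(values[i::k]) for i in range(k)]
    return naive_sum(partials)


def partitioned_min(values, k):
    """Take the MIN of each of k round-robin partitions, then the MIN of those."""
    return min(min(values[i::k]) for i in range(k))


values = [1e16, -1e16, 1.0, 1.0]

sum_one = partitioned_sum(values, 1)  # single partition
sum_two = partitioned_sum(values, 2)  # two partitions: [1e16, 1.0] and [-1e16, 1.0]
min_one = partitioned_min(values, 1)
min_two = partitioned_min(values, 2)

print(sum_one, sum_two, min_one, min_two)
```

In this sketch the SUM changes with the partition count because each partition rounds its small value away, while the MIN is the same either way.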

So I think it might be a good starting point to have the user specify this explicitly. When users want to avoid non-deterministic behavior or recomputation, they can explicitly request no inlining.
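The opt-in rule suggested in this thread (skip inlining only when the user writes Materialized and the CTE is referenced more than once) could be sketched as follows. This is a hypothetical Python sketch of the decision only; `CteInfo` and `should_inline` are illustrative names and do not correspond to any existing DataFusion API.

```python
from dataclasses import dataclass


@dataclass
class CteInfo:
    """Hypothetical summary of one CTE as seen by a planner."""
    name: str
    materialized: bool    # user wrote: WITH name AS MATERIALIZED (...)
    reference_count: int  # number of times the CTE is referenced in the query


def should_inline(cte: CteInfo) -> bool:
    """Inline by default; keep a single shared result only on explicit request."""
    return not (cte.materialized and cte.reference_count > 1)


default_cte = CteInfo("cte", materialized=False, reference_count=2)
pinned_cte = CteInfo("cte", materialized=True, reference_count=2)
single_use = CteInfo("cte", materialized=True, reference_count=1)

print(should_inline(default_cte), should_inline(pinned_cte), should_inline(single_use))
```

Under this rule the default behavior stays unchanged, and a CTE referenced only once is still inlined even when marked Materialized, since there is no second execution to diverge from.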