apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6k stars 1.14k forks source link

SanityChecker rejects certain valid `UNION` plans #12446

Open alamb opened 2 weeks ago

alamb commented 2 weeks ago

Describe the bug

There is a regression that was added that in a very very specific circumstance with sorted data and constant predicates and UNION queries where the query will now error with a SanityCheckPlan error when it should complete.

To Reproduce

@wiedld found a reproducer as part of https://github.com/apache/datafusion/pull/12414

https://github.com/apache/datafusion/commit/c2e652e48c82aedb20c289cbf5e7a7e279aa436e

# Test: inputs into union with different orderings
query TT
explain select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1
union all
select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2
order by d, c, a, a0, b
limit 2;
----
logical_plan
01)Projection: t1.b, t1.c, t1.a, t1.a0
02)--Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2
03)----Union
04)------SubqueryAlias: t1
05)--------Projection: ordered_table.b, ordered_table.c, ordered_table.a, Int32(NULL) AS a0, ordered_table.d
06)----------TableScan: ordered_table projection=[a, b, c, d]
07)------SubqueryAlias: t2
08)--------Projection: ordered_table.b, ordered_table.c, Int32(NULL) AS a, ordered_table.a0, ordered_table.d
09)----------TableScan: ordered_table projection=[a0, b, c, d]

# Test: run the query from above
# TODO: query fails since the constant columns t1.a0 and t2.a are not in the ORDER BY subquery,
# and SanityCheckPlan does not allow this.
statement error DataFusion error: SanityCheckPlan
select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1
union all
select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2
order by d, c, a, a0, b
limit 2;

statement ok
drop table ordered_table;

Expected behavior

Query should run

Additional context

We believe this was uncovered by https://github.com/apache/datafusion/pull/11196 . The error in the sort order calculation has existed for awhile but https://github.com/apache/datafusion/pull/11196 now uncovered the issue

This was released in 40.0.0 https://github.com/apache/datafusion/blob/main/dev/changelog/40.0.0.md

alamb commented 1 week ago

I spent some time finding a smaller standalone reproducer

create table t(a0 int, a int, b int, c int) as values (1, 2, 3, 4), (5, 6, 7, 8);

select * from (select c, a, NULL::int as a0 from t order by a, c) t1
union all
select * from (select c, NULL::int as a, a0 from t order by a0, c) t2
order by c, a, a0, b
limit 2;

When run

> select * from (select c, a, NULL::int as a0 from t order by a, c) t1
union all
select * from (select c, NULL::int as a, a0 from t order by a0, c) t2
order by c, a, a0, b
limit 2;
SanityCheckPlan
caused by
Error during planning: Plan: ["SortPreservingMergeExec: [c@0 ASC NULLS LAST,a@1 ASC NULLS LAST,a0@2 ASC NULLS LAST,b@3 ASC NULLS LAST], fetch=2", "  UnionExec", "    SortExec: TopK(fetch=2), expr=[c@0 ASC NULLS LAST,a@1 ASC NULLS LAST,b@3 ASC NULLS LAST], preserve_partitioning=[false]", "      ProjectionExec: expr=[c@2 as c, a@0 as a, NULL as a0, b@1 as b]", "        MemoryExec: partitions=1, partition_sizes=[1]", "    SortExec: TopK(fetch=2), expr=[c@0 ASC NULLS LAST,a0@2 ASC NULLS LAST,b@3 ASC NULLS LAST], preserve_partitioning=[false]", "      ProjectionExec: expr=[c@2 as c, NULL as a, a0@0 as a0, b@1 as b]", "        MemoryExec: partitions=1, partition_sizes=[1]"] does not satisfy order requirements: [PhysicalSortRequirement { expr: Column { name: "c", index: 0 }, options: Some(SortOptions { descending: false, nulls_first: false }) }, PhysicalSortRequirement { expr: Column { name: "a", index: 1 }, options: Some(SortOptions { descending: false, nulls_first: false }) }, PhysicalSortRequirement { expr: Column { name: "a0", index: 2 }, options: Some(SortOptions { descending: false, nulls_first: false }) }, PhysicalSortRequirement { expr: Column { name: "b", index: 3 }, options: Some(SortOptions { descending: false, nulls_first: false }) }]. Child-0 order: OrderingEquivalenceClass { orderings: [[PhysicalSortExpr { expr: Column { name: "c", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }, PhysicalSortExpr { expr: Column { name: "a", index: 1 }, options: SortOptions { descending: false, nulls_first: false } }], [PhysicalSortExpr { expr: Column { name: "c", index: 0 }, options: SortOptions { descending: false, nulls_first: false } }, PhysicalSortExpr { expr: Column { name: "a0", index: 2 }, options: SortOptions { descending: false, nulls_first: false } }]] }
alamb commented 1 week ago

Even simpler:

--- Create basic table
create table t(a0 int, a int, b int, c int) as values (1, 2, 3, 4), (5, 6, 7, 8);

--- Use a sorted union with limit
select * from (select c, a, NULL::int as a0 from t order by a, c) t1
union all
select * from (select c, NULL::int as a, a0 from t order by a0, c) t2
order by a, a0, c
limit 2;

DataFusion error: SanityCheckPlan caused by Error during planning: Plan: ["SortPreservingMergeExec: [a@1 ASC NULLS LAST,a0@2 ASC NULLS LAST,c@0 ASC NULLS LAST], fetch=2", " UnionExec", " SortExec: TopK(fetch=2), expr=[a@1 ASC NULLS LAST,c@0 ASC NULLS LAST], preserve_partitioning=[false]", " ProjectionExec: expr=[c@1 as c, a@0 as a, NULL as a0]", " MemoryExec: partitions=1, partition_sizes=[1]", " SortExec: TopK(fetch=2), expr=[a0@2 ASC NULLS LAST,c@0 ASC NULLS LAST], preserve_partitioning=[false]", " ProjectionExec: expr=[c@1 as c, NULL as a, a0@0 as a0]", " MemoryExec: partitions=1, partition_sizes=[1]"] does not satisfy order requirements: [PhysicalSortRequirement { expr: Column { name: "a", index: 1 }, options: Some(SortOptions { descending: false, nulls_first: false }) }, PhysicalSortRequirement { expr: Column { name: "a0", index: 2 }, options: Some(SortOptions { descending: false, nulls_first: false }) }, PhysicalSortRequirement { expr: Column { name: "c", index: 0 }, options: Some(SortOptions { descending: false, nulls_first: false }) }]. Child-0 order: OrderingEquivalenceClass { orderings: [[PhysicalSortExpr { expr: Column { name: "a", index: 1 }, options: SortOptions { descending: false, nulls_first: false } }], [PhysicalSortExpr { expr: Column { name: "a0", index: 2 }, options: SortOptions { descending: false, nulls_first: false } }]] }

alamb commented 1 week ago

I am trying to figure out where the bug is (is it in the union equivalence properties calculation). I am not sure it is.

I added some debugging information to my small reproducer from https://github.com/apache/datafusion/issues/12446#issuecomment-2361896831 to try and figure out what the code should be doing

The debugging (full output below) shows the input equivalence properties are:

input1: [[a, c]], constants: [a0]
input2: [[a0, c]], constants: [a]

The current code computes the output equivalence properties as

result: [a, a0], constants: []

However, this seems correct to me. I worked an example to show this 🤔

Worked example

If input1 is (sorted on a, c, constant a0): a c a0
1 1 N
1 2 N
1 3 N
2 1 N
And input2 is (sorted on a0, c, constant a): a c a0
N 10 10
N 20 10
N 30 10
N 10 20

There are at least two possible outputs

Possible output 1: (sorted on a and a0)

a c a0
1 1 N
1 2 N
1 3 N
2 1 N
N 10 10
N 20 10
N 30 10
N 10 20

Possible output 2: ( also sorted on a and a0)

a c a0
N 10 10
N 20 10
N 30 10
N 10 20
1 1 N
1 2 N
1 3 N
2 1 N
Details

Here is the full output ``` Calculating union of 2 equivalence properties --------------- EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { orderings: [ [ PhysicalSortExpr { expr: Column { name: "a", index: 1, }, options: SortOptions { descending: false, nulls_first: false, }, }, PhysicalSortExpr { expr: Column { name: "c", index: 0, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], ], }, constants: [ ConstExpr { expr: Column { name: "a0", index: 2, }, across_partitions: true, }, ], schema: Schema { fields: [ Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "a0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, } --------------- EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { orderings: [ [ PhysicalSortExpr { expr: Column { name: "a0", index: 2, }, options: SortOptions { descending: false, nulls_first: false, }, }, PhysicalSortExpr { expr: Column { name: "c", index: 0, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], ], }, constants: [ ConstExpr { expr: Column { name: "a", index: 1, }, across_partitions: true, }, ], schema: Schema { fields: [ Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "a0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, } Result: --------------- EquivalenceProperties { eq_group: EquivalenceGroup { classes: [], }, oeq_class: OrderingEquivalenceClass { orderings: [ [ PhysicalSortExpr { expr: Column { name: "a", index: 1, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], [ PhysicalSortExpr { expr: Column { name: "a0", index: 2, }, options: SortOptions { descending: false, nulls_first: false, }, }, ], ], }, constants: [], schema: Schema { fields: [ Field { name: "c", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "a", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, Field { name: "a0", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {}, }, ], metadata: {}, }, } ```

berkaysynnada commented 1 week ago

input1: [[a, c]], constants: [a0] input2: [[a0, c]], constants: [a] The current code computes the output equivalence properties as result: [a, a0], constants: [] However, this seems correct to me. I worked an example to show this 🤔

Actually IMO the result should be [[a, a0, c], [a0, a, c]]. To do that, input orderings should be adjusted such that: input1: [[a, c]], constants: [a0] => [[a0, a, c], [a, a0, c], [a, c, a0]] input2: [[a0, c]], constants: [a] => [[a, a0, c], [a0, a, c], [a0, c, a]]

I could show what is in my mind in a draft if you think my proposal can fix the problem.

alamb commented 1 week ago

Actually IMO the result should be [[a, a0, c], [a0, a, c]]. To do that, input orderings should be adjusted such that:

Ah, yes that makes sense -- that c can be at the end of either order.

I could show what is in my mind in a draft if you think my proposal can fix the problem.

~That would be amazing -- thank you @berkaysynnada -- I would be happy to try and help make it happen / productionize it~

Update: let me give it a try this afternoon and report back here. If I can't make it work, perhaps a draft would help

alamb commented 1 week ago

I made progress and started a PR here: https://github.com/apache/datafusion/pull/12562. It is all tests at the moment, but I think I now have a better handle on what is needed. I'll try and write more tests and finish the code fix soon

alamb commented 6 days ago

An update here is I have a WIP PR with tests, etc. https://github.com/apache/datafusion/pull/12562

I am now trying to figure out an appropriate algorithm to implement the adjustment that @berkaysynnada suggests in https://github.com/apache/datafusion/issues/12446#issuecomment-2363159349

Actually IMO the result should be [[a, a0, c], [a0, a, c]]. To do that, input orderings should be adjusted such that: input1: [[a, c]], constants: [a0] => [[a0, a, c], [a, a0, c], [a, c, a0]] input2: [[a0, c]], constants: [a] => [[a, a0, c], [a0, a, c], [a0, c, a]]

I worry that if we are not careful, such an expansion will be some sort of expontential blow up. Maybe I can just hard code a limit of orderings to try 🤔

berkaysynnada commented 6 days ago

An update here is I have a WIP PR with tests, etc. https://github.com/apache/datafusion/pull/12562

I will share my thoughts on it tomorrow

I worry that if we are not careful, such an expansion will be some sort of expontential blow up.

I have the same concern. We need to provide comprehensive tests. It would also be better if we observe the equivalence properties in the plans that include any UnionExec.

Thank you for your effort

alamb commented 6 days ago

I have the same concern. We need to provide comprehensive tests. It would also be better if we observe the equivalence properties in the plans that include any UnionExec.

I will do so. I think I have a solution that we can start discussing tomorrow