apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Cross-schema/cross-catalog qualified column doesn't do ambiguity check #6012

Open. Jefffrey opened this issue 1 year ago.

Jefffrey commented 1 year ago

Describe the bug

When joining two identical tables from different schemas and selecting a column whose identifier includes only a table qualifier, DataFusion should perform an ambiguity check and fail if the reference is ambiguous.

To Reproduce

Via datafusion-cli:

DataFusion CLI v22.0.0
❯ create schema s1;
0 rows in set. Query took 0.040 seconds.
❯ create schema s2;
0 rows in set. Query took 0.001 seconds.
❯ create table s1.t as select 1 as a, 2 as b;
0 rows in set. Query took 0.033 seconds.
❯ create table s2.t as select 1 as a, 2 as b;
0 rows in set. Query took 0.003 seconds.
❯ select t.b from s1.t join s2.t using (a);
+---+
| b |
+---+
| 2 |
+---+
1 row in set. Query took 0.039 seconds.
❯ explain select t.b from s1.t join s2.t using (a);
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: s1.t.b                                                                                                         |
|               |   Inner Join: Using s1.t.a = s2.t.a                                                                                        |
|               |     TableScan: s1.t projection=[a, b]                                                                                      |
|               |     TableScan: s2.t projection=[a]                                                                                         |
| physical_plan | ProjectionExec: expr=[b@1 as b]                                                                                            |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                              |
|               |     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "a", index: 0 }, Column { name: "a", index: 0 })] |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                          |
|               |         RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 12), input_partitions=1                       |
|               |           MemoryExec: partitions=1, partition_sizes=[1]                                                                    |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                          |
|               |         RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 12), input_partitions=1                       |
|               |           MemoryExec: partitions=1, partition_sizes=[1]                                                                    |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.031 seconds.
❯

Both schemas s1 and s2 contain an identical table t. Selecting t.b (qualified only by the table name) should trigger an ambiguity check, because b could come from either table. Instead, the planner silently resolves it to s1.t.b, as the logical plan shows.

Note that if the column is left unqualified as b, the ambiguity check does occur and an error is returned.

The same issue occurs when the schema and table names are identical but live in separate catalogs:

DataFusion CLI v22.0.0
❯ create database d1;
0 rows in set. Query took 0.001 seconds.
❯ create database d2;
0 rows in set. Query took 0.001 seconds.
❯ create schema d1.s;
0 rows in set. Query took 0.001 seconds.
❯ create schema d2.s;
0 rows in set. Query took 0.001 seconds.
❯ create table d1.s.t as select 1 as a, 2 as b;
0 rows in set. Query took 0.002 seconds.
❯ create table d2.s.t as select 1 as a, 2 as b;
0 rows in set. Query took 0.002 seconds.
❯ select t.b from d1.s.t join d2.s.t using (a);
+---+
| b |
+---+
| 2 |
+---+
1 row in set. Query took 0.006 seconds.
❯ explain select t.b from d1.s.t join d2.s.t using (a);
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: d1.s.t.b                                                                                                       |
|               |   Inner Join: Using d1.s.t.a = d2.s.t.a                                                                                    |
|               |     TableScan: d1.s.t projection=[a, b]                                                                                    |
|               |     TableScan: d2.s.t projection=[a]                                                                                       |
| physical_plan | ProjectionExec: expr=[b@1 as b]                                                                                            |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                              |
|               |     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "a", index: 0 }, Column { name: "a", index: 0 })] |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                          |
|               |         RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 12), input_partitions=1                       |
|               |           MemoryExec: partitions=1, partition_sizes=[1]                                                                    |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                          |
|               |         RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 12), input_partitions=1                       |
|               |           MemoryExec: partitions=1, partition_sizes=[1]                                                                    |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.007 seconds.
❯

Expected behavior

The query should fail with an error reporting an ambiguous column reference.

Additional context

The ambiguity check was fixed in https://github.com/apache/arrow-datafusion/pull/5509, but that fix seems to cover only unqualified columns, not qualified ones.

Jefffrey commented 1 year ago

So I took an initial stab at this: https://github.com/Jefffrey/arrow-datafusion/commit/b5548e047d45ec8f286d2f84773feb87a21a3939

I found one issue, which originates in how the SQL planner generates Exprs from the SQL AST, specifically in how it searches the schema for a matching column:

https://github.com/apache/arrow-datafusion/blob/0d9c542c84f68dad42eaa0d26a55810cdd5cff2b/datafusion/sql/src/expr/identifier.rs#L282-L290

It calls field_with_name(...), which eventually calls index_of_column_by_name(...):

https://github.com/apache/arrow-datafusion/blob/0d9c542c84f68dad42eaa0d26a55810cdd5cff2b/datafusion/common/src/dfschema.rs#L186-L220

There you can see that it returns only the first match (if one exists) and ignores the case where several fields match. In this issue there can be multiple matches, specifically here:

https://github.com/apache/arrow-datafusion/blob/0d9c542c84f68dad42eaa0d26a55810cdd5cff2b/datafusion/common/src/dfschema.rs#L200

For example, in the original issue the same schema contains the fields s1.t.b and s2.t.b, where b is the column name and s1.t and s2.t are the qualifiers. When searching for t.b (qualifier t, column name b), both fields match.
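The difference between a first-match lookup and one that checks for ambiguity can be illustrated with a small, self-contained Rust sketch. It does not use DataFusion's types: `QualifiedField`, `field_matches`, `first_match_index`, `unique_match_index`, and `joined_fields` are hypothetical names, `"cat"` is a placeholder catalog name, and the matching rule (the parts the user wrote must equal the trailing parts of the field's full qualifier) is an assumption made for illustration.

```rust
/// Simplified stand-in for one schema field: a full table qualifier
/// [catalog, schema, table] plus a column name. Hypothetical, not a DataFusion type.
#[derive(Debug)]
struct QualifiedField {
    qualifier: [&'static str; 3],
    name: &'static str,
}

/// True if `field` is named `name` and the qualifier parts the user wrote
/// (e.g. ["t"] or ["s1", "t"]) equal the trailing parts of the field's qualifier.
/// An empty `qualifier` (an unqualified column) matches any table.
fn field_matches(field: &QualifiedField, qualifier: &[&str], name: &str) -> bool {
    field.name == name && field.qualifier.ends_with(qualifier)
}

/// First-match lookup: stops at the first hit and never notices a second one.
fn first_match_index(fields: &[QualifiedField], qualifier: &[&str], name: &str) -> Option<usize> {
    fields.iter().position(|f| field_matches(f, qualifier, name))
}

/// Lookup that collects every match and rejects ambiguous references.
fn unique_match_index(
    fields: &[QualifiedField],
    qualifier: &[&str],
    name: &str,
) -> Result<usize, String> {
    let hits: Vec<usize> = fields
        .iter()
        .enumerate()
        .filter(|(_, f)| field_matches(f, qualifier, name))
        .map(|(i, _)| i)
        .collect();
    let shown = if qualifier.is_empty() {
        name.to_string()
    } else {
        format!("{}.{}", qualifier.join("."), name)
    };
    match hits.as_slice() {
        [i] => Ok(*i),
        [] => Err(format!("no field named {shown}")),
        _ => Err(format!("ambiguous reference to {shown}: {} candidates", hits.len())),
    }
}

/// Fields of both inputs to `s1.t JOIN s2.t USING (a)`.
fn joined_fields() -> Vec<QualifiedField> {
    vec![
        QualifiedField { qualifier: ["cat", "s1", "t"], name: "a" },
        QualifiedField { qualifier: ["cat", "s1", "t"], name: "b" },
        QualifiedField { qualifier: ["cat", "s2", "t"], name: "a" },
        QualifiedField { qualifier: ["cat", "s2", "t"], name: "b" },
    ]
}

fn main() {
    let fields = joined_fields();
    // The first match silently picks s1.t.b (index 1).
    println!("{:?}", first_match_index(&fields, &["t"], "b")); // Some(1)
    // Collecting all matches exposes the ambiguity.
    println!("{:?}", unique_match_index(&fields, &["t"], "b")); // Err("ambiguous reference to t.b: 2 candidates")
    println!("{:?}", unique_match_index(&fields, &["s2", "t"], "b")); // Ok(3)
}
```

Because an empty qualifier makes the same predicate cover unqualified columns, both kinds of reference can go through a single ambiguity check in this sketch.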

I tried to preserve the original behavior of allowing multiple matches and introduced a scoring system to pick the best match, but that still didn't solve the issue: I believe that somewhere further down the planning path the column is still resolved without a proper ambiguity check.

I wonder whether it would be better to centralize the ambiguity checks in one place, instead of hunting down every place that can resolve columns and adding the checks there. For example, a new analysis rule could resolve column references, though that would require changes to the planner (large impact?).

Thoughts, @alamb?

alamb commented 1 year ago

> I wonder whether it would be better to centralize the ambiguity checks in one place, instead of hunting down every place that can resolve columns and adding the checks there. For example, a new analysis rule could resolve column references, though that would require changes to the planner (large impact?).

Yes, I think consolidating the ambiguity checks and column-reference resolution in one place (where they can be more easily documented and unit tested) would be very valuable, and would make DataFusion easier to work with and understand.
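A minimal sketch of what such a central resolution step might look like, assuming a toy representation in which each input field is a `(fully qualified table, column)` pair of strings and each column reference is a dotted string. `resolve`, `resolve_projection`, and `ResolveError` are hypothetical names, not DataFusion APIs; a real analysis rule would presumably operate on logical expressions rather than strings. Every reference, qualified or not, is rewritten to its fully qualified form through the same function, so no code path can skip the ambiguity check, and the function can be unit tested in isolation.

```rust
/// Errors a single, central column-resolution step could report (hypothetical).
#[derive(Debug, PartialEq)]
enum ResolveError {
    NotFound(String),
    Ambiguous { reference: String, candidates: Vec<String> },
}

/// Resolves one dotted column reference ("b", "t.b", "s.t.b", "d1.s.t.b")
/// against the input fields, each given as (fully qualified table, column).
/// The written qualifier parts must equal the trailing parts of the table name.
fn resolve(fields: &[(&str, &str)], reference: &str) -> Result<String, ResolveError> {
    let parts: Vec<&str> = reference.split('.').collect();
    // `split` always yields at least one part, so this cannot fail.
    let (name, qualifier) = parts.split_last().expect("non-empty split");
    let candidates: Vec<String> = fields
        .iter()
        .filter(|(table, column)| {
            let table_parts: Vec<&str> = table.split('.').collect();
            column == name && table_parts.ends_with(qualifier)
        })
        .map(|(table, column)| format!("{table}.{column}"))
        .collect();
    match candidates.len() {
        0 => Err(ResolveError::NotFound(reference.to_string())),
        1 => Ok(candidates.into_iter().next().unwrap()),
        _ => Err(ResolveError::Ambiguous {
            reference: reference.to_string(),
            candidates,
        }),
    }
}

/// Toy "analysis pass": every reference in a projection goes through `resolve`,
/// stopping at the first error.
fn resolve_projection(
    fields: &[(&str, &str)],
    projection: &[&str],
) -> Result<Vec<String>, ResolveError> {
    projection.iter().map(|r| resolve(fields, r)).collect()
}

fn main() {
    // Input fields of `d1.s.t JOIN d2.s.t USING (a)`.
    let fields = [("d1.s.t", "a"), ("d1.s.t", "b"), ("d2.s.t", "a"), ("d2.s.t", "b")];
    println!("{:?}", resolve_projection(&fields, &["d1.s.t.b"])); // Ok(["d1.s.t.b"])
    println!("{:?}", resolve_projection(&fields, &["t.b"]));
    // Err(Ambiguous { reference: "t.b", candidates: ["d1.s.t.b", "d2.s.t.b"] })
}
```

Listing the candidates in the error is a design choice that would make the resulting message easier to act on, since it tells the user which qualifiers would disambiguate the reference.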