apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

INTERSECT ALL returns wrong number of records from RHS #12955

Open vbarua opened 1 month ago

vbarua commented 1 month ago

Describe the bug

According to the SQL spec, when returning duplicate records from INTERSECT ALL the minimum number of copies from either input should be returned. Specifically:

            b) If a set operator is specified, then the result of applying
              the set operator is a table containing the following rows:

              i) Let R be a row that is a duplicate of some row in T1 or of
                 some row in T2 or both. Let m be the number of duplicates
                 of R in T1 and let n be the number of duplicates of R in
                 T2, where m ≥ 0 and n ≥ 0.

            ...            

            iii) If ALL is specified, then

                 Case:

                 1) If UNION is specified, then the number of duplicates of
                   R that T contains is (m + n).

                 2) If EXCEPT is specified, then the number of duplicates of
                   R that T contains is the maximum of (m - n) and 0.

                 3) If INTERSECT is specified, then the number of duplicates
                   of R that T contains is the minimum of m and n.
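
To make the quoted counting rules concrete, here is a minimal sketch that models each table as a multiset using Python's `collections.Counter`. This is only an illustration of the spec text, not DataFusion code; the function names are made up for the example, and `t1`/`t2` are the two inputs from the reproduction query in this report.

```python
from collections import Counter

def union_all(t1, t2):
    # UNION ALL: each row appears m + n times.
    return Counter(t1) + Counter(t2)

def except_all(t1, t2):
    # EXCEPT ALL: each row appears max(m - n, 0) times.
    # Counter subtraction drops counts that are zero or negative.
    return Counter(t1) - Counter(t2)

def intersect_all(t1, t2):
    # INTERSECT ALL: each row appears min(m, n) times.
    return Counter(t1) & Counter(t2)

t1 = ["a", "b", "b", "c", "c", "c"]
t2 = ["b", "b", "b", "c", "c"]

print(sorted(intersect_all(t1, t2).elements()))  # ['b', 'b', 'c', 'c']
print(sorted(except_all(t1, t2).elements()))     # ['a', 'c']
print(sum(union_all(t1, t2).values()))           # 11
```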

DataFusion currently returns ALL copies of duplicated records from the RHS.

To Reproduce

The following query

➜  ~ datafusion-cli
DataFusion CLI v42.0.0

> SELECT * FROM VALUES ('a'), ('b'), ('b'), ('c'), ('c'), ('c')
INTERSECT ALL
SELECT * FROM VALUES ('b'), ('b'), ('b'), ('c'), ('c');
+---------+
| column1 |
+---------+
| b       |
| b       |
| c       |
| c       |
| c       |
+---------+

returns 3 copies of the record ('c') which does not match the expected behaviour based on the spec.

Note that only 2 copies of ('b') are returned, so this only appears to affect the RHS.

Expected behavior

The above query should return 2 copies of the record ('c')

Additional context

See DB Fiddle for Postgres which showcases the expected behaviour: https://www.db-fiddle.com/f/ja4BG5CfyEvak5ScoBwCZr/0

jcsherin commented 1 month ago

@vbarua Thanks. This bug report is well written.

DataFusion CLI v42.0.0
> EXPLAIN SELECT * FROM VALUES ('a'), ('b'), ('b'), ('c'), ('c'), ('c')
INTERSECT ALL
SELECT * FROM VALUES ('b'), ('b'), ('b'), ('c'), ('c');
+---------------+-----------------------------------------------------------------------------------+
| plan_type     | plan                                                                              |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan  | LeftSemi Join: column1 = column1                                                  |
|               |   Values: (Utf8("a")), (Utf8("b")), (Utf8("b")), (Utf8("c")), (Utf8("c"))...      |
|               |   Values: (Utf8("b")), (Utf8("b")), (Utf8("b")), (Utf8("c")), (Utf8("c"))         |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                                       |
|               |   HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(column1@0, column1@0)] |
|               |     ValuesExec                                                                    |
|               |     ValuesExec                                                                    |
|               |                                                                                   |
+---------------+-----------------------------------------------------------------------------------+

The query generates a left semi-join plan, which returns every LHS row that has at least one match in the RHS, no matter how many matches there are. So whenever the RHS has fewer duplicates of a row than the LHS, this query returns too many copies of that row.
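
A rough model of that semi-join behaviour, written as plain Python over lists rather than anything from DataFusion itself (the `left_semi_join` helper is hypothetical), reproduces the output seen in the bug report:

```python
def left_semi_join(lhs, rhs):
    # Keep every LHS row that has at least one equal row on the RHS.
    # The number of matching RHS rows is never consulted.
    rhs_keys = set(rhs)
    return [row for row in lhs if row in rhs_keys]

lhs = ["a", "b", "b", "c", "c", "c"]
rhs = ["b", "b", "b", "c", "c"]

print(left_semi_join(lhs, rhs))  # ['b', 'b', 'c', 'c', 'c']
```

All three copies of 'c' survive because the join only asks whether a match exists, not how many there are.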

DataFusion CLI v42.0.0
> EXPLAIN SELECT * FROM VALUES ('a'), ('b'), ('b'), ('c'), ('c'), ('c')
EXCEPT ALL
SELECT * FROM VALUES ('b'), ('b'), ('b'), ('c'), ('c');
+---------------+-----------------------------------------------------------------------------------+
| plan_type     | plan                                                                              |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan  | LeftAnti Join: column1 = column1                                                  |
|               |   Values: (Utf8("a")), (Utf8("b")), (Utf8("b")), (Utf8("c")), (Utf8("c"))...      |
|               |   Values: (Utf8("b")), (Utf8("b")), (Utf8("b")), (Utf8("c")), (Utf8("c"))         |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                                       |
|               |   HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(column1@0, column1@0)] |
|               |     ValuesExec                                                                    |
|               |     ValuesExec                                                                    |
|               |                                                                                   |
+---------------+-----------------------------------------------------------------------------------+

Here the query generates a left anti-join, so it always excludes every LHS row that has a match in the RHS, regardless of how many copies each side holds.
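
The same kind of list-based sketch (again hypothetical helpers, not DataFusion internals) shows how an anti-join differs from the max(m - n, 0) rule that the spec gives for EXCEPT ALL:

```python
from collections import Counter

def left_anti_join(lhs, rhs):
    # Keep only LHS rows with no equal row on the RHS.
    rhs_keys = set(rhs)
    return [row for row in lhs if row not in rhs_keys]

def except_all_per_spec(lhs, rhs):
    # max(m - n, 0) copies of each row, as the spec describes.
    return sorted((Counter(lhs) - Counter(rhs)).elements())

lhs = ["a", "b", "b", "c", "c", "c"]
rhs = ["b", "b", "b", "c", "c"]

print(left_anti_join(lhs, rhs))       # ['a']
print(except_all_per_spec(lhs, rhs))  # ['a', 'c']
```

Under the spec, one copy of 'c' should remain (3 - 2 = 1), but the anti-join model drops it.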

tokoko commented 1 month ago

I think the big question here is whether this means that intersect (and except) need to have their own logical plan nodes, after all. The alternative iiuc is to introduce something like a row number expression in the logical plan which will be used in the join and dropped afterwards.
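
One way to read the row-number alternative, sketched in Python under the assumption that each row is numbered within its group of equal rows (like `ROW_NUMBER() OVER (PARTITION BY <all columns>)`) and that the join key becomes the pair (row, row number). The helper names are invented for illustration:

```python
from collections import defaultdict

def with_row_number(rows):
    # Pair each row with its 1-based occurrence index among equal rows.
    seen = defaultdict(int)
    numbered = []
    for row in rows:
        seen[row] += 1
        numbered.append((row, seen[row]))
    return numbered

def intersect_all_via_row_number(lhs, rhs):
    # Semi join on (row, row number), then drop the helper column.
    rhs_keys = set(with_row_number(rhs))
    return [row for row, rn in with_row_number(lhs) if (row, rn) in rhs_keys]

def except_all_via_row_number(lhs, rhs):
    # Anti join on (row, row number), then drop the helper column.
    rhs_keys = set(with_row_number(rhs))
    return [row for row, rn in with_row_number(lhs) if (row, rn) not in rhs_keys]

lhs = ["a", "b", "b", "c", "c", "c"]
rhs = ["b", "b", "b", "c", "c"]

print(intersect_all_via_row_number(lhs, rhs))  # ['b', 'b', 'c', 'c']
print(except_all_via_row_number(lhs, rhs))     # ['a', 'c']
```

Because the k-th copy on the left only matches the k-th copy on the right, the existing semi/anti join shapes would yield min(m, n) and max(m - n, 0) copies respectively.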

jcsherin commented 1 month ago

I think the big question here is whether this means that intersect (and except) need to have their own logical plan nodes, after all.

Maybe not 🤔. From the doc comments it seems to me like we can reuse LogicalPlan::Union for all set operators. https://github.com/apache/datafusion/blob/636f43321acfd295096ad3ec45ef00595203f3f7/datafusion/expr/src/logical_plan/plan.rs#L230-L233

eejbyfeldt commented 1 month ago

I am not sure I understand how Union could be used. To me it makes sense to represent them in the logical plan, as that would make it easier to control how they are optimized/translated into more primitive operations. There are probably multiple ways to translate them, and the preferable one might not be the same for a single-node vs. a distributed query engine.

advancedxy commented 1 month ago

I noticed this bug too.

We can have a logical plan node to represent the Intersect/Except operators. But in essence, INTERSECT ALL can be rewritten using primitive operations, such as (quoted from Spark's RewriteIntersectAll; EXCEPT ALL can be rewritten in a similar way):

Replaces logical Intersect operator using a combination of Union, Aggregate and Generate operator.

Input Query :
SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2

Rewritten Query:
SELECT c1
  FROM (
       SELECT replicate_row(min_count, c1)
       FROM (
            SELECT c1, If (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
            FROM (
                 SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt
                 FROM (
                      SELECT true as vcol1, null as vcol2, c1 FROM ut1
                      UNION ALL
                      SELECT null as vcol1, true as vcol2, c1 FROM ut2
                      ) AS union_all
                 GROUP BY c1
                 HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
                 )
            )
        )

I think we could have an Intersect operator (with or without ALL) in the logical plan and rewrite it into primitive operations.
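
For readers who want to trace the quoted rewrite step by step, here is a small Python sketch that mirrors its stages (UNION ALL with marker columns, GROUP BY with counts, HAVING, then replicating each row min_count times). It is a hand-written model of the SQL above, not Spark or DataFusion code; the function name is hypothetical and list multiplication stands in for `replicate_row`.

```python
from collections import defaultdict

def intersect_all_via_union_aggregate(ut1, ut2):
    # UNION ALL of both inputs, tagging each row with marker columns
    # (vcol1, vcol2) that record which side it came from.
    union_all = [(True, None, c1) for c1 in ut1] + \
                [(None, True, c1) for c1 in ut2]

    # GROUP BY c1 with count(vcol1), count(vcol2); count ignores NULLs.
    counts = defaultdict(lambda: [0, 0])
    for vcol1, vcol2, c1 in union_all:
        if vcol1 is not None:
            counts[c1][0] += 1
        if vcol2 is not None:
            counts[c1][1] += 1

    result = []
    for c1, (vcol1_cnt, vcol2_cnt) in counts.items():
        # HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
        if vcol1_cnt >= 1 and vcol2_cnt >= 1:
            min_count = min(vcol1_cnt, vcol2_cnt)
            # Stand-in for replicate_row(min_count, c1)
            result.extend([c1] * min_count)
    return result

ut1 = ["a", "b", "b", "c", "c", "c"]
ut2 = ["b", "b", "b", "c", "c"]

print(intersect_all_via_union_aggregate(ut1, ut2))  # ['b', 'b', 'c', 'c']
```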