apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Error during physical planning when joining to subquery with count distinct aggregate #5034

Open jonmmease opened 1 year ago

jonmmease commented 1 year ago

Describe the bug

I'm seeing an error during physical planning for the following query:

SELECT *
FROM "tbl"
INNER JOIN (SELECT "colB", count(DISTINCT "colA") as "colC" FROM "tbl" GROUP BY "colB") AS "q1"
USING("colB")

The query fails with the following error:

Plan("The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {}\nMissing on the right: {Column { name: \"colB\", index: 0 }}")

Here, tbl is a table with two columns, colA and colB (both of type UInt64).

Interestingly, planning and query evaluation work properly when the DISTINCT qualifier is removed from the count aggregation.

Context

The purpose of this query is to add a new column (colC) to the input table that contains the number of unique values of colA that correspond to each value of colB. This is a simplified reproduction of an issue that we're seeing in VegaFusion's implementation of the Vega pivot transform.
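To make the intended result concrete, here is a minimal plain-Rust sketch of what the query is meant to compute. It does not use DataFusion at all; the function name `add_distinct_count` and the sample rows are made up for illustration, and it assumes there are no NULL values (the real columns are nullable).

```rust
use std::collections::{BTreeMap, BTreeSet};

/// For each input row (colA, colB), append colC = the number of distinct
/// colA values that share the same colB. Hypothetical helper, no NULL handling.
fn add_distinct_count(rows: &[(u64, u64)]) -> Vec<(u64, u64, usize)> {
    // Subquery "q1": map each colB value to the set of distinct colA values.
    let mut distinct: BTreeMap<u64, BTreeSet<u64>> = BTreeMap::new();
    for &(a, b) in rows {
        distinct.entry(b).or_default().insert(a);
    }
    // Inner join back onto the original rows using colB.
    rows.iter()
        .map(|&(a, b)| (a, b, distinct[&b].len()))
        .collect()
}

fn main() {
    let rows: [(u64, u64); 4] = [(1, 10), (2, 10), (2, 10), (7, 20)];
    for (a, b, c) in add_distinct_count(&rows) {
        println!("colA={} colB={} colC={}", a, b, c);
    }
}
```

Every input row is kept, and rows with the same colB value get the same colC.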

To Reproduce

Here is a Rust test that reproduces the error:

#[cfg(test)]
mod test_join_plan_bug2 {
    use std::sync::Arc;
    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::datasource::empty::EmptyTable;
    use datafusion::prelude::SessionContext;

    #[tokio::test]
    async fn count_distinct_error() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("colA", DataType::UInt64, true),
            Field::new("colB", DataType::UInt64, true),
        ])) as SchemaRef;

        let empty_table = EmptyTable::new(schema);

        // Create context and register table
        let ctx = SessionContext::new();
        ctx.register_table("tbl", Arc::new(empty_table)).unwrap();

        let sql1 = r#"
SELECT *
FROM "tbl"
INNER JOIN (SELECT "colB", count(DISTINCT "colA") as "colC" FROM "tbl" GROUP BY "colB") AS "q1"
USING("colB")
    "#;

        let logical_plan = ctx.state().create_logical_plan(sql1).await.unwrap();
        let physical_plan = ctx.state().create_physical_plan(&logical_plan).await.unwrap();
        println!("{:#?}", physical_plan);
    }
}

Running this test produces the following panic:

called `Result::unwrap()` on an `Err` value: Plan("The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {}\nMissing on the right: {Column { name: \"colB\", index: 0 }}")
thread 'test_join_plan_bug2::count_distinct_error' panicked at 'called `Result::unwrap()` on an `Err` value: Plan("The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {}\nMissing on the right: {Column { name: \"colB\", index: 0 }}")', src/lib.rs:806:83
stack backtrace:

Expected behavior

Physical planning should complete without error.

jonmmease commented 1 year ago

Looks like there's something going wrong with column naming. I added some print statements to this function to log the left, right, and on arguments.

https://github.com/apache/arrow-datafusion/blob/ab00bc11835f98dd06fa1262d23db2ce1e53a154/datafusion/core/src/physical_plan/joins/utils.rs#L75-L93

Without the DISTINCT qualifier, it looks like this:

left: {Column { name: "colA", index: 0 }, Column { name: "colB", index: 1 }}
right: {Column { name: "colB", index: 0 }, Column { name: "colC", index: 1 }}
on: [(Column { name: "colB", index: 1 }, Column { name: "colB", index: 0 })]

With the DISTINCT qualifier, it looks like this:

left: {Column { name: "colA", index: 0 }, Column { name: "colB", index: 1 }}
right: {Column { name: "tbl.colB", index: 0 }, Column { name: "colC", index: 1 }}
on: [(Column { name: "colB", index: 1 }, Column { name: "colB", index: 0 })]

So I think the direct cause of this error is that the colB column gets named tbl.colB at some point during physical planning.
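As a rough illustration of why the renamed column trips the check, here is a simplified, standard-library-only model of a set-membership test over (name, index) pairs. The `Column` struct and `missing_join_columns` function below are stand-ins written for this comment, not DataFusion's actual code; they only assume that the check compares columns by exact name and index.

```rust
use std::collections::HashSet;

/// Simplified stand-in for a physical column: a name plus a position.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Column {
    name: String,
    index: usize,
}

fn col(name: &str, index: usize) -> Column {
    Column { name: name.to_string(), index }
}

/// Returns the `on` columns that are absent from the left and right sides.
fn missing_join_columns(
    left: &HashSet<Column>,
    right: &HashSet<Column>,
    on: &[(Column, Column)],
) -> (Vec<Column>, Vec<Column>) {
    let left_missing = on
        .iter()
        .map(|(l, _)| l)
        .filter(|c| !left.contains(*c))
        .cloned()
        .collect();
    let right_missing = on
        .iter()
        .map(|(_, r)| r)
        .filter(|c| !right.contains(*c))
        .cloned()
        .collect();
    (left_missing, right_missing)
}

fn main() {
    let left: HashSet<Column> = [col("colA", 0), col("colB", 1)].into_iter().collect();
    // With DISTINCT, the right side exposes "tbl.colB" instead of "colB".
    let right: HashSet<Column> = [col("tbl.colB", 0), col("colC", 1)].into_iter().collect();
    let on = vec![(col("colB", 1), col("colB", 0))];

    let (left_missing, right_missing) = missing_join_columns(&left, &right, &on);
    println!("Missing on the left: {:?}", left_missing);
    println!("Missing on the right: {:?}", right_missing);
}
```

In this model, the index still lines up (0 on the right), but the name comparison is exact, so "colB" is reported as missing on the right.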


Commenting out all of the physical optimizers does not fix the issue.

jonmmease commented 1 year ago

A question, if anyone has made it this far: is it valid for the name of physical Column instances to have the form {table_name}.{column_name}?

I'm wondering if it's an error for Column { name: "tbl.colB", index: 0 } to exist at all, or if that's fine and the error is that join isn't equating tbl.colB with colB.

askoa commented 1 year ago

The issue looks very similar to https://github.com/apache/arrow-datafusion/issues/4794#issuecomment-1369323927

and the cause is probably the same as described in https://github.com/apache/arrow-datafusion/issues/4794#issuecomment-1382626825

jonmmease commented 1 year ago

Thanks for making those connections, @askoa.

Also, after playing with it some more I found that commenting out the SingleDistinctToGroupBy logical plan optimizer rule "fixes" the issue I'm seeing in this query.

jonmmease commented 1 year ago

I think this is closely related to https://github.com/apache/arrow-datafusion/pull/4050 by @andygrove

Here is the optimized logical plan that's generated (with SingleDistinctToGroupBy in place) for this issue's query:

Projection: tbl.colA, q1.colB, q1.colC
  Inner Join: Using tbl.colB = q1.colB
    TableScan: tbl projection=[colA, colB]
    SubqueryAlias: q1
      Projection: tbl.colB, COUNT(DISTINCT tbl.colA) AS colC
        Projection: group_alias_0 AS tbl.colB, COUNT(alias1) AS COUNT(DISTINCT tbl.colA)
          Aggregate: groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]
            Aggregate: groupBy=[[tbl.colB AS group_alias_0, tbl.colA AS alias1]], aggr=[[]]
              TableScan: tbl projection=[colA, colB]

The group_alias_0 AS tbl.colB fragment (which is introduced by the SingleDistinctToGroupBy optimizer rule) creates a new unqualified column named "tbl.colB", which isn't the same thing as the original qualified column "tbl"."colB". The join on tbl.colB = q1.colB then fails to match the "tbl.colB" column during physical planning.
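The distinction between the two columns can be sketched with a small model in plain Rust. The `LogicalColumn` type and its helpers here are hypothetical and only assume that a logical column is an optional table qualifier plus a name; they are not DataFusion's own types.

```rust
/// Hypothetical model of a logical column: optional qualifier plus name.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LogicalColumn {
    relation: Option<String>,
    name: String,
}

impl LogicalColumn {
    fn qualified(relation: &str, name: &str) -> Self {
        LogicalColumn { relation: Some(relation.to_string()), name: name.to_string() }
    }

    fn unqualified(name: &str) -> Self {
        LogicalColumn { relation: None, name: name.to_string() }
    }

    /// Flattened display form, e.g. `tbl.colB`.
    fn flat_name(&self) -> String {
        match &self.relation {
            Some(r) => format!("{}.{}", r, self.name),
            None => self.name.clone(),
        }
    }
}

fn main() {
    // The original column: qualifier "tbl", name "colB".
    let original = LogicalColumn::qualified("tbl", "colB");
    // The column produced by `group_alias_0 AS tbl.colB`: no qualifier,
    // and the dot is part of the name itself.
    let aliased = LogicalColumn::unqualified("tbl.colB");

    // Both print identically in a plan...
    assert_eq!(original.flat_name(), aliased.flat_name());
    // ...but they are different columns, and the bare names differ.
    assert_ne!(original, aliased);
    assert_ne!(original.name, aliased.name);
    println!("{} vs {}: equal = {}", original.flat_name(), aliased.flat_name(), original == aliased);
}
```

This is why the optimized plan looks correct when printed, even though a lookup by the bare name "colB" no longer finds the aliased column.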