apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Schema incorrect after select over aggregate function that returns a different type than the input #10346

Closed timsaucer closed 1 week ago

timsaucer commented 2 weeks ago

Describe the bug

When you have a column that is created using array_agg and the column name is the same as the column name of the original data, calling schema() produces a result that is based on the original data rather than the aggregate data.

To be clearer: if you call something like .aggregate(vec![col("a"), col("b")], vec![array_agg(col("c")).alias("c")]) and print the schema, then call select_columns("c") and print the schema again, a different schema is displayed. This only happens if the column c above is a computed value, even one as simple as the expression col("c").alias("c").

The minimal example below shows this.

To Reproduce

The following code demonstrates that selecting the column changes the displayed schema.

The input csv is a simple table:

a,b,c
1,2,3
4,5,6
7,8,9
10,11,12

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {

    let ctx = SessionContext::new();
    let mut df = ctx.read_csv("/Users/tsaucer/working/testing_ballista/lead_lag/example.csv", CsvReadOptions::default()).await?;

    println!("Original data:");
    df.clone().show().await?;

    df = df
        .clone()
        .with_column("c", col("c"))?
        .aggregate(vec![col("a"), col("b")], vec![array_agg(col("c")).alias("c")])?;

    println!("\nBefore select\n{}", df.schema().field(2));
    println!("{}", df.logical_plan().display_indent());

    df = df.select(vec![col("a"), col("b"), col("c")])?;
    println!("\nAfter select\n{}", df.schema().field(2));
    println!("{}", df.logical_plan().display_indent());

    println!("\nFinal data:");
    df.show().await?;

    Ok(())
}

Produces the following output:

Original data:
+----+----+----+
| a  | b  | c  |
+----+----+----+
| 1  | 2  | 3  |
| 4  | 5  | 6  |
| 7  | 8  | 9  |
| 10 | 11 | 12 |
+----+----+----+

Before select
Field { name: "c", data_type: List(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
Aggregate: groupBy=[[?table?.a, ?table?.b]], aggr=[[ARRAY_AGG(c) AS c]]
  Projection: ?table?.a, ?table?.b, ?table?.c AS c
    TableScan: ?table?

After select
Field { name: "c", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
Projection: ?table?.a, ?table?.b, c
  Aggregate: groupBy=[[?table?.a, ?table?.b]], aggr=[[ARRAY_AGG(c) AS c]]
    Projection: ?table?.a, ?table?.b, ?table?.c AS c
      TableScan: ?table?

Final data:
+----+----+------+
| a  | b  | c    |
+----+----+------+
| 7  | 8  | [9]  |
| 4  | 5  | [6]  |
| 1  | 2  | [3]  |
| 10 | 11 | [12] |
+----+----+------+

You can see the final table shown is correct, but the displayed schema is not.

Expected behavior

The schema should remain unchanged under a trivial select of col expressions or a call to select_columns.

Additional context

No response

timsaucer commented 2 weeks ago

Commenting here for myself so I can keep looking into this. I suspect it is happening in exprlist_to_fields in datafusion/expr/src/utils.rs, where we look at the input fields of aggregates. The likely culprit is that this type of aggregate does not produce the same type as its input field.

timsaucer commented 2 weeks ago

I took a quick look at another aggregate function that can return a different type than its input. After calling count_distinct instead of array_agg and updating the input file to contain strings instead of int64 values, I verified that it also outputs the incorrect schema after the select operation.

jonahgao commented 1 week ago

It should be caused by exprlist_to_fields_aggregate.

There are two columns named 'c', one from the aggregated input and the other from the output. exprlist_to_fields_aggregate forcibly uses the column 'c' from the input, which is of type Int64.
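To make the name clash concrete, here is a minimal toy model of the lookup described above. It is only a sketch and not DataFusion's code: the DataType, Field, lookup, resolve_input_first and resolve_output_only names are hypothetical and exist purely for illustration. It assumes the buggy path consults the aggregate's input schema before its output schema, which is how the behavior is described in this thread.

```rust
// Toy model only: none of these types or functions are DataFusion APIs.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    List(Box<DataType>),
}

#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
}

// Small constructor helper (hypothetical).
fn field(name: &str, data_type: DataType) -> Field {
    Field { name: name.to_string(), data_type }
}

// Find the first field with the given name in a schema.
fn lookup<'a>(schema: &'a [Field], name: &str) -> Option<&'a Field> {
    schema.iter().find(|f| f.name == name)
}

// Assumed buggy behavior: the aggregate's input schema is consulted first,
// so an output column that reuses an input name is shadowed.
fn resolve_input_first<'a>(
    agg_input: &'a [Field],
    agg_output: &'a [Field],
    name: &str,
) -> Option<&'a Field> {
    lookup(agg_input, name).or_else(|| lookup(agg_output, name))
}

// Expected behavior: a projection above the aggregate only sees its output.
fn resolve_output_only<'a>(agg_output: &'a [Field], name: &str) -> Option<&'a Field> {
    lookup(agg_output, name)
}

fn main() {
    // Schema feeding the aggregate: a, b, c are all Int64.
    let agg_input = vec![
        field("a", DataType::Int64),
        field("b", DataType::Int64),
        field("c", DataType::Int64),
    ];
    // Schema produced by the aggregate: c is now a list of Int64.
    let agg_output = vec![
        field("a", DataType::Int64),
        field("b", DataType::Int64),
        field("c", DataType::List(Box::new(DataType::Int64))),
    ];

    let buggy = resolve_input_first(&agg_input, &agg_output, "c").unwrap();
    let expected = resolve_output_only(&agg_output, "c").unwrap();

    // The input-first lookup reports the pre-aggregation type.
    assert_eq!(buggy.data_type, DataType::Int64);
    // The output-only lookup reports the aggregated type.
    assert_eq!(expected.data_type, DataType::List(Box::new(DataType::Int64)));

    println!("input-first: {:?}", buggy.data_type);
    println!("output-only: {:?}", expected.data_type);
}
```

In this model the two lookups disagree only when an output column reuses an input column's name and changes its type, which matches the array_agg(col("c")).alias("c") case in the report.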

jonahgao commented 1 week ago

exprlist_to_fields_aggregate was introduced by #2486. I doubt we still need it, because removing it doesn't break any tests.

jonahgao commented 1 week ago

take