apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

SQL case scalar_subquery logical_plan unexpected Aggregate: groupBy=[[col]] #5791

Open jiangzhx opened 1 year ago

jiangzhx commented 1 year ago

Describe the bug

The logical_plan is unexpected:
Aggregate: groupBy=[[t2.a]], aggr=[[COUNT(UInt8(1))]]

It should be Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]

select a,b from t1 where (select count(*) from t2 where t1.a = t2.a)>0;

+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: t1.a, t1.b                                                                                                     |
|               |   Inner Join: t1.a = __scalar_sq_1.a                                                                                       |
|               |     TableScan: t1 projection=[a, b]                                                                                        |
|               |     SubqueryAlias: __scalar_sq_1                                                                                           |
|               |       Projection: t2.a                                                                                                     |
|               |         Filter: COUNT(UInt8(1)) > Int64(0)                                                                                 |
|               |           Aggregate: groupBy=[[t2.a]], aggr=[[COUNT(UInt8(1))]]                                                            |
|               |             TableScan: t2 projection=[a]                                                                                   |
| physical_plan | ProjectionExec: expr=[a@0 as a, b@1 as b]                                                                                  |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                              |
|               |     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "a", index: 0 }, Column { name: "a", index: 0 })] |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                          |
|               |         RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 8), input_partitions=8                        |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                             |
|               |             MemoryExec: partitions=1, partition_sizes=[1]                                                                  |
|               |       ProjectionExec: expr=[a@0 as a]                                                                                      |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                        |
|               |           FilterExec: COUNT(UInt8(1))@1 > 0                                                                                |
|               |             AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(UInt8(1))]                                   |
|               |               CoalesceBatchesExec: target_batch_size=8192                                                                  |
|               |                 RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 8), input_partitions=8                |
|               |                   AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(UInt8(1))]                                      |
|               |                     RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                   |
|               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                        |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+

To Reproduce

#[tokio::test]
async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
    // you can find create_join_context() in  datafusion/core/tests/dataframe.rs
    let ctx = create_join_context()?;
    ctx.sql("select a,b from t1 where (select count(*) from t2 where t1.a = t2.a)>0;")
        .await?
        .explain(false, false)?
        .show()
        .await?;
    Ok(())
}

Expected behavior

No response

Additional context

No response

mingmwang commented 1 year ago

No, why do you think it is a bug? I think the behavior is expected. When de-correlating a subquery that contains aggregations, the inner correlated columns should be added to the group-by columns of the inner query. There is a special case that needs to be handled when the subquery is an EXISTS subquery and the inner aggregation is a scalar aggregation (one that always returns a value, for example count(*)).
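For illustration, here is a hand-written SQL sketch of the join form that the de-correlated plan above corresponds to; this is only an equivalent query written out for clarity, not the exact rewrite rule the optimizer applies.

-- hand-written equivalent of the de-correlated plan shown above
select t1.a, t1.b
from t1
inner join (
    select t2.a
    from t2
    group by t2.a
    having count(*) > 0
) as __scalar_sq_1 on t1.a = __scalar_sq_1.a;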

jiangzhx commented 1 year ago

@mingmwang thanks a lot. I'm working on #5686; I need to create test cases for all the count(*) scenarios to compare SQL and the DataFrame API and make sure they produce the same logical plan. But I have run into some challenges using the DataFrame API to generate the scalar_subquery logical/physical plan.

The DataFrame test case below fails to create a physical plan:

#[tokio::test]
async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
    let ctx = create_join_context()?;
    ctx.sql("select a,b from t1 where (select count(*) from t2 where t1.a = t2.a)>0;")
        .await?
        .explain(false, false)?
        .show()
        .await?;

    let subquery = Expr::ScalarSubquery(datafusion_expr::Subquery {
        subquery: Arc::new(
            ctx.table("t2")
                .await?
                .filter(col(Column::from_qualified_name("t1.a")).eq(col("t2.a")))?
                .aggregate(vec![], vec![count(Wildcard)])?
                .into_optimized_plan()
                .unwrap(),
        ),
        outer_ref_columns: vec![],
    });

    ctx.table("t1")
        .await?
        .filter(
            subquery.gt(lit(ScalarValue::UInt8(Some(0))))
        )?
        .select(vec![col("t1.a"), col("t1.b")])?
        .explain(false, false)?
        .show()
        .await?;
    Ok(())
}
jiangzhx commented 1 year ago

logical_plan & physical_plan produced by SQL:

+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: t1.a, t1.b                                                                                                     |
|               |   Inner Join: t1.a = __scalar_sq_1.a                                                                                       |
|               |     TableScan: t1 projection=[a, b]                                                                                        |
|               |     SubqueryAlias: __scalar_sq_1                                                                                           |
|               |       Projection: t2.a                                                                                                     |
|               |         Filter: COUNT(Int64(1)) > Int64(0)                                                                                 |
|               |           Aggregate: groupBy=[[t2.a]], aggr=[[COUNT(Int64(1))]]                                                            |
|               |             TableScan: t2 projection=[a]                                                                                   |
| physical_plan | ProjectionExec: expr=[a@0 as a, b@1 as b]                                                                                  |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                              |
|               |     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "a", index: 0 }, Column { name: "a", index: 0 })] |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                          |
|               |         RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 8), input_partitions=8                        |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                             |
|               |             MemoryExec: partitions=1, partition_sizes=[1]                                                                  |
|               |       ProjectionExec: expr=[a@0 as a]                                                                                      |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                        |
|               |           FilterExec: COUNT(Int64(1))@1 > 0                                                                                |
|               |             AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[COUNT(Int64(1))]                                   |
|               |               CoalesceBatchesExec: target_batch_size=8192                                                                  |
|               |                 RepartitionExec: partitioning=Hash([Column { name: "a", index: 0 }], 8), input_partitions=8                |
|               |                   AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[COUNT(Int64(1))]                                      |
|               |                     RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                   |
|               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                        |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+

logical_plan produced by the DataFrame API; creating the physical_plan fails:

+--------------+-------------------------------------------------------+
| plan_type    | plan                                                  |
+--------------+-------------------------------------------------------+
| logical_plan | Filter: (<subquery>) > Int64(0)                       |
|              |   Subquery:                                           |
|              |     Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] |
|              |       Filter: t1.a = t2.a                             |
|              |         TableScan: t2                                 |
|              |   TableScan: t1 projection=[a, b]                     |
+--------------+-------------------------------------------------------+
mingmwang commented 1 year ago

This week I have some time and I will take a look.

jiangzhx commented 1 year ago

The problem I am currently facing is actually not related to this issue. Thanks for @mingmwang's help.

There are 3 places here that need attention in order to write the correct code.

  1. .select(vec![count(lit(COUNT_STAR_EXPANSION))])? should be added after aggregate; scalar_subquery_to_join appears to need it.
  2. count(*) is changed to count(lit(COUNT_STAR_EXPANSION)); after #5686 is merged, this will no longer be necessary.
  3. .into_optimized_plan() is changed to .into_unoptimized_plan(), because eliminate_projection would otherwise remove the projection, and then scalar_subquery_to_join does not work.

Maybe it's just me, but writing the correct code using the DataFrame API is not easy. I will add an example for this.

The right way to create a scalar subquery with the DataFrame API:


#[tokio::test]
async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
    let ctx = create_join_context()?;
    ctx.sql("select a,b from t1 where (select count(*) from t2 where t1.a = t2.a)>0;")
        .await?
        .explain(false, false)?
        .show()
        .await?;

    let subquery = Expr::ScalarSubquery(datafusion_expr::Subquery {
        subquery: Arc::new(
            ctx.table("t2")
                .await?
                .filter(col("t1.a").eq(col("t2.a")))?
                // count(*) is expressed as count(lit(COUNT_STAR_EXPANSION)) until #5686 is merged
                .aggregate(vec![], vec![count(lit(COUNT_STAR_EXPANSION))])?
                // a projection after the aggregate is required for scalar_subquery_to_join
                .select(vec![count(lit(COUNT_STAR_EXPANSION))])?
                // keep the plan unoptimized so eliminate_projection does not remove the projection
                .into_unoptimized_plan(),
        ),
        outer_ref_columns: vec![],
    });

    ctx.table("t1")
        .await?
        .filter(subquery.gt(lit(ScalarValue::UInt8(Some(0)))))?
        .select(vec![col("t1.a"), col("t1.b")])?
        .explain(false, false)?
        .show()
        .await?;
    Ok(())
}
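
For the #5686 comparison, a minimal sketch of checking that the SQL and DataFrame paths produce the same logical plan; build_dataframe_query here stands for a hypothetical helper that returns the DataFrame built with the API above, and comparing display_indent() output is just one convenient way to diff the plans.

#[tokio::test]
async fn test_sql_and_dataframe_plans_match() -> Result<()> {
    let ctx = create_join_context()?;

    // logical plan produced by the SQL path
    let sql_plan = ctx
        .sql("select a,b from t1 where (select count(*) from t2 where t1.a = t2.a)>0;")
        .await?
        .into_optimized_plan()?;

    // logical plan produced by the DataFrame path
    // (build_dataframe_query is a hypothetical helper wrapping the DataFrame code above)
    let df_plan = build_dataframe_query(&ctx).await?.into_optimized_plan()?;

    // compare the indented display form of both optimized plans
    assert_eq!(
        format!("{}", sql_plan.display_indent()),
        format!("{}", df_plan.display_indent())
    );
    Ok(())
}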