apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Add support for correlated subquery #6140

Open mustafasrepo opened 1 year ago

mustafasrepo commented 1 year ago

Is your feature request related to a problem or challenge?

When I run the query below:

SELECT s.amount * (
         SELECT e.amount
         FROM sales_us AS e
         WHERE s.currency = e.currency AND
                 s.ts >= e.ts
         ORDER BY e.ts DESC
         LIMIT 1
       ) AS amount_usd
FROM sales_global AS s
ORDER BY s.ts;

DataFusion returns

Error: NotImplemented("Physical plan does not support logical expression (<subquery>)")

The same query runs successfully on PostgreSQL.
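For readers less familiar with correlated scalar subqueries, here is a minimal sketch of the per-row semantics the query asks for, written in plain Rust. It is only an illustration under stated assumptions: it uses no DataFusion APIs, the `Sale` struct and both function names are hypothetical, timestamps are simplified to integers, and the outer ORDER BY is left out. It does not describe how DataFusion or PostgreSQL plans the query.

```rust
// Illustrative only: plain Rust, no DataFusion APIs.
// `Sale`, `latest_matching_amount`, and `amount_usd` are hypothetical names.

/// One row of either table: (ts, currency, amount).
#[derive(Debug, Clone, PartialEq)]
struct Sale {
    ts: i64, // timestamp simplified to an integer for this sketch
    currency: String,
    amount: i64,
}

/// Evaluates the scalar subquery for a single outer row `s`:
///   SELECT e.amount FROM sales_us AS e
///   WHERE s.currency = e.currency AND s.ts >= e.ts
///   ORDER BY e.ts DESC LIMIT 1
/// `None` models SQL NULL (no matching row).
fn latest_matching_amount(s: &Sale, sales_us: &[Sale]) -> Option<i64> {
    sales_us
        .iter()
        .filter(|e| e.currency == s.currency && s.ts >= e.ts)
        .max_by_key(|e| e.ts) // latest matching row, i.e. ORDER BY e.ts DESC LIMIT 1
        .map(|e| e.amount)
}

/// Computes `s.amount * (subquery)` for every outer row.
/// Multiplying by NULL yields NULL, modeled here as `None`.
fn amount_usd(sales_global: &[Sale], sales_us: &[Sale]) -> Vec<Option<i64>> {
    sales_global
        .iter()
        .map(|s| latest_matching_amount(s, sales_us).map(|rate| s.amount * rate))
        .collect()
}
```

The subquery is re-evaluated for each outer row because it refers to `s.currency` and `s.ts`. That dependence on the outer row is what makes it correlated.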

Describe the solution you'd like

I would like DataFusion to support correlated subqueries like the one above.

Describe alternatives you've considered

No response

Additional context

To reproduce, you can use the test below:

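use datafusion::arrow::util::pretty::print_batches;
use datafusion::error::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;

#[tokio::test]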
async fn test_subquery() -> Result<()> {
    let config = SessionConfig::new()
        .with_target_partitions(1);
    let ctx = SessionContext::with_config(config);
    ctx.sql("CREATE TABLE sales_us (
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount INT
        ) as VALUES
              ('2022-01-01 10:00:00'::timestamp, 'USD', 100.00),
              ('2022-01-01 11:00:00'::timestamp, 'USD', 200.00),
              ('2022-01-02 09:00:00'::timestamp, 'USD', 300.00),
              ('2022-01-02 10:00:00'::timestamp, 'USD', 150.00)").await?;
    ctx.sql("CREATE TABLE sales_global (
          ts TIMESTAMP,
          currency VARCHAR(3),
          amount INT
        ) as VALUES
          ('2022-01-01 08:00:00'::timestamp, 'EUR', 50.00),
          ('2022-01-01 11:30:00'::timestamp, 'EUR', 75.00),
          ('2022-01-02 12:00:00'::timestamp, 'EUR', 200.00),
          ('2022-01-03 10:00:00'::timestamp, 'EUR', 100.00)").await?;
    let sql = "SELECT s.amount * (
                 SELECT e.amount
                 FROM sales_us AS e
                 WHERE s.currency = e.currency AND
                         s.ts >= e.ts
                 ORDER BY e.ts DESC
                 LIMIT 1
               ) AS amount_usd
        FROM sales_global AS s
        ORDER BY s.ts
        ";

    let msg = format!("Creating logical plan for '{sql}'");
    let dataframe: DataFrame = ctx.sql(sql).await.expect(&msg);
    let physical_plan = dataframe.create_physical_plan().await?;
    let batches = collect(physical_plan, ctx.task_ctx()).await?;
    print_batches(&batches)?;
    Ok(())
}

mingmwang commented 1 year ago

I will work on this soon.

ozankabak commented 5 months ago

@mingmwang any thoughts on how to make progress here?

aalexandrov commented 4 months ago

@alamb can you please assign me here? I will take a stab at this next week. If I succeed, you might be able to also close some other issues listed in the enclosing epic as obsolete because there won't be any correlated queries left for the physical planning process.

alamb commented 4 months ago

Thanks @aalexandrov -- I have done so.

BTW you can also post a single-word comment, "take", and we have a bot that will assign you.

aalexandrov commented 4 months ago

> BTW you can also make a single word comment take and we have a bot that will assign you.

Oh, good to know—I will take a look at the available commands next time.

Lordworms commented 3 weeks ago

Interested in this one

alamb commented 3 weeks ago

I believe @eejbyfeldt and @Dandandan have been working a bit on correlated subqueries in the context of getting all of TPC-DS running.