apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

fix Sort Merge Join to pass TPCH tests #10100

Closed comphead closed 5 months ago

comphead commented 7 months ago

I was trying to mark Sort Merge Join (SMJ) as stable and ran the TPCH tests with SMJ enforced. I hit the panic below, which we need to fix before returning to the discussion of SMJ's stable status.

To run the benchmarks:

    RESULTS_NAME=smj ./benchmarks/bench.sh run tpch_smj

Depends on #10092

    thread 'tokio-runtime-worker' panicked at datafusion/physical-plan/src/joins/sort_merge_join.rs:1357:22:
    index out of bounds: the len is 0 but the index is 1
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
    thread 'tokio-runtime-worker' panicked at datafusion/physical-plan/src/joins/sort_merge_join.rs:1357:22:
    index out of bounds: the len is 0 but the index is 1
    thread 'tokio-runtime-worker' panicked at datafusion/physical-plan/src/joins/sort_merge_join.rs:1357:22:
    index out of bounds: the len is 0 but the index is 1
    thread 'tokio-runtime-worker' panicked at datafusion/physical-plan/src/joins/sort_merge_join.rs:1357:22:
    index out of bounds: the len is 0 but the index is 1
    Error: Context("Join Error", External(JoinError::Panic(Id(88693), ...)))
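
For readers unfamiliar with this panic message: it is what Rust emits when a slice or `Vec` of length 0 is indexed at position 1. The snippet below is only a generic illustration of that class of failure and of a checked alternative; `checked_get` is a hypothetical helper and is not the code at `sort_merge_join.rs:1357`.

```rust
/// Hypothetical helper (not from DataFusion): a checked lookup that returns
/// `None` instead of panicking when `idx` is out of range.
fn checked_get(values: &[i32], idx: usize) -> Option<i32> {
    values.get(idx).copied()
}

fn main() {
    let empty: Vec<i32> = Vec::new();

    // Direct indexing such as `empty[1]` would panic with
    // "index out of bounds: the len is 0 but the index is 1".
    // The checked form reports the missing element instead.
    assert_eq!(checked_get(&empty, 1), None);

    // In-range access behaves the same as plain indexing.
    assert_eq!(checked_get(&[10, 20], 1), Some(20));
}
```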

Originally posted by @comphead in https://github.com/apache/arrow-datafusion/issues/9846#issuecomment-2057999488

comphead commented 7 months ago

The TPCH benchmarks for SMJ https://github.com/apache/arrow-datafusion/pull/10092

comphead commented 7 months ago

Only Q21 is failing.

comphead commented 7 months ago

Narrowed the problem down to this query:

    with 
    t1 as (select 1 a, 2 b) 
        select * from t1 where exists (select 1 from t1 t2 where t2.a = t1.a and t2.b != t1.b);

UPD: replaced with a simpler query to reproduce.

comphead commented 7 months ago

The problem is in the LeftSemi/LeftAnti join types when there is an extra join filter. The join side is built correctly, but evaluating the join filter for the not-equal (`!=`) case crashes.
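
As a reference for what the repro query should return once the crash is fixed, here is a minimal sketch of LeftSemi and LeftAnti semantics with an extra join filter. It assumes a naive nested-loop evaluation in plain Rust; `Row`, `left_semi`, and `left_anti` are hypothetical names for illustration and have nothing to do with the actual SMJ implementation.

```rust
/// Row of the table `t1(a, b)` from the repro query (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Row {
    a: i64,
    b: i64,
}

/// Naive LeftSemi: keep a left row if at least one right row matches the
/// equi-join key (`a`) AND passes the extra join filter.
fn left_semi(left: &[Row], right: &[Row], filter: impl Fn(&Row, &Row) -> bool) -> Vec<Row> {
    left.iter()
        .filter(|&l| right.iter().any(|r| r.a == l.a && filter(l, r)))
        .copied()
        .collect()
}

/// Naive LeftAnti: keep a left row if NO right row matches key and filter.
fn left_anti(left: &[Row], right: &[Row], filter: impl Fn(&Row, &Row) -> bool) -> Vec<Row> {
    left.iter()
        .filter(|&l| !right.iter().any(|r| r.a == l.a && filter(l, r)))
        .copied()
        .collect()
}

fn main() {
    // t1 as (select 1 a, 2 b), joined with itself as t2.
    let t1 = [Row { a: 1, b: 2 }];

    // Extra join filter from the query: t2.b != t1.b
    let not_eq = |l: &Row, r: &Row| r.b != l.b;

    let semi = left_semi(&t1, &t1, not_eq);
    let anti = left_anti(&t1, &t1, not_eq);

    println!("semi = {:?}", semi);
    println!("anti = {:?}", anti);
}
```

With the single row `(1, 2)`, the key matches but the `!=` filter rejects the pair, so the semi join yields no rows and the anti join yields the one left row.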

comphead commented 7 months ago

Full test to reproduce:

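    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::test]
    async fn test_() -> Result<()> {
        let ctx: SessionContext = SessionContext::new();

        // Disable hash join so that Sort Merge Join is used instead.
        let sql = "set datafusion.optimizer.prefer_hash_join = false;";
        let _ = ctx.sql(sql).await?.collect().await?;

        // EXISTS with an extra `!=` predicate: the semi join with a join filter that panics.
        let sql = "
        with 
        t1 as (select 1 a, 2 b) 
            select * from t1 where exists (select 1 from t1 t2 where t2.a = t1.a and t2.b != t1.a);
        ";
        let _ = ctx.sql(sql).await?.collect().await?;

        Ok(())
    }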