apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Out of memory when sorting #5108

Open andygrove opened 1 year ago

andygrove commented 1 year ago

Describe the bug The original bug was filed against the Python bindings: https://github.com/apache/arrow-datafusion-python/issues/157

Running a sort and then exporting to a Parquet file on Colab generates an out-of-memory error.

To Reproduce

!curl -L 'https://drive.google.com/uc?export=download&id=18gv0Yd_a-Zc7CSolol8qeYVAAzSthnSN&confirm=t' > lineitem.parquet
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_parquet('lineitem', 'lineitem.parquet')
df = ctx.sql("select * from lineitem order by l_shipdate")
df.write_parquet("lineitem_Datafusion.parquet")

Expected behavior I expected DataFusion to use only the available memory.

Here is a link comparing the same workload using Polars and DuckDB: https://colab.research.google.com/drive/1pfAPpIG7jpvGB_aHj-PXX66vRaRT0xlj#scrollTo=O8-lyg1y6RT2

comphead commented 1 year ago

@andygrove any idea how this can be reproduced? A DataFusion test is below:

#[tokio::test]
async fn test_huge_sort() {
    let ctx = SessionContext::new();
    ctx.register_parquet(
        "lineitem",
        "/Users/a/lineitem.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();
    let sql = "select * from lineitem order by l_shipdate";
    let dataframe = ctx.sql(sql).await.unwrap();
    dataframe.write_parquet("/Users/a/lineitem_sorted.parquet", None).await.unwrap();
}

Any idea how to limit memory for the process? My only ideas are to bring up a VM with limited memory, or to set a limit with ulimit.

tustvold commented 1 year ago

If running on Linux you could use cgroups to artificially limit the memory available to the process.

FWIW I think this relates to the default configuration of the RuntimeEnv, which has no limit. If you configure the session with a limiting MemoryPool, it should enforce a limit; if you additionally configure a DiskManager, it should spill.

comphead commented 1 year ago

Thanks @tustvold, I ran the test with a memory maximum configured and spill enabled.

    #[tokio::test]
    async fn test_huge_sort() -> Result<()> {
        let runtime_config = crate::execution::runtime_env::RuntimeConfig::new()
            .with_memory_pool(Arc::new(crate::execution::memory_pool::GreedyMemoryPool::new(1024*1024*1024)))
            .with_disk_manager(crate::execution::disk_manager::DiskManagerConfig::new_specified(vec!["/Users/a/spill/".into()]));
        let runtime = Arc::new(crate::execution::runtime_env::RuntimeEnv::new(runtime_config).unwrap());
        let ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);
        ctx.register_parquet(
            "lineitem",
            "/Users/a/lineitem.parquet",
            ParquetReadOptions::default(),
        )
        .await
        .unwrap();
        let sql = "select * from lineitem order by l_shipdate";
        let dataframe = ctx.sql(sql).await.unwrap();
        dataframe.show_limit(10).await?;
        //dataframe.write_parquet("/Users/a/lineitem_sorted.parquet", None).await?;
        Ok(())
    }

@andygrove It seems the test now consumes only the configured memory, without exhausting all the machine's memory:

Error: External(ResourcesExhausted("Failed to allocate additional 1419104 bytes for RepartitionExec[3] with 0 bytes already allocated - maximum available is 496736"))
test dataframe::tests::test_huge_sort ... FAILED

@tustvold However, the DiskManager doesn't spill anything into the folder. Is that expected?

tustvold commented 1 year ago

You likely need to give it more than 1 MB; not all operators can spill.

djouallah commented 1 year ago

Sorry, how do you pass those configs using the Python API?

andygrove commented 1 year ago

> Sorry, how do you pass those configs using the Python API?

We will need to expose them there; it should be trivial. I will add notes on the Python issue.

DDtKey commented 1 year ago

I was able to reproduce similar behaviour against the latest version (17.0.0), but not against 16.0.0, which returned the correct allocation error (Resources exhausted: Failed to allocate additional 12372192 bytes ...).

So this looks like a regression to me. I'm going to bisect to find the commit that introduced it and report back here with details.

My case in two words: with the latest version, memory limits (FairSpillPool, 4 GiB) were ignored and memory consumption grew until OOM (a join of 4 files with ~1 GB of data each, without explicit ordering).

But when I set prefer_hash_join = false, 17.0.0 also returned a resources-exhausted error, which looked weird 🤔

DDtKey commented 1 year ago

This behavior (at least my case described above) was introduced in a9ddcd3a7558437361835120659b946b903468e1 (PR link).

Before, it returned Resources exhausted when I used a memory pool; now memory usage grows until OOM.

It can be reproduced with code like the following. UPD: an easier way to reproduce this is described in #5162:

   let ctx = SessionContext::with_config_rt(
        SessionConfig::default(),
        Arc::new(
            RuntimeEnv::new(
                RuntimeConfig::new()
                    .with_memory_pool(Arc::new(FairSpillPool::new(4 * 1024 * 1024 * 1024))),
            )
            .unwrap(),
        ),
    );

    // I can share the file - it's kind of random data, but not sure what I can use to do that. 
    // However, it's reproducible for any files which joins could lead to a large result file (> memory pool limit).
    ctx.register_csv("hr", file_path, CsvReadOptions::default())
        .await?;

    // 4 joins - just to represent a problem
    let data_frame = ctx
        .sql(
            r#"
        SELECT hr1."Emp_ID"
        from hr hr1 
        left join hr hr2 on hr1."Emp_ID" = hr2."Emp_ID" 
        left join hr hr3 on hr2."Emp_ID" = hr3."Emp_ID" 
        left join hr hr4 on hr3."Emp_ID" = hr4."Emp_ID"
    "#,
        )
        .await?;

    data_frame
        .write_csv(output_path)
        .await?;

Additional notes

It may be useful to mention that memory consumption jumps at the 3rd join; with 2 joins it works fine and doesn't even return an error. The query also works with optimizer.repartition_joins = false, since it doesn't consume as much memory without repartitioning for joins (but it takes much more time, for sure).
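For reference, disabling join repartitioning from Rust can be sketched via the SessionConfig builder. This is a minimal fragment, assuming the with_repartition_joins builder method (which maps to optimizer.repartition_joins):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Sketch only: turn off repartitioning for joins, trading parallelism
// for a smaller peak memory footprint, as observed above.
let config = SessionConfig::new().with_repartition_joins(false);
let ctx = SessionContext::with_config(config);
```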

comphead commented 1 year ago

Hi @DDtKey, I have checked both implementations (GreedyMemoryPool and FairSpillPool) for the sort problem above: https://github.com/apache/arrow-datafusion/issues/5108#issuecomment-1409599920

Both work correctly and return a resources-exhausted error. I'll try to reproduce the same for repartition.

In the meantime, you may also want to check the OOM test for repartition: https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/repartition/mod.rs#L1150

DDtKey commented 1 year ago

@comphead These are likely different issues; it probably makes sense to create a separate issue with this description.

JFY: in my example there is no explicit ordering at all; it's only about repartitioning.

It definitely doesn't respect the memory pool in my cases 🤔 and is probably related to the usage of unbounded channels.

Thanks for the reference, I'll check it.

alamb commented 1 year ago

> And probably related to the usage of unbounded channels.

Interestingly, the point of https://github.com/apache/arrow-datafusion/pull/4867 was in fact to remove the unbounded channels. I am looking more into this issue
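As a toy illustration of why that matters (plain standard-library Rust, not DataFusion's actual channel types): an unbounded channel lets a fast producer buffer an arbitrary backlog in memory, while a bounded one applies backpressure once its capacity is reached:

```rust
use std::sync::mpsc::{channel, sync_channel, Receiver};

/// Push `n` items into an unbounded channel; every send succeeds,
/// so the whole backlog sits in memory (no backpressure).
fn fill_unbounded(n: usize) -> Receiver<usize> {
    let (tx, rx) = channel();
    for i in 0..n {
        tx.send(i).unwrap();
    }
    rx // `tx` is dropped here, so the receiver sees exactly `n` items
}

/// Try to push `n` items into a channel of capacity `cap`; returns
/// how many were accepted before backpressure kicked in.
fn fill_bounded(cap: usize, n: usize) -> usize {
    let (tx, _rx) = sync_channel(cap);
    (0..n).take_while(|_| tx.try_send(0).is_ok()).count()
}

fn main() {
    // Unbounded: all 10_000 items are buffered at once.
    assert_eq!(fill_unbounded(10_000).iter().count(), 10_000);
    // Bounded: only `cap` items fit; further sends would block or fail.
    assert_eq!(fill_bounded(2, 10_000), 2);
}
```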

alamb commented 1 year ago

I made a copy of the Colab notebook and tried to run the test case against datafusion 0.6.0 to see if this is a regression: https://colab.research.google.com/drive/1RS31HPnkkHoJeshAirZCP4T52Xl0dl9f?usp=sharing

It appears there is no write_parquet functionality in the 0.6.0 release, so this isn't a regression in the sense that it used to work and now doesn't.

alamb commented 1 year ago

My measurements actually suggest that DataFusion 17.0.0 is better in this regard than DataFusion 16.0.0.

Using this input file:

!curl -L 'https://drive.google.com/uc?export=download&id=18gv0Yd_a-Zc7CSolol8qeYVAAzSthnSN&confirm=t' > lineitem.parquet

Using this program:

use std::sync::Arc;

use datafusion::{
    error::Result,
    execution::{
        disk_manager::DiskManagerConfig,
        memory_pool::{FairSpillPool, GreedyMemoryPool},
        runtime_env::{RuntimeConfig, RuntimeEnv},
    },
    prelude::{SessionConfig, SessionContext},
};

#[tokio::main(flavor = "multi_thread", worker_threads = 10)]
async fn main() -> Result<()> {

    let runtime_config = RuntimeConfig::new()
        // .with_memory_pool(Arc::new(GreedyMemoryPool::new(1024 * 1024 * 1024)))
        .with_memory_pool(Arc::new(FairSpillPool::new(1024 * 1024 * 1024)))
        .with_disk_manager(DiskManagerConfig::new_specified(vec!["/tmp/".into()]));

    let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
    let ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime);

    ctx.register_parquet("lineitem", "/Users/alamb/Downloads/lineitem.parquet", Default::default())
        .await.unwrap();

    let df = ctx.sql("select * from lineitem order by l_shipdate").await.unwrap();

    df.write_parquet("/Users/alamb/Downloads/lineitem_Datafusion.parquet", None)
        .await
        .unwrap();

    Ok(())
}

I tested with both DataFusion 16.0.0 / 17.0.0 and FairSpillPool / GreedyMemoryPool:

datafusion = { version = "16.0.0" }

or

datafusion = { version = "17.0.0" }

And this:

        .with_memory_pool(Arc::new(FairSpillPool::new(1024*1024*1024)))

Or

        .with_memory_pool(Arc::new(GreedyMemoryPool::new(1024*1024*1024)))

Datafusion 16.0.0 with FairSpillPool:

     Running `/Users/alamb/Software/target-df/release/rust_arrow_playground`
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParquetError(ArrowError("underlying Arrow error: External error: Arrow error: External error: Resources exhausted: Failed to allocate additional 1419488 bytes for RepartitionExec[14] with 2837440 bytes already allocated - maximum available is 0"))', src/main.rs:26:6
stack backtrace:

DataFusion 16.0.0 and GreedyMemoryPool

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParquetError(ArrowError("underlying Arrow error: External error: Arrow error: External error: Resources exhausted: Failed to allocate additional 1419168 bytes for RepartitionExec[4] with 0 bytes already allocated - maximum available is 552160"))', src/main.rs:26:6

With DataFusion 17.0.0 and FairSpillPool I got:

The program completed successfully 🎉

DataFusion 17.0.0 and GreedyMemoryPool I got:

warning: `rust_arrow_playground` (bin "rust_arrow_playground") generated 1 warning
    Finished release [optimized] target(s) in 3m 35s
     Running `/Users/alamb/Software/target-df/release/rust_arrow_playground`
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParquetError(ArrowError("underlying Arrow error: External error: Arrow error: External error: Resources exhausted: Failed to allocate additional 1419168 bytes for RepartitionExec[4] with 0 bytes already allocated - maximum available is 552160"))', src/main.rs:26:6
stack backtrace:

comphead commented 1 year ago

@alamb thanks for the analysis. Just to be sure, could this also be an issue for DF 17 and FairSpillPool? If FairSpillPool doesn't respect the memory limit but there is enough memory to complete the sort, the test will pass even though the memory manager has an issue.

alamb commented 1 year ago

I agree the manager could be improved. Basically, I think the issue is that some consumers get an allocation up front while others allocate on demand, so the on-demand operators (like repartition) may end up with no memory budget under some allocation strategies.
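That up-front vs on-demand split can be sketched with a toy accounting pool (an illustration of the idea only, not DataFusion's actual MemoryPool trait):

```rust
/// Toy greedy pool: hands out bytes first-come-first-served, so an
/// on-demand consumer can find almost nothing left after another
/// operator has reserved its budget up front.
struct ToyPool {
    capacity: usize,
    used: usize,
}

impl ToyPool {
    fn new(capacity: usize) -> Self {
        Self { capacity, used: 0 }
    }

    /// Try to reserve `bytes`; the Err mirrors the "Resources exhausted"
    /// message seen in the logs above.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let available = self.capacity - self.used;
        if bytes > available {
            Err(format!(
                "Failed to allocate additional {} bytes - maximum available is {}",
                bytes, available
            ))
        } else {
            self.used += bytes;
            Ok(())
        }
    }

    /// Release a reservation, e.g. after an operator spills to disk.
    fn shrink(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

fn main() {
    let mut pool = ToyPool::new(1024);
    assert!(pool.try_grow(900).is_ok()); // sort reserves up front
    assert!(pool.try_grow(200).is_err()); // on-demand repartition fails
    pool.shrink(500); // spill frees budget
    assert!(pool.try_grow(200).is_ok()); // now the request fits
}
```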

DDtKey commented 1 year ago

> > And probably related to the usage of unbounded channels.
>
> Interestingly, the point of #4867 was in fact to remove the unbounded channels. I am looking more into this issue

As I already said, my case is not about sorting, so sorry for referring to it in this issue. It's somehow related to join repartitioning and channels (it started to happen after getting rid of the unbounded ones). I like that the unbounded channels were refactored away, but somehow that introduced the issue I'm facing.

It returns the correct error with 16.0.0 and just doesn't respect the memory limit with 17.0.0. However, I can't reproduce it on small file sizes with lower memory limits (both versions work correctly and return an error), so I think this might be related to buffering.

The code is attached above in my message, and I'm attaching the file (about 1.3 GiB): GDrive link. It's an artificial case, but it shows that some kind of regression exists.

crepererum commented 1 year ago

@DDtKey I can look into your case next week.

The channel refactoring I did could increase memory usage if some child node of the repartition node starts to buffer data in an uncontrolled manner. It also changes scheduling a bit, so an edge case that was hidden before could now show up.

alamb commented 1 year ago

I think there was significant work done on the join operators themselves between 16.0.0 and 17.0.0 so it may well be the case that the plans or operators that run after the repartition change their behavior

It might help to start by looking at the plans

crepererum commented 1 year ago

@DDtKey your code includes joins, which are NOT yet included within the memory manager (see #5220). While my work in #4867 changed the buffering and hence can trigger issues with JOINs, I think the fact that a JOIN on large datasets doesn't OOM at the moment is luck to begin with. I propose we focus on #5220 and re-check your example afterwards, because if it still OOMs (without a DataFusion error), that would be an actual bug and not just "bad luck".

DDtKey commented 1 year ago

@crepererum yes, that's true. After some investigation (#5162 is actually also related to this, and reproduces in old versions), I realized that joins didn't respect the MemoryPool even before, so the DataFusionError in previous versions could really have been just luck; the buffering changes simply highlighted the problem. My initial assumption came only from searching for the difference in behavior with bisect.

So I totally agree about the priority of #5220.