
Out of Memory Error when Processing Large Dataset in Docker Using Streaming #15462

Open men1n2 opened 5 months ago

men1n2 commented 5 months ago

Checks

Reproducible example

use jemallocator::Jemalloc;
use polars::prelude::*;
use std::time::Instant;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    let now = Instant::now();

    // Lazily scan the Parquet file with low_memory enabled.
    let mut lf = LazyFrame::scan_parquet(
        "./dataset.parquet",
        ScanArgsParquet {
            low_memory: true,
            ..Default::default()
        },
    )
    .unwrap()
    .with_streaming(true);

    // Unnest the `fields` struct column into its member columns.
    lf = lf.unnest(["fields"]);

    // Print the optimized plan to confirm the query runs in the streaming engine.
    let query_plan = lf.clone().explain(true).unwrap();
    println!("{}", query_plan);

    // Stream the result to disk instead of collecting it into memory.
    lf.sink_parquet("./result.parquet".into(), Default::default())
        .unwrap();

    let elapsed = now.elapsed();
    println!("Elapsed: {:.2?}", elapsed);
}

Log output

# POLARS_PREFETCH_SIZE=8 POLARS_STREAMING_CHUNK_SIZE=1000 POLARS_VERBOSE=1 cargo run --release
POLARS PREFETCH_SIZE: 8
RUN STREAMING PIPELINE
[parquet -> function -> parquet_sink]
STREAMING CHUNK SIZE: 1000 rows
Killed

Issue description

I am using Polars with Rust to process a large dataset (larger than available memory) inside a Docker container. The container's memory is intentionally limited to 20 GB using --memory=20gb and --shm-size=20gb. I am encountering an out-of-memory error while running the query on the dataset.

Here's an overview of my workflow:

1- Load the dataset from a Parquet file using scan_parquet to create a LazyFrame.
2- Apply a transformation to the LazyFrame, namely unnesting the fields struct column.
3- Write the resulting data to disk as a Parquet file using sink_parquet.

Despite using LazyFrame and enabling low_memory mode in ScanArgsParquet, I still encounter an out of memory error during the execution of the code.

I have also tried tuning POLARS_PREFETCH_SIZE and POLARS_STREAMING_CHUNK_SIZE (see the log output above), without success.

The printed plan indicates that every operation should be run in the streaming engine:

--- STREAMING
UNNEST by:[fields]

    Parquet SCAN ./resources/dataset.parquet
    PROJECT */2 COLUMNS  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

However, I am still running into memory issues when processing the large dataset (Parquet file size = 20GB).
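
The same knobs can also be set from inside the program before the query runs. Below is a minimal sketch (assuming the same polars 0.38 setup as in the reproducible example above, and that these environment variables are still read at query execution time rather than only at startup):

use polars::prelude::*;
use std::env;

fn main() {
    // Mirror the command-line settings from the log output above.
    // Whether values set here (instead of in the shell) are honored may
    // depend on the Polars version, since some variables are read only once.
    env::set_var("POLARS_PREFETCH_SIZE", "8");
    env::set_var("POLARS_STREAMING_CHUNK_SIZE", "1000");
    env::set_var("POLARS_VERBOSE", "1");

    let lf = LazyFrame::scan_parquet(
        "./dataset.parquet",
        ScanArgsParquet {
            low_memory: true,
            ..Default::default()
        },
    )
    .unwrap()
    .with_streaming(true);

    // Same pipeline as the reproducible example: unnest and stream to disk.
    lf.unnest(["fields"])
        .sink_parquet("./result.parquet".into(), Default::default())
        .unwrap();
}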

Expected behavior

Polars should be able to unnest the fields and write the output Parquet file through the streaming engine, even though the input file is larger than available memory.

Installed versions

rust = `1.77.0`
polars = `{ version = "0.38.3", features = ["lazy", "streaming", "parquet", "fmt", "polars-io", "json"] }`

James4Ever0 commented 4 months ago

I have encountered a similar issue, with additional information to share.

When processing a single 100 GB CSV file, the sink_csv method OOMs. The return type is DataFrame, and I suspect this is the issue: once sink_csv is called, the data should not have to be held in memory, otherwise an OOM is guaranteed. I would like to find some hidden Rust Polars APIs for this task, if possible.

Another strange behavior: with certain parameter values (mostly environment variables) the code does run, but the output file is significantly smaller than the input file. Enlarging these values increases the output size, eventually leading to OOM again; the relationship appears to be exponential.

Additionally, the way UTF-8 decoding errors are handled is unclear. Does it simply skip the offending lines?

My code is shown below:

import os

os.environ["POLARS_MAX_THREADS"]="4"
os.environ["POLARS_STREAMING_CHUNK_SIZE"]="4"

import polars

input_csv = ...
output_csv = ...

# Lazily scan the CSV; ignore_errors needs a value and the keyword is truncate_ragged_lines.
df = polars.scan_csv(input_csv, ignore_errors=True, truncate_ragged_lines=True, low_memory=True)

df.unique(subset="company_id").sink_csv(output_csv, batch_size=10, include_header=True)
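
For reference, here is a minimal Rust sketch of an equivalent lazy/streaming pipeline (assuming polars 0.38 with the lazy, csv and streaming features enabled; the paths and the company_id subset column are placeholders, and builder options such as ignoring parse errors are omitted because their exact names vary between versions):

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // Lazily scan the CSV, deduplicate on a key column, and stream the
    // result to disk without collecting it into memory first.
    let lf = LazyCsvReader::new("./input.csv")
        .finish()?
        .with_streaming(true);

    lf.unique(Some(vec!["company_id".into()]), UniqueKeepStrategy::First)
        .sink_csv("./output.csv".into(), Default::default())?;

    Ok(())
}

Whether sink_csv really streams batches to disk instead of materializing the full result is exactly the behavior in question in this thread.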
men1n2 commented 4 months ago

Any updates on this issue?