pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Possible memory leak on scanning csv and sinking to parquet #18281

Open yarimiz opened 3 weeks ago

yarimiz commented 3 weeks ago

Checks

Reproducible example


import glob
import os
import tqdm
import polars as pl

# Version 1 - just iterating over the files one by one
# tqdm here is just for checking the progress
for f in tqdm.tqdm(glob.glob("tmp/trades/*.csv")):
    target = f"tmp/trades/parquet/{os.path.basename(f).split('.')[0]}.parquet"
    if not os.path.exists(target):
        pl.scan_csv(f).select("ticker", "exchange", "participant_timestamp", "size", "price", "trf_id").sink_parquet(target)

# Version 2 - using scan_csv on the whole folder
pl.scan_csv("tmp/trades").select("ticker", "exchange", "participant_timestamp", "size", "price", "trf_id").sink_parquet("all-trades.parquet")

Log output

No response

Issue description

I've been facing a strange memory usage issue when converting several large CSV files to Parquet using scan_csv and sink_parquet with a simple select expression.

I have tried two versions of the code: one processing the files one by one, and one scanning the whole directory with Polars.

In both cases, the memory usage of the WSL2 distribution kept growing until the distribution crashed from running out of memory. I have 256 GB of RAM, and each CSV file weighs about 5-6 GB.

I'm not sure if I'm missing something about how this query should behave behind the scenes, or whether it's actually a memory leak bug.
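(Not part of the original report, just a sanity check that might be useful here: `LazyFrame.explain` can show whether the whole plan is expected to run in the streaming engine. The file path below is hypothetical.)

```python
import polars as pl

# Hypothetical file path; the printed plan marks the parts that run in the
# streaming engine, so any non-streaming fallback shows up here.
lf = pl.scan_csv("tmp/trades/some_file.csv").select(
    "ticker", "exchange", "participant_timestamp", "size", "price", "trf_id"
)
print(lf.explain(streaming=True))
```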

Expected behavior

Reading each CSV file, performing the query, saving to Parquet, and freeing the memory in the process, so an unlimited number of files can be processed.

Installed versions

--------Version info---------
Polars:              1.5.0
Index type:          UInt32
Platform:            Linux-5.15.153.1-microsoft-standard-WSL2-x86_64-with-glibc2.35
Python:              3.11.7 (main, Feb 7 2024, 11:43:27) [GCC 11.3.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:              2024.6.1
gevent:
great_tables:
hvplot:              0.10.0
matplotlib:          3.8.4
nest_asyncio:        1.6.0
numpy:               1.26.4
openpyxl:
pandas:              2.2.2
pyarrow:             16.1.0
pydantic:            2.7.1
pyiceberg:
sqlalchemy:
torch:
xlsx2csv:
xlsxwriter:
ritchie46 commented 3 weeks ago

Can you try setting _RJEM_MALLOC_CONF="muzzy_decay_ms:0" before importing Polars?
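A minimal sketch of what that looks like in Python (the variable has to be in the process environment before the first `import polars` runs, e.g. in the shell or at the very top of the script):

```python
import os

# Must be set before Polars (and its bundled jemalloc allocator) is imported
# for the first time in this process.
os.environ["_RJEM_MALLOC_CONF"] = "muzzy_decay_ms:0"

import polars as pl  # imported only after the env var is set
```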

yarimiz commented 3 weeks ago

@ritchie46 I assumed you meant setting this env var?

os.environ['_RJEM_MALLOC_CONF'] = "muzzy_decay_ms:0"

I tried it but the behavior is the same.

A snippet from Task Manager, where you can see the slow and constant rise in memory usage: [image]

ritchie46 commented 3 weeks ago

And in Polars 1.3 or 1.4?

yarimiz commented 3 weeks ago

@ritchie46 1.5

ritchie46 commented 3 weeks ago

No, I mean: do you observe the same in Polars 1.3?

yarimiz commented 3 weeks ago

@ritchie46 confirmed same behavior on 1.3 and 1.4

ritchie46 commented 3 weeks ago

Hmm.. Then I think the memory gets fragmented too much by all the heap allocations.
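If fragmentation is the cause, one common workaround (not something verified in this thread, just a sketch) is to run each file's conversion in a short-lived worker process, so all of its memory is returned to the OS when the process exits:

```python
import glob
import os
from multiprocessing import get_context


def convert(path: str) -> None:
    import polars as pl  # import inside the worker so every process starts fresh

    target = f"tmp/trades/parquet/{os.path.basename(path).split('.')[0]}.parquet"
    if not os.path.exists(target):
        pl.scan_csv(path).select(
            "ticker", "exchange", "participant_timestamp", "size", "price", "trf_id"
        ).sink_parquet(target)


if __name__ == "__main__":
    ctx = get_context("spawn")
    for f in glob.glob("tmp/trades/*.csv"):
        p = ctx.Process(target=convert, args=(f,))
        p.start()
        p.join()  # one file at a time; memory is released when the worker exits
```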

tiarap00 commented 2 weeks ago

I've experienced something similar trying to scan_csv and sink_parquet. The source file is a large txt (about 100 GB); after row 110, each row has 10k negative values with 2 decimal places, and I've tried to use streaming to process it on my 32 GB RAM machine. The first column contains mixed data (timestamps followed by 16 value rows), so the code copies the date into the Timestamp column (it's not handled further, as that would break streaming).

```python
import numpy as np
import polars as pl


def process_file(input_file, output_file):
    # Extract wavelength information
    wavelength_start, wavelength_delta, num_points = extract_wavelength_info(input_file)

    # Generate wavelength column names
    wavelengths = np.arange(wavelength_start,
                            wavelength_start + num_points * wavelength_delta,
                            wavelength_delta)
    wl_cols = [f"{wl:.3f}" for wl in wavelengths]

    schema = {wl_cols[0]: pl.Utf8}  # first column holds mixed timestamp/value strings
    schema.update({col: pl.Float32 for col in wl_cols[1:]})  # rest as Float32

    # Data reading and processing
    df = (pl.scan_csv(
        input_file,
        separator='\t',
        has_header=False,
        skip_rows=SKIP_ROWS,
        new_columns=wl_cols,
        truncate_ragged_lines=False,
        decimal_comma=True,
        low_memory=True
    ).filter(pl.col(wl_cols[0]).is_not_null())
        .with_columns(
            Timestamp=pl.when(~pl.col(wl_cols[0]).str.contains(r'-'))
                        .then(pl.col(wl_cols[0])),
        )
        .with_columns(
            pl.col(wl_cols[0]).str.replace(",", ".").cast(pl.Float32)
        )
        .select(["Timestamp"] + wl_cols)
    )

    df.sink_csv(output_file,
                maintain_order=False,
                batch_size=4)
    print(f"Data has been written to {output_file}.")
```

I've tried to play around with batch_size and other parameters, with no success. I initially tried with sink_parquet, but tried to simplify even further by moving to CSV, which is uncompressed. I've also run the code on a 24 GB laptop with the same issue. On the main 32 GB machine, it seems like the RAM fills up during the reading phase: Python uses 10-25 GB of RAM and 120 MB/s of disk throughput, there are no errors initially, and the RAM usage fluctuates. Then, after about 10 minutes, a "memory allocation of N bytes failed" error pops up and the process stops. The laptop behaves similarly, it just takes longer because of the slower CPU and smaller RAM.
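One more idea, purely as an illustration (not something suggested above): bound peak memory by converting the file in fixed-size row chunks and writing one output file per chunk. `CHUNK_ROWS` is a made-up constant, and `input_file`, `SKIP_ROWS`, and `wl_cols` refer to the names from the snippet above and are assumed to be in scope:

```python
import polars as pl

CHUNK_ROWS = 50_000  # hypothetical; tune so one chunk fits comfortably in RAM
chunk = 0
while True:
    try:
        df = pl.scan_csv(
            input_file,
            separator="\t",
            has_header=False,
            # skipped rows are still re-parsed on every pass, so this trades speed for memory
            skip_rows=SKIP_ROWS + chunk * CHUNK_ROWS,
            n_rows=CHUNK_ROWS,
            new_columns=wl_cols,
            decimal_comma=True,
        ).collect()
    except pl.exceptions.NoDataError:
        break  # skipped past the end of the file
    if df.height == 0:
        break
    df.write_parquet(f"chunk_{chunk:05d}.parquet")  # one output file per chunk
    chunk += 1
```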

Edit: using polars 1.6.0.