pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Getting `ComputeError: buffer's length is too small in mmap` when concatenating 100s of parquet files #16227

Open DeflateAwning opened 5 months ago

DeflateAwning commented 5 months ago

Checks

Reproducible example

My apologies that I don't have a minimum reproducible example, as the failure seems somewhat random.


```python
import logging
from pathlib import Path

import polars as pl
from tqdm import tqdm

logger = logging.getLogger(__name__)


def concat_all_parquets_in_list_chunks(
    input_parquet_paths: list[Path],
    final_parquet_path: Path,
    data_schema: dict[str, pl.DataType],
    sort_cols: list[str],
    chunk_size: int = 35,
) -> None:
    accumulate_parquet_path_in = final_parquet_path.with_name(f"accumulate_in_{final_parquet_path.name}")
    accumulate_parquet_path_out = final_parquet_path.with_name(f"accumulate_out_{final_parquet_path.name}")

    # `yield_chunks` is a small helper that splits a list into chunks of `chunk_size` elements.
    input_parquet_paths_chunks = list(yield_chunks(input_parquet_paths, chunk_size=chunk_size))
    for chunk_num, input_parquet_paths_chunk in enumerate(tqdm(input_parquet_paths_chunks, unit='chunk_of_parquets')):
        logger.info(f"Starting parquet chunk {chunk_num:,}")
        scanned_parquets = [
            pl.scan_parquet(parquet_path)
            for parquet_path in input_parquet_paths_chunk
        ]
        if chunk_num > 0:
            # Fold the accumulator from the previous iteration back in.
            scanned_parquets.append(pl.scan_parquet(accumulate_parquet_path_in))

        (
            pl.concat(scanned_parquets)
            .cast(data_schema)
            .sort(sort_cols)
            .sink_parquet(accumulate_parquet_path_out, compression='uncompressed')
        )
        logger.info(f"Finished concatenating parquet chunk {chunk_num:,}")

        # rename and overwrite
        if chunk_num > 0:
            accumulate_parquet_path_in.unlink()
        accumulate_parquet_path_out.rename(accumulate_parquet_path_in)
```

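(`yield_chunks` isn't shown above; presumably it's just a small helper that splits a list into fixed-size chunks, roughly like this sketch:)

```python
from typing import Iterator, TypeVar

T = TypeVar("T")


# Assumed implementation of the `yield_chunks` helper referenced above
# (not part of the original report): yield successive fixed-size slices.
def yield_chunks(items: list[T], chunk_size: int) -> Iterator[list[T]]:
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]
```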
Log output

Coming soon

Issue description

Calling that function succeeds for 20+ chunks (hundreds of files) and then fails with an error that gives no indication of what went wrong.

The error is: `ComputeError: buffer's length is too small in mmap`

Expected behavior

The function should complete without raising an error.

Installed versions

```
--------Version info---------
Polars:              0.20.25
Index type:          UInt32
Platform:            Linux-6.5.0-1022-oem-x86_64-with-glibc2.35
Python:              3.9.19 (main, Apr 6 2024, 17:57:55) [GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager: 0.11.0
cloudpickle:         3.0.0
connectorx:          0.3.3
deltalake:           0.17.4
fastexcel:           0.10.4
fsspec:              2024.3.1
gevent:
hvplot:
matplotlib:          3.8.4
nest_asyncio:        1.6.0
numpy:               1.26.4
openpyxl:            3.1.2
pandas:              1.5.3
pyarrow:             11.0.0
pydantic:
pyiceberg:
pyxlsb:              1.0.10
sqlalchemy:          1.4.52
torch:
xlsx2csv:            0.8.2
xlsxwriter:          3.0.9
```
ritchie46 commented 5 months ago

Would really appreciate a repro here. This doesn't ring a bell.

DeflateAwning commented 5 months ago

The situation seems to depend on the compute environment configuration (amount of RAM): as the computation runs and the system approaches out-of-memory, it either runs out of memory outright or hits this bug and raises this error. I've been unable to create a reliable repro, though; I'll keep trying.
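For reference, here's a rough way one could watch available memory while the script runs (this uses the third-party `psutil` package; the 5-second interval is an arbitrary choice, not something from my original run):

```python
import threading
import time

import psutil


def log_available_memory(interval_s: float = 5.0) -> None:
    # Illustrative sampler only: logs available system memory at a fixed interval
    # so the drop toward OOM is visible alongside the chunk progress logs.
    while True:
        avail_gib = psutil.virtual_memory().available / 1024**3
        print(f"available RAM: {avail_gib:.2f} GiB")
        time.sleep(interval_s)


threading.Thread(target=log_available_memory, daemon=True).start()
```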

Here's the repro I'm working with, but it doesn't fail reliably, and is highly dependent on available RAM:

```python
from pathlib import Path
import polars as pl
import random

# generate 4000 parquets, each with 100k rows
file_count = 4000
(input_folder_path := Path('./temp_RERPO_16227')).mkdir(exist_ok=True, parents=True)
print(f"Writing to: {input_folder_path.absolute()}")
for file_num in range(file_count):
    row_count = 100_000
    df = pl.DataFrame(
        {
            "file_num": pl.Series([file_num] * row_count),
            "random_num": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
            "random_num_1": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
            "random_num_2": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
            "random_num_3": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
            "random_num_4": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
            "random_num_5": pl.Series([random.randint(1, 100_000) for _ in range(row_count)]),
            'col1': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
            'col2': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
            'col3': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
            'col4': pl.Series(["123", "abc", "xyz"]).sample(row_count, with_replacement=True),
        }
    ).with_row_index("orig_row_number")
    df.write_parquet(input_folder_path / f"in_file_{file_num}.parquet")
    print(f"Made parquet {file_num + 1}/{file_count}")

print(f"Made {file_count:,} parquets. Total size: {sum(f.stat().st_size for f in input_folder_path.glob('*.parquet')):,} bytes")

# then concat them all into one big parquet
(output_folder_path := Path('./temp_RERPO_16227_output')).mkdir(exist_ok=True, parents=True)
output_path = output_folder_path / "out_file.parquet"

dfs = [
    pl.scan_parquet(f)
    for f in input_folder_path.glob("*.parquet")
]
pl.concat(dfs).sort("random_num").sink_parquet(output_path)
print(f"Concatenated {file_count:,} parquets into one big parquet. Total size: {output_path.stat().st_size:,} bytes")