pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Loading wide parquet data with `scan_parquet` is orders of magnitude slower than long data #17259

Open kszlim opened 3 days ago

kszlim commented 3 days ago

Checks

Reproducible example

import polars as pl
import numpy as np
import shutil
import os
import time

NUM_COLUMNS = 10000
NUM_FILES = 500

column_names = [f'col_{i}' for i in range(NUM_COLUMNS)]
data = np.random.rand(1, NUM_COLUMNS)

# Wide layout: one row, NUM_COLUMNS Float64 columns.
wide_df = pl.DataFrame(
    data,
    schema={name: pl.Float64 for name in column_names}
)

# Long layout: the same data under a single column name.
long_df = pl.DataFrame({"a": data})

def copy_files(is_wide: bool):
    # Write one parquet file, then copy it so that all NUM_FILES files are identical.
    if is_wide:
        file_type = 'wide'
        df = wide_df
    else:
        file_type = 'long'
        df = long_df
    first_file = f'{file_type}_0.parquet'
    for i in range(NUM_FILES):
        ith_file = f'{file_type}_{i}.parquet'
        if i % 100 == 0:
            print(f"Processing {i}th {file_type} file")
        if os.path.exists(ith_file):
            continue
        if i == 0:
            df.write_parquet(first_file)
            continue
        shutil.copy2(first_file, ith_file)

copy_files(True)
copy_files(False)

# Time a full scan + collect over all files for each layout, with and without the async reader.
for force_async in ["1", "0"]:
    for file_type in ("long", "wide"):
        os.environ["POLARS_FORCE_ASYNC"] = force_async
        ldf = pl.scan_parquet(f"{file_type}*.parquet")
        start = time.time()
        result = ldf.collect()
        print(f"{time.time() - start}s to load {file_type} data with force async = {force_async}")

Log output

Processing 0th wide file
Processing 100th wide file
Processing 200th wide file
Processing 300th wide file
Processing 400th wide file
Processing 0th long file
Processing 100th long file
Processing 200th long file
Processing 300th long file
Processing 400th long file
0.02911686897277832s to load long data with force async = 1
20.079597234725952s to load wide data with force async = 1
1.768498420715332s to load long data with force async = 0
15.423298597335815s to load wide data with force async = 0

Issue description

Even though the total number of datapoints is the same and the number of files is fixed, the load time for wide-format data is orders of magnitude slower. This is probably an extreme case, since there is only 1 row per file and so metadata parsing dominates dramatically.

There are likely improvements that can be made to the metadata parsing; I don't see full utilization of my cores while loading the wide data.
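
As a rough check of the metadata-parsing hypothesis, the parquet footers can be parsed on their own with pyarrow. A minimal sketch, assuming the `wide_*.parquet` and `long_*.parquet` files from the reproducer above are still on disk; `pq.ParquetFile(...)` only reads and parses the footer, and `metadata.serialized_size` reports the size of the Thrift-encoded footer:

```
import time

import pyarrow.parquet as pq

NUM_FILES = 500  # matches the reproducer above

for file_type in ("long", "wide"):
    # Footer size grows with the number of columns, independent of row count.
    meta = pq.ParquetFile(f"{file_type}_0.parquet").metadata
    print(f"{file_type}: {meta.num_columns} columns, "
          f"{meta.serialized_size} bytes of footer metadata per file")

    # Time parsing the footers of all files without reading any column data.
    start = time.time()
    for i in range(NUM_FILES):
        pq.ParquetFile(f"{file_type}_{i}.parquet")
    print(f"{file_type}: {time.time() - start:.3f}s to parse {NUM_FILES} footers")
```

If most of the wide-vs-long gap already shows up in this footer-only loop, the slowdown is largely metadata handling rather than decoding the values themselves.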

Expected behavior

The speed difference should be less dramatic.

Installed versions

```
--------Version info---------
Polars:               1.0.0-rc.2
Index type:           UInt32
Platform:             Linux-6.8.0-76060800daily20240311-generic-x86_64-with-glibc2.35
Python:               3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:               2024.6.0
gevent:
great_tables:
hvplot:
matplotlib:           3.5.1
nest_asyncio:
numpy:                1.26.3
openpyxl:
pandas:               2.1.4
pyarrow:              14.0.2
pydantic:
pyiceberg:
sqlalchemy:
torch:                2.3.1+cu121
xlsx2csv:
xlsxwriter:
```
kszlim commented 3 days ago

This isn't really a bug (nor is it really a feature request?). Perhaps there isn't much improvement to be made?