pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

cached dataframe makes join slower? #19480

Open jackxxu opened 4 days ago

jackxxu commented 4 days ago

Checks

Reproducible example

import polars as pl
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from typing import List

BINS_COUNT = 10
BINS_COUNT2 = 5

def process_file(file_paths):
    # print(f"Processing files: {file_paths}")
    return (
        pl.scan_parquet(file_paths[0])  # Lazily read the Parquet file
        .join(pl.scan_parquet(file_paths[1]), on='id', how='left')
        .with_columns(
            pl.col('a').qcut(BINS_COUNT, labels=[str(i) for i in range(BINS_COUNT)]).cast(pl.Int8).alias('bins'),
            pl.col('a').qcut(BINS_COUNT2, labels=[str(i) for i in range(BINS_COUNT2)]).cast(pl.Int8).alias('bins_2')
        )
    )

def stitch_files(file_paths: List[List]):
    with ThreadPoolExecutor() as executor:
        # Execute the processing function for each file concurrently
        lazy_frames = list(executor.map(process_file, file_paths))

    return pl.concat(
        lazy_frames,
        rechunk=True  # Rechunking for better memory usage
    )

non_repeated_paths = list(zip(
    Path("data").glob("*.parquet"),
    Path("data2").glob("*.parquet")))

repeated_paths = list(zip(
    Path("data").glob("*.parquet"),
    ['data2/2025-01-22.parquet'] * len(list(Path("data").glob("*.parquet"))))) # 1 file repeated

%%timeit

stitch_files(non_repeated_paths).collect()  # ==>  around 2 seconds

print(stitch_files(non_repeated_paths).explain())
%%timeit

stitch_files(repeated_paths).collect()         # ==>  around 2.8 seconds
print(stitch_files(repeated_paths).explain())

Log output

This is the partial output of the `explain()` method:

    ## uncached dataframe (one of the plans) 

    Parquet SCAN [data2/2025-09-05.parquet]
    PROJECT */4 COLUMNS
    ## cached dataframe (one of the plans)

    CACHE[id: 0, cache_hits: 499]
      simple π 4/4 ["id", "a", "cat", "date"]
        Parquet SCAN [data2/2025-01-22.parquet]
        PROJECT 4/4 COLUMNS

Issue description

I am trying to stitch multiple parquet files both horizontally (2 parquets per row) and vertically (hundreds of parquets) into one large dataframe. I first join horizontally and then vertically stack the results.

I tried two ways: in the first, each dataframe is unique; in the second, one of the 2 dataframes is the same for every layer. As expected, Polars caches the 2nd dataframe, as the explain() output shows.

However, the cached version is about 40% slower than the uncached version in my experiments. Also, CPU utilization is limited in the cached version, which may indicate some kind of locking.

The code can be found here: https://github.com/jackxxu/polars_merge/tree/main.

Expected behavior

I expected the cached version to be faster, but it turned out to be the opposite.

Also, the CPU usage of the cached version is much lower, which explains why it is slower.

Installed versions

```
Polars:              1.10.0
Index type:          UInt32
Platform:            macOS-14.6.1-arm64-arm-64bit
Python:              3.10.1 (main, Apr 5 2024, 20:49:47) [Clang 15.0.0 (clang-1500.3.9.4)]
LTS CPU:             False

----Optional dependencies----
adbc_driver_manager
altair
cloudpickle
connectorx
deltalake
fastexcel
fsspec
gevent
great_tables
matplotlib
nest_asyncio         1.6.0
numpy                2.1.2
openpyxl
pandas
pyarrow
pydantic
pyiceberg
sqlalchemy
torch
xlsx2csv
xlsxwriter
```
ritchie46 commented 2 days ago

The cache will turn off branch parallelization to prevent deadlocks. You can turn off CSE if it is slower in your case.

BTW, I would really replace this snippet:

    with ThreadPoolExecutor() as executor:
        # Execute the processing function for each file concurrently
        lazy_frames = list(executor.map(process_file, file_paths))

with `pl.collect_all`, as your threads are now contending with ours; `collect_all` can use Polars' thread pool.

jackxxu commented 1 day ago

@ritchie46:

turning off CSE indeed did the trick. thank you!

with `pl.collect_all`, as your threads are now contending with ours; `collect_all` can use Polars' thread pool.

To your suggestion of pl.collect_all: the snippet is meant to create a list of lazy dataframes (by stitching a bunch of parquets horizontally), and then that list is passed to pl.concat to stitch them vertically into 1 lazy dataframe.

The idea of using ThreadPoolExecutor is so that reading the parquet schemas to create the lazy dataframes can be done in parallel (for hundreds of them). Also, I thought the input of collect_all is a list of lazy frames, so I suspect collect_all may not help me with creating the lazy dataframes?

or perhaps I am missing something?