pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Slow partition_by gets fixed by presorting #17562

Closed sslivkoff closed 1 month ago

sslivkoff commented 1 month ago

Checks

Reproducible example

Presorting the data can speed up df.partition_by() by more than 200x. This suggests either a bug or an opportunity for optimization.

Reproducing Example

Download test_partition_by.parquet here.

import polars as pl
import time

df = pl.read_parquet('test_partition_by.parquet')
partition_columns = ['network', 'metric', 'interval', 'timestamp']

# partition with presort
t_start = time.time()
result = df.sort(partition_columns).partition_by(partition_columns)
t_presort = time.time() - t_start
print('t_presort:', '%.2f' % t_presort)

# partition without presort
t_start = time.time()
result = df.partition_by(partition_columns)
t_no_presort = time.time() - t_start
print('t_no_presort:', '%.2f' % t_no_presort)
print('presort is', '%.2f' % (t_no_presort / t_presort), 'times faster')

outputs

t_presort: 3.73
t_no_presort: 864.39
presort is 231.74 times faster

Log output

No response

Issue description

(First brought up in Discord here, using a slightly different dataset than this one.)

Expected behavior

Ideally, the implementation of partition_by() should be able to decide whether to sort the data before proceeding, so it can take advantage of the large potential speedup. Perhaps partition_by() should perform an internal presort unless it is passed maintain_order=True.

It might also be good to note in the docs of partition_by() that performance can be dramatically improved by sorting first.
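
In the meantime, a user-side workaround along these lines seems to capture the speedup (partition_by_presorted is just a hypothetical helper name, not anything in the Polars API):

```
import polars as pl


def partition_by_presorted(df: pl.DataFrame, by: list[str], maintain_order: bool = False, **kwargs):
    # hypothetical helper: presort before partition_by when row order does not need to be preserved
    if maintain_order:
        # preserving the original row order rules out the presort trick
        return df.partition_by(by, maintain_order=True, **kwargs)
    # sorting by the partition keys first avoids the slow path reported above
    return df.sort(by).partition_by(by, maintain_order=False, **kwargs)


# usage -- keyword arguments such as as_dict/include_key pass through unchanged:
# parts = partition_by_presorted(df, partition_columns, as_dict=True)
```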

Installed versions

```
--------Version info---------
Polars:      1.0.0
Index type:  UInt64
Platform:    Linux-6.5.0-1020-gcp-x86_64-with-glibc2.35
Python:      3.10.12 (main, Mar 22 2024, 16:50:05) [GCC 11.4.0]
```
sslivkoff commented 1 month ago

Initially I ran this on Polars 1.0, but rerunning it on 1.1 the behavior is the same:

t_presort: 2.88
t_no_presort: 860.81
presort is 299.31 times faster
azirale commented 1 month ago

I also encountered the absurdly slow performance of DataFrame.partition_by and replicated the speed-up with a pre-sort. I also verified that bringing the data into Python, partitioning it there, then pushing it back to Polars was significantly faster.

This is the data I was using (parquet, 108MB) -- it has ~11M rows across ~12k partition values, all strings. When I tried to partition this DataFrame it would completely saturate 16 cores and make the machine unusable. I gave up after several minutes and eventually did a workaround in Python (shown in the code below) where I would:

- initialise a dict with an empty list for each unique partition key
- iterate the rows and append each value to the list for its key
- create a DataFrame from each list

This process took ~25s. I ran the Polars DataFrame.partition_by overnight, and it took an hour (3594s) to get to the same result, while saturating compute. Something seems very wrong when a partition_by that does not require ordering guarantees is ~144 times slower than pulling the data into Python and sending it back to Polars, in a process that does maintain the original order.


Based on the issue here, I added a DataFrame.sort() immediately before the .partition_by(), and that produced the output in 5 seconds. That was ~700x faster than without the sort, and ~5x faster than the in-Python workaround (as I would expect).

This is Python 3.12.4 with Polars 1.1.0, running in WSL2 Ubuntu 22.04.3 on Windows 11. CPU is Threadripper 1950x 16-core with 64GB RAM (32GB available in WSL2).


import timeit
import polars as pl

# Easy way to print time taken
class Timer:

    def __init__(self, name: str) -> None:
        self.name = name

    def __enter__(self) -> None:
        self.start = timeit.default_timer()

    def __exit__(self, exception_type, exception_value, exception_traceback) -> None:
        self.end = timeit.default_timer()
        print(f"[{self.name}] took {self.end-self.start:.3f}s")

# Read the parquet file into a dataframe
with Timer("Read Cube"): # <1s
    cube_df = pl.read_parquet("address_ids.parquet").select("ad_ADDRESS_DETAIL_PID", "sl_STREET_NAME")

# Custom process for the same thing, using python intermediary steps
with Timer("Python Partitioning"): # 25s

    with Timer("Initialise dict with all keys"): # 1s
        street_addr_lists = { row[0]: [] for row in cube_df.select("sl_STREET_NAME").unique().iter_rows() }

    with Timer("Populate lists with values"): # 16s
        for row in cube_df.select("ad_ADDRESS_DETAIL_PID", "sl_STREET_NAME").iter_rows():
            street_addr_lists[row[1]].append(row[0])

    with Timer("Create DataFrames from lists"): # 10s
        street_addr_dfs = {
            street_name:pl.DataFrame(pl.Series("ad_ADDRESS_DETAIL_PID",street_addr_lists[street_name]))
            for street_name in street_addr_lists
        }

# Direct Partition By into dict, excluding partition columns from dataframes
# Not only does this take 3600s, it completely saturates CPU use during that time
with Timer("Polars Partition By"): # 3600s
    street_name_partition = cube_df.partition_by("sl_STREET_NAME", include_key=False, as_dict=True)

# Pre-Sort before Partition By
with Timer("Polars Sort Into Partition By"): # 5s
    sorted_street_name_partition = cube_df.sort("sl_STREET_NAME").partition_by("sl_STREET_NAME", include_key=False, as_dict=True)

# validate that the data is the same
# (note: the Python dict is keyed by plain strings, while partition_by(as_dict=True) keys are tuples)
street_addr_dfs["MALIBU"].hash_rows().sum()                  # 4208857149750374523
sorted_street_name_partition[("MALIBU",)].hash_rows().sum()  # 4208857149750374523
cmdlineluser commented 1 month ago

It seems using group_by can also show the same behaviour.

Testing on a reduced subset of the sample parquet file:

for _ in df.sort(partition_columns).group_by(partition_columns): pass
for _ in df.group_by(partition_columns): pass
t_presort: 0.35
t_no_presort: 10.14
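
For reference, a minimal timing harness along these lines (a sketch assuming the reduced subset is read the same way as in the original reproduction) can be used to compare the two loops:

```
import time

import polars as pl

df = pl.read_parquet('test_partition_by.parquet')  # reduced subset assumed here
partition_columns = ['network', 'metric', 'interval', 'timestamp']

# iterate the groups with a presort
t_start = time.time()
for _ in df.sort(partition_columns).group_by(partition_columns):
    pass
t_presort = time.time() - t_start

# iterate the groups without a presort
t_start = time.time()
for _ in df.group_by(partition_columns):
    pass
t_no_presort = time.time() - t_start

print('t_presort:', '%.2f' % t_presort)
print('t_no_presort:', '%.2f' % t_no_presort)
```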