pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Left join after is_in is faster than normal left join #10358

Open luukschagen opened 1 year ago

luukschagen commented 1 year ago


Reproducible example

import random
import polars as pl
print(pl.__version__)

a = list(range(1000))

b = random.choices(list(range(300_000)), k=1000)

query_df = pl.DataFrame({"a": a, "b": b})
other_df = pl.DataFrame({"b": list(range(300_000)), "c": "somedata"})

def optimized(query_df, other_df):
    filtered = other_df.filter(pl.col("b").is_in(query_df.get_column("b")))
    return query_df.join(filtered, how='left', on='b')

print("\nNormal implementation:")

%timeit query_df.join(other_df, how='left', on='b')

print("\nWith filter optimization:")

%timeit optimized(query_df, other_df)

query_df_lazy = query_df.lazy()
other_df_lazy = other_df.lazy()

def optimized_lazy(query_df: pl.LazyFrame, other_df: pl.LazyFrame):
    filtered = other_df.with_context(query_df.select(pl.all().suffix("_query"))).with_columns(
        [pl.col("b").is_in(pl.col("b_query")).alias('filtered')]
    ).filter(pl.col('filtered'))
    return query_df.join(filtered, how='left', on='b').collect()

print("\nNormal Lazy implementation:")

%timeit query_df_lazy.join(other_df_lazy, how='left', on='b').collect()

print("\nLazy with filter implementation:")

%timeit optimized_lazy(query_df_lazy, other_df_lazy)

Output:

Normal implementation:
9.18 ms ± 114 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

With filter optimization:
3.35 ms ± 182 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Normal Lazy implementation:
9.23 ms ± 153 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Lazy with filter implementation:
3.31 ms ± 20.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Issue description

When left joining a relatively small table (query_df) to a larger table (other_df) in order to select the relevant entries from the larger table, the join can be sped up by first filtering the larger other_df down with an is_in filter before doing the join.

I encountered this by accident; my assumption was that the internal implementation of polars would in effect do the same thing, so that explicitly doing the filter as an additional step could only slow down the join.

For completeness I also tried the lazy version, which doesn't seem to change the performance in this case, though I'm not 100% sure whether my implementation of the lazy is_in is the optimal/canonical one.

Expected behavior

My expectation would be that doing a left join from a smaller table onto a larger table (in effect 'selecting' the data from the larger table) would be optimized to filter the large table as efficiently as possible, so that trivially adding a filter step before the join could not improve the performance (at least not in 'eager' mode).

Installed versions

```
--------Version info---------
Polars:              0.18.13
Index type:          UInt32
Platform:            macOS-13.5-x86_64-i386-64bit
Python:              3.11.4 (main, Jun 26 2023, 17:02:21) [Clang 14.0.0 (clang-1400.0.29.202)]
----Optional dependencies----
adbc_driver_sqlite:
cloudpickle:         2.2.1
connectorx:
deltalake:
fsspec:              2023.6.0
matplotlib:          3.7.2
numpy:               1.25.1
pandas:              2.0.3
pyarrow:             10.0.1
pydantic:            2.0.3
sqlalchemy:
xlsx2csv:
xlsxwriter:
```
mishpat commented 1 year ago

There's an issue of what polars needs to know about the frame before it can do these sorts of optimizations. As Richie has mentioned before, there's no join ordering optimizations yet in polars, partially because lazyframe would need to have some idea of cardinalities before knowing what to swap.

In this case, if the tables are the same size and fairly large (I increased the sample size to 3 million), my benchmarks have the "optimized" path above as 30% slower. As you shrink other_df, your suggested approach gains ground and eventually becomes faster. Since the gain/loss depends on relative table sizes and possibly other considerations, this is something that would require some research into when it's worthwhile, at least in the DataFrame context.

For LazyFrames, this would require cardinality estimation (and possibly more) that currently doesn't exist, and as far as I know isn't on the horizon.

ritchie46 commented 1 year ago

For the default engine (which has all the data in memory), we do determine join order JIT. But I think we must first understand why the applied filter is faster. If we understand that, then we can maybe add an optimization by sampling a cardinality estimate.

However, as @mishpat shows, this is definitely not something we should do all the time. At the moment this isn't a top priority, but when I have some spare time, I could investigate a bit more here.

mishpat commented 1 year ago

I didn't mean to sound too negative on the overall goal: I work in a join-heavy environment, so any optimizations to that would be fantastic for me as well!