pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs
Other
30.1k stars 1.94k forks source link

Polars cross join 50x slower than DuckDB cross join #15456

Closed ion-elgreco closed 7 months ago

ion-elgreco commented 7 months ago

Checks

Reproducible example

See comment below.

Log output

No response

Issue description

Cross joins with polars are lot's slower than in duckdb, with streaming is the only way to get a result. If I don't project less columns in df with a select than it will take 10+ minutes, if I just select the relevant columns than it will take only 40 second on 0.20.18 (in 0.20.10 it was double, so that improved).

This will take 10+ minutes

df_polars_only = df.lazy().join(
    parts.lazy(),
    how='cross'
).filter(
    (pl.col('id') == pl.col('id_right')) & 
    (pl.col('start') <= pl.col('date') ) &
    (pl.col('end') >= pl.col('date'))
).collect(streaming=True)

With DuckDB on Polars Dataframes:

"""
SELECT *
FROM df
    CROSS JOIN parts
WHERE
    df.id== parts.id
    and df.start<= parts.date
    and df.end>= parts.date
"""

image

Expected behavior

Be as fast as DuckDB xD

Installed versions

``` --------Version info--------- Polars: 0.20.18 Index type: UInt32 Platform: Linux-5.15.146.1-microsoft-standard-WSL2-x86_64-with-glibc2.31 Python: 3.10.12 (main, Aug 9 2023, 14:47:34) [GCC 9.4.0] ----Optional dependencies---- adbc_driver_manager: cloudpickle: 2.2.1 connectorx: deltalake: 0.15.3 fastexcel: 0.10.2 fsspec: 2024.3.1 gevent: hvplot: matplotlib: 3.8.3 nest_asyncio: 1.6.0 numpy: 1.23.5 openpyxl: 3.1.2 pandas: 1.5.3 pyarrow: 15.0.2 pydantic: 1.10.14 pyiceberg: pyxlsb: sqlalchemy: 1.4.52 xlsx2csv: xlsxwriter: ```
ion-elgreco commented 7 months ago

@stinodego Here is a reproducible example πŸ˜„

import datetime
import random

def random_date(start, end):
    """Generate a random datetime between `start` and `end`"""
    return start + datetime.timedelta(
        # Get a random amount of seconds between `start` and `end`
        seconds=random.randint(0, int((end - start).total_seconds())),
    )

df = pl.DataFrame({
    "id":list(range(0,1000))*1500,
    "start_date": [random_date(datetime.datetime(2015,1,1), datetime.datetime(2020,1,1),) for i in range(1500000)]
}).with_columns(end_date = pl.col('start_date') + pl.duration(hours=random.randint(24,240)))

parts = df.sample(1700).select(
    'id',
    pl.concat_list(pl.col('start_date','end_date')).list.mean().alias('date').cast(pl.Datetime)
)

Execute polars code:

df_polars_only = df.lazy().join(
    parts.lazy(),
    how='cross'
).filter(
    (pl.col('id') == pl.col('id')) & 
    (pl.col('start_date') <= pl.col('date') ) &
    (pl.col('end_date') >= pl.col('date'))
).collect(streaming=True)

timings: 16.7 s Β± 1.98 s per loop (mean Β± std. dev. of 7 runs, 1 loop each)

Now execute in DuckDB:

sqlcode = """
SELECT *
FROM df
    CROSS JOIN parts
WHERE
    df.id== parts.id
    and df.start_date<= parts.date
    and df.end_date>= parts.date
"""
duckdb.sql(sqlcode).pl()

Timings: 18.7 ms Β± 320 Β΅s per loop (mean Β± std. dev. of 7 runs, 100 loops each)

avimallu commented 7 months ago

This is why Polars really needs non-equi joins. DuckDB's EXPLAIN will tell you that it converts this cross join to an inner join:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                             
β”‚         PROJECTION        β”‚                             
β”‚   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   β”‚                             
β”‚             id            β”‚                             
β”‚         start_date        β”‚                             
β”‚          end_date         β”‚                             
β”‚             id            β”‚                             
β”‚            date           β”‚                             
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                                          
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                             
β”‚         HASH_JOIN         β”‚                             
β”‚   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   β”‚                             
β”‚           INNER           β”‚                             
β”‚          id = id          β”‚                             
β”‚      end_date >= date     β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              
β”‚     start_date <= date    β”‚              β”‚              
β”‚   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   β”‚              β”‚              
β”‚           EC: 1           β”‚              β”‚              
β”‚          Cost: 1          β”‚              β”‚              
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β”‚                                           
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚         ARROW_SCAN        β”‚β”‚         ARROW_SCAN        β”‚
β”‚   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   β”‚β”‚   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   β”‚
β”‚             id            β”‚β”‚             id            β”‚
β”‚         start_date        β”‚β”‚            date           β”‚
β”‚          end_date         β”‚β”‚   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   β”‚
β”‚   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   β”‚β”‚           EC: 1           β”‚
β”‚           EC: 1           β”‚β”‚                           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                             

To get it, just use:

sqlcode = """
EXPLAIN
SELECT *
FROM df
    CROSS JOIN parts
WHERE
    df.id== parts.id
    and df.start_date<= parts.date
    and df.end_date>= parts.date
"""
print(duckdb.sql(sqlcode).pl().get_column("explain_value").to_list()[0])

Underlying issue is https://github.com/pola-rs/polars/issues/10068.

ion-elgreco commented 7 months ago

@avimallu right, then I can just rewrite it as an inner join ^^

avimallu commented 7 months ago

But not as an inner join on inequality conditions, since Polars doesn't support those yet, right?

(Don't know if an inner non-equi join has a specific name.)

ion-elgreco commented 7 months ago

Just doing an inner join on ID first and then filter afterwards is giving the same results

ritchie46 commented 7 months ago

Polars doesn't have non-equi joins yet. There is a tracking issue #10068