pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Support for slice and predicate pushdown (filter().head() is slow) #19355

Open foldager opened 1 day ago

foldager commented 1 day ago

Description

I found that slice pushdown is not applied when combined with a filter. As the example below shows, the slice is executed after the Parquet scan whenever a filter is present. Both the query plan and the running time reflect this.

With both slice and predicate pushdown, the query should take roughly the same time as head(3) alone. Instead it takes about 460 times longer.

Generate data:

```python
import numpy as np
import polars as pl

np.random.seed(43)
pl.DataFrame(
    {'a': np.random.randint(0, 1_000_000, size=500_000_000)}
).write_parquet('data.parquet')
```

Examples

```python
import polars as pl
from codetiming import Timer

df = pl.scan_parquet('data.parquet')

with Timer(initial_text="\n## head(3)\n"):
    print(df.head(3).explain())
    _ = df.head(3).collect()

with Timer(initial_text="\n## filter\n"):
    df_filtered = df.filter(pl.col('a') > 2)
    print(df_filtered.explain())
    _ = df_filtered.collect()

with Timer(initial_text="\n## filter and head(3)\n"):
    df_filtered_head = df_filtered.head(3)
    print(df_filtered_head.explain())
    _ = df_filtered_head.collect()
```

Output:

```text
## head(3)

Parquet SCAN [data.parquet]
PROJECT */1 COLUMNS
SLICE: (0, 3)
Elapsed time: 0.0034 seconds

## filter

Parquet SCAN [data.parquet]
PROJECT */1 COLUMNS
SELECTION: [(col("a")) > (2)]
Elapsed time: 1.0058 seconds

## filter and head(3)

SLICE[offset: 0, len: 3]
  Parquet SCAN [data.parquet]
  PROJECT */1 COLUMNS
  SELECTION: [(col("a")) > (2)]
Elapsed time: 1.5932 seconds
```
```text
--------Version info---------
Polars:              1.10.0
Index type:          UInt32
Platform:            macOS-14.7-arm64-arm-64bit
Python:              3.12.3 (main, Aug 1 2024, 12:12:15) [Clang 15.0.0 (clang-1500.3.9.4)]
LTS CPU:             False

----Optional dependencies----
adbc_driver_manager
altair
cloudpickle
connectorx
deltalake
fastexcel
fsspec
gevent
great_tables
matplotlib
nest_asyncio
numpy                2.1.2
openpyxl
pandas
pyarrow              17.0.0
pydantic
pyiceberg
sqlalchemy
torch
xlsx2csv
xlsxwriter
```
orlp commented 1 day ago

It's fundamentally impossible to push a head down past a filter.

What you're looking for is early stopping, which is something the new streaming engine will support.
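
Something like the sketch below, assuming a Polars version where the new streaming engine can be selected via `collect(engine="streaming")` (the 1.10.0 in this report predates that option):

```python
import polars as pl

lf = pl.scan_parquet('data.parquet')

# With early stopping, the engine can terminate the scan as soon as
# 3 rows have passed the filter, instead of filtering all 500M rows
# and slicing afterwards.
result = (
    lf.filter(pl.col('a') > 2)
      .head(3)
      .collect(engine="streaming")  # opt in to the streaming engine
)
print(result)
```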

foldager commented 1 day ago

> What you're looking for is early stopping, which is something the new streaming engine will support.

Great, that is exactly what I'm looking for. Related question: will the new streaming engine also let filter().slice(offset, length) skip ahead to offset without keeping the first offset rows in memory? I.e., memory would scale with length.

> It's fundamentally impossible to push a head down past a filter.

Yeah, I see that you cannot do random access (a slice) before the predicate has been evaluated, if not over the whole file then at least up to the end of the slice.
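
As a toy illustration of the constraint (plain Python with a hypothetical helper, not Polars internals): the predicate must be evaluated on every row up to the end of the slice, but the first offset matches only need to be counted, not kept, so memory can indeed scale with length:

```python
# Hypothetical sketch of filter().slice(offset, length) semantics.
def filtered_slice(rows, predicate, offset, length):
    out = []
    passed = 0
    for row in rows:                  # predicate evaluated row by row
        if predicate(row):
            if passed >= offset:      # count and discard the first `offset` matches
                out.append(row)
                if len(out) == length:
                    break             # early stopping: no further rows touched
            passed += 1
    return out

print(filtered_slice(range(20), lambda x: x % 3 == 0, offset=2, length=3))
# -> [6, 9, 12]
```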

coastalwhite commented 14 hours ago

> Yeah, I see that you cannot do random access (a slice) before the predicate has been evaluated, if not over the whole file then at least up to the end of the slice.

This is also not possible, because we don't know how many items were filtered out before that point.

The plan is to eventually split the IO-based slices into a pre-filter slice and a post-filter slice. That will make quite a few optimizations possible.
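
Roughly the distinction, expressed in today's API (a sketch; note the two queries do not compute the same result, which is exactly why the two slices have to be tracked separately):

```python
import polars as pl

lf = pl.scan_parquet('data.parquet')

# Pre-filter slice: the slice sits below the filter, so slice pushdown
# applies and the reader can stop after 1000 rows.
pre = lf.head(1000).filter(pl.col('a') > 2)

# Post-filter slice: the slice sits above the filter, so the scan cannot
# be bounded up front; this is the case that needs early stopping.
post = lf.filter(pl.col('a') > 2).head(1000)

print(pre.explain())
print(post.explain())
```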