ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

feat(Polars): enable pushdown filtering for lazy dataframes #8595

Open szst11 opened 6 months ago

szst11 commented 6 months ago

Is your feature request related to a problem?

When working with partitioned Parquet files and the Polars backend, filtering on a partition column is slow through ibis, even though Polars itself is able to prefilter the data and read only the matching partition:

import time
import numpy as np
import polars as pl
import os
base_path = r'C:\temp\sample_dataset'

# create the base data
num_rows = 10_000_000
data = {"data": np.random.rand(num_rows)}
df = pl.DataFrame(data)

# save the same data in some partitions
for i_part in range(10):
    target_dir = os.path.join(base_path,'part=' + str(i_part))
    target_file = os.path.join(target_dir,'data.parquet')
    os.makedirs(target_dir, exist_ok=True)  # allow re-running the script
    df.write_parquet(target_file)

# prepare evaluations
base_path_glob = os.path.join(base_path,'*','*.parquet')
lz_df = pl.scan_parquet(base_path_glob)

# prepare the ibis table
import ibis
con = ibis.polars.connect(tables={'lz_df': lz_df})
ibis_lz_df = con.table("lz_df")

# evaluate
start = time.time()
table_1 = lz_df.filter(pl.col('part')==1).collect().to_arrow()
end = time.time()
print('polars lazy data: ' + str(end - start))

start = time.time()
pa_table_db = ibis_lz_df.filter(ibis_lz_df['part'] == 1).to_pyarrow()
end = time.time()
print('ibis with polars lazy data: ' + str(end - start))

Describe the solution you'd like

Filtering on a lazy dataframe should be pushed down to Polars.

What version of ibis are you running?

8.0.0

What backend(s) are you using, if any?

Polars


kszucs commented 6 months ago

Thanks @szst11 for the report. Confirmed, but this is really a Polars issue.

Interestingly, this happens because ibis renders the filter predicate as

pl.col('part') == pl.lit(1).cast(pl.Int8)  # pl.Int64 has the same effect, the `part` col has int64 datatype
# => [(col("part")) == (1.strict_cast(Int8))]

while in your pure polars example the predicate is

pl.col('part') == 1
# => [(col("part")) == (1)]

With the cast literal in the predicate, predicate pushdown stops working in the pure Polars case as well. While we can work around this on the ibis side for the time being, it would be better to address it upstream eventually.

cpcloud commented 6 months ago

cc @alexander-beedie Is this on y'all's radar at all?

alexander-beedie commented 6 months ago

> cc @alexander-beedie Is this on y'all's radar at all?

Can read in more detail later, but it could be if someone can raise an Issue on our repo; we've been doing a lot of work with parquet/s3/hive/etc recently, so a good Issue on that subject would be very welcome 👍

cpcloud commented 6 months ago

Roger. I think it mostly comes down to the optimization being defeated by casting a literal whose value would otherwise let the filter be applied to the partition.
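To make that mechanism concrete, here is a toy model in plain Python. It is not how Polars implements partition pruning. All names (`Lit`, `Cast`, `Eq`, `prune`, `see_through_casts`) are hypothetical and exist only for illustration. The pruner can only skip files when the right-hand side of the comparison is a bare literal, so wrapping the literal in a cast makes it fall back to scanning every partition.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Lit:
    value: int


@dataclass(frozen=True)
class Cast:
    inner: "Lit | Cast"
    dtype: str


@dataclass(frozen=True)
class Eq:
    column: str
    right: "Lit | Cast"


def partition_value(path: str, column: str) -> "int | None":
    """Extract the integer from a hive-style `column=<n>` path segment."""
    match = re.search(rf"(?:^|/){re.escape(column)}=(\d+)(?:/|$)", path)
    return int(match.group(1)) if match else None


def prune(paths: list, pred: Eq, see_through_casts: bool = False) -> list:
    """Keep only the files whose partition value can satisfy `pred`."""
    right = pred.right
    if see_through_casts:
        # Assumption: the cast does not change the value (1 fits in Int8).
        while isinstance(right, Cast):
            right = right.inner
    if not isinstance(right, Lit):
        # Not a bare literal: nothing can be decided statically, scan all.
        return list(paths)
    kept = []
    for path in paths:
        value = partition_value(path, pred.column)
        if value is None or value == right.value:
            kept.append(path)
    return kept


paths = [f"part={i}/data.parquet" for i in range(10)]

plain = prune(paths, Eq("part", Lit(1)))
with_cast = prune(paths, Eq("part", Cast(Lit(1), "Int8")))
folded = prune(paths, Eq("part", Cast(Lit(1), "Int8")), see_through_casts=True)

print(len(plain), len(with_cast), len(folded))  # 1 10 1
```

In this model, either fix restores pruning: emit the bare literal when the cast is redundant (the ibis-side workaround), or have the pruner fold value-preserving casts (the upstream fix).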