pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

scan_pyarrow_dataset not filtering on partitions #16300

Open mtofano opened 6 months ago

mtofano commented 6 months ago


Reproducible example

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.compute as pc
import polars as pl

# here is my dataset definition
dataset = ds.dataset(
    source=dataset_path,
    filesystem=s3_fs,  # instance of S3FileSystem
    format="arrow",
    partitioning=ds.partitioning(
        schema=pa.schema(
            [
                pa.field("underlier_id", pa.int64()),
                pa.field("trade_date", pa.date32()),
            ]
        ),
    ),
)

# pyarrow works in < 1s
data = dataset.filter(
    (pc.field("underlier_id") == 5135108)
    & (pc.field("trade_date") == trade_date)
).to_table()

# but polars scan_pyarrow_dataset never completes
data = pl.scan_pyarrow_dataset(dataset).filter(
    pl.col("underlier_id") == 5135108,
    pl.col("trade_date") == trade_date
).collect()

Log output

No response

Issue description

I have a large dataset on S3 consisting of a large number of .arrow files. We are using directory partitioning by an integer id and a date, which looks like this:

/5135108
    /2016-01-01
        /part-0.arrow
    ...
    /2024-05-17
        /part-0.arrow
/5130371
    /2016-01-01
    ...
    /2024-05-17

We are using pyarrow to write the entirety of this dataset. On the read side polars is much preferred because of its expressiveness. I want to use the scan_pyarrow_dataset function in order to read and perform filtering with predicate pushdown. However, it seems that polars is not filtering out the partitions defined in the polars query. When I run using pyarrow it takes less than a second to read in the data of a single file, but when I use polars scan_pyarrow_dataset, this never completes and hangs forever. I am assuming this is because it is not actually filtering out the partitions and is trying to read in everything.
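A quick way to confirm that pyarrow itself prunes partitions for this filter is to list the matching fragments directly (a hedged sketch reusing the `dataset` object and the id from the example above):

```python
import pyarrow.compute as pc

# list only the fragments whose partition values satisfy the filter;
# with working pruning this should touch just the /5135108 directory
fragments = list(dataset.get_fragments(filter=pc.field("underlier_id") == 5135108))
print(len(fragments))
print(fragments[0].path if fragments else "no matching fragments")
```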

Expected behavior

I would expect this to filter out the irrelevant partitions from the reads, and push any predicates down to the scan level just as pyarrow does, but that does not seem to be the case.

Installed versions

```
--------Version info---------
Polars:               0.20.26
Index type:           UInt32
Platform:             Linux-4.18.0-513.9.1.el8_9.x86_64-x86_64-with-glibc2.28
Python:               3.9.19 | packaged by conda-forge | (main, Mar 20 2024, 12:50:21) [GCC 12.3.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:          3.0.0
connectorx:
deltalake:
fastexcel:
fsspec:               2024.3.1
gevent:
hvplot:
matplotlib:
nest_asyncio:         1.6.0
numpy:                1.26.4
openpyxl:
pandas:               1.5.3
pyarrow:              15.0.2
pydantic:             2.7.1
pyiceberg:
pyxlsb:
sqlalchemy:           1.4.49
torch:
xlsx2csv:
xlsxwriter:
```
ion-elgreco commented 6 months ago

You can check the plan with `df.explain()`. You should see the filter being pushed down into the scan as a pyarrow compute expression.

If it's correctly showing pushed-down pyarrow compute expressions, then it rather points to an issue in pyarrow, where filters are not converted to partition filters.
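For example (a minimal sketch using the names from the reproducible example above):

```python
lf = pl.scan_pyarrow_dataset(dataset).filter(
    (pl.col("underlier_id") == 5135108) & (pl.col("trade_date") == trade_date)
)
print(lf.explain())
# if the pushdown works, the predicate should show up inside the PYTHON SCAN node
# as a SELECTION, rather than as a separate FILTER node above the scan
```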

ritchie46 commented 6 months ago

Yes, we just pass the predicates to pyarrow. So I think this should be taken upstream.

mtofano commented 6 months ago

[screenshots of debug session]

I don't think the issue is with pyarrow, as calling to_table and passing in the compute expressions directly works as expected outside of polars.

I suspect the issue is that the predicates are not being passed in to to_table as we would expect when using scan_pyarrow_dataset. See the screenshots above from my debug session. In the _scan_pyarrow_dataset_impl function I can see there are no predicates being passed in as an argument, and thus no filter is being provided to ds.to_table. The predicates seem to be getting lost in translation somewhere.

The query plan looks correct to me however from the output of explain():

data.explain()
'FILTER [([(col("underlier_id")) == (5135108)]) & ([(col("trade_date")) == (2016-01-04)])] FROM\n\n  PYTHON SCAN \n  PROJECT */7 COLUMNS'
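For comparison, this is roughly what the scan callback would have to call for the pushdown to take effect (a hand-written sketch, not polars internals; it reuses `dataset` and `trade_date` from above):

```python
import pyarrow.compute as pc

pa_filter = (pc.field("underlier_id") == 5135108) & (pc.field("trade_date") == trade_date)
table = dataset.to_table(filter=pa_filter)  # this is the fast, <1s path
data = pl.from_arrow(table)
```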
ion-elgreco commented 6 months ago

So filtering on non-date/datetime columns works, see the example below:

Run this code as-is

import polars as pl

df = pl.DataFrame({
    "foo": [1,2,3],
    "bar": [1,2,3],
    "baz": [1,2,3],
}, schema={"foo": pl.Int64, "bar": pl.Date, "baz": pl.Int64,})

df.write_delta('test_table_scan', 
               mode='overwrite', 
               delta_write_options={"partition_by": ["foo", "bar"], "engine":"rust"}, overwrite_schema=True)

print(
    pl.scan_delta('test_table_scan').filter(pl.col('foo')==2).collect()
)

However, a predicate that contains a date or datetime breaks the predicate pushdown into pyarrow, similar issue: https://github.com/pola-rs/polars/issues/16248


import polars as pl

df = pl.DataFrame({
    "foo": [1,2,3],
    "bar": [1,2,2],
    "baz": [1,2,3],
}, schema={"foo": pl.Int64, "bar": pl.Date, "baz": pl.Int64,})

df.write_delta('test_table_scan', 
               mode='overwrite', 
               delta_write_options={"partition_by": ["foo", "bar"], "engine":"rust"}, overwrite_schema=True)

print(
    pl.scan_delta('test_table_scan').filter(pl.col('foo')==2, pl.col('bar')== pl.date(1970,1,3)).collect()
)
ion-elgreco commented 6 months ago

Seems like the pushdown is not working when it includes date/datetimes @ritchie46

print(pl.scan_delta('test_table_scan').filter(pl.col('foo')==2, pl.col('bar')== pl.date(1970,1,3)).explain(optimized=True))

FILTER [([(col("foo")) == (2)]) & ([(col("bar")) == (dyn int: 1970.dt.datetime([dyn int: 1, dyn int: 3, dyn int: 0, dyn int: 0, dyn int: 0, dyn int: 0, String(raise)]).strict_cast(Date))])] FROM

  PYTHON SCAN 
  PROJECT */3 COLUMNS

This issue is related: https://github.com/pola-rs/polars/issues/11152

mtofano commented 6 months ago

Thank you very much for the replies!

Out of curiosity, what exactly is it about dates that breaks the predicate pushdown? This would be a very nice feature to have, as without it scan_pyarrow_dataset is unusable on date-partitioned datasets, and it is a very powerful feature we'd love to take advantage of :)

ozen commented 4 months ago

@ion-elgreco +1

Datetime predicates are so widespread that not having pushdown with them is a deal breaker.

@ritchie46 any ideas?

ritchie46 commented 4 months ago

This is where we convert predicates to pyarrow predicates. https://github.com/pola-rs/polars/blob/main/crates/polars-plan/src/plans/pyarrow.rs

If someone can take a look.
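As a reference point, this is the pyarrow-compute form the failing date predicate would need to end up as (a hedged, hand-written illustration of the target expression, not what polars currently emits):

```python
import pyarrow as pa
import pyarrow.compute as pc
from datetime import date

# foo == 2 AND bar == 1970-01-03, with the date literal typed as date32
pa_filter = (pc.field("foo") == 2) & (
    pc.field("bar") == pa.scalar(date(1970, 1, 3), type=pa.date32())
)
```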

ion-elgreco commented 4 months ago

@ritchie46 I did, just never had time to finish it: https://github.com/pola-rs/polars/pull/16500

Kuinox commented 3 months ago

Hello, on my side, if I have a single date predicate it works, but if I use is_in to match the dates, even the sub-partition filter doesn't get pushed down.
I know this used to work in v0.20.31.

I have a minimal reproduction sample, are you interested in it?

ritchie46 commented 3 months ago

Yes, an MWE would be appreciated.

Kuinox commented 3 months ago

After reading the code again: it breaks whenever is_in is used, not specifically when filtering on the date.

The script to generate the dataset:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os

def random_timestamp(date):
    start = datetime.combine(date, datetime.min.time())
    end = datetime.combine(date, datetime.max.time())
    return start + timedelta(seconds=np.random.randint(0, int((end - start).total_seconds())))

def generate_dataset(base_path):
    base_date = datetime.now().date() - timedelta(days=5)

    for i in range(5):
        for group in range(1, 501):
            date_folder = (base_date + timedelta(days=i)).strftime("date=%Y-%m-%d")
            group_folder = f"value_group={group}"
            folder_path = os.path.join(base_path, date_folder, group_folder)
            os.makedirs(folder_path, exist_ok=True)

            data = {
                'timestamp': [random_timestamp(base_date + timedelta(days=i)) for _ in range(10)],
                'value': np.random.randint(0, 100, size=10)
            }

            df = pd.DataFrame(data)
            df.to_parquet(os.path.join(folder_path, 'data.parquet'), index=False)

generate_dataset('dataset')

The query:

import polars as pl
import pyarrow.dataset as ds
import pyarrow as pa
from datetime import datetime, timedelta

dataset_path = "dataset/"

schema = pa.schema(
    [
        pa.field('date', pa.date32()),
        pa.field('value_group', pa.int32())
    ]
)

datasets = ds.dataset(
    dataset_path,
    format='parquet',
    partitioning=ds.partitioning(
        schema=schema,
        flavor='hive'
    )
)

df = pl.scan_pyarrow_dataset(datasets)

# df = pl.scan_parquet(dataset_path) # no problem when using this

date = datetime(2024, 8, 5).date()
value_groups = [1, 2, 3, 4, 5]
query_plan = (
    df.filter(pl.col('date') == date)
    .filter(pl.col('value_group').is_in(value_groups))
    .select(pl.col('value').mean().alias('average_value'))
    .explain())

print('Query Plan:')
print(query_plan)

The output I get:

PS C:\Dev\polars_experiments> & .\polars_v0.20.31\venv\Scripts\python .\polars_v0.20.31\query_script.py
Query Plan:
 SELECT [col("value").mean().alias("average_value")] FROM

    PYTHON SCAN
    PROJECT 3/4 COLUMNS
    SELECTION: [([(col("date")) == (2024-08-05)]) & (col("value_group").is_in([Series]))]
PS C:\Dev\polars_experiments> & .\polars_latest\venv\Scripts\python .\polars_latest\query_script.py
Query Plan:
 SELECT [col("value").mean().alias("average_value")] FROM
  FILTER [([(col("date")) == (2024-08-05)]) & (col("value_group").is_in([Series]))] FROM
    PYTHON SCAN []
    PROJECT 3/4 COLUMNS
deanm0000 commented 3 months ago

This appears to be fixed in 1.4.1

I did @Kuinox's setup and the explain with either the date or value_group looks the same.

The one thing I did to test this was to rename the 2024-08-11 folder after creating the lazyframe; then I collected it with a filter for 2024-08-10 and it returned, implying that it correctly didn't try to read anything in 2024-08-11. I then ran a query filtering for value_group==1 and got an error that it couldn't find the files under the '2024-08-11' folder (since I renamed it).
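A sketch of that renaming check, assuming Kuinox's generated "dataset/" layout and the `datasets` object from the script above (folder dates as described in the test):

```python
import os
from datetime import datetime
import polars as pl

lf = pl.scan_pyarrow_dataset(datasets)
os.rename("dataset/date=2024-08-11", "dataset/date=2024-08-11_renamed")

# collects successfully only if the 2024-08-11 partition is pruned and never opened;
# filtering on value_group == 1 instead hit a missing-file error for that folder
print(lf.filter(pl.col("date") == datetime(2024, 8, 10).date()).collect())
```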

Please let me know if I'm missing anything and I'll reopen.

Kuinox commented 3 months ago

Hi, I did run my test on 1.4.1.
You can see a difference if you use pl.scan_parquet, which is commented out in the script I sent; when using it, the filters are correctly pushed down.

 SELECT [col("value").mean().alias("average_value")] FROM
  Parquet SCAN [dataset/date=2024-08-05\value_group=1\data.parquet, ... 4 other files]
  PROJECT 1/4 COLUMNS
  SELECTION: [([(col("date")) == (2024-08-05)]) & (col("value_group").is_in([Series]))]

> the explain with either the date or value_group looks the same

If you remove the filter on value_group, or change it to .filter(pl.col('value_group') == 2), the filter is pushed down correctly, but as soon as is_in is used to filter, no filters are pushed down at all.
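To see the difference side by side (using the `df` lazyframe from the query script above):

```python
# equality on the partition column shows up inside the scan as a SELECTION...
print(df.filter(pl.col('value_group') == 2).explain())
# ...but switching to is_in leaves the predicate as a FILTER node above PYTHON SCAN
print(df.filter(pl.col('value_group').is_in([1, 2, 3])).explain())
```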

deanm0000 commented 3 months ago

The issue is that is_in doesn't push down? Just to make it more concise could you make a table or something that shows what works and what doesn't? I'm not sure how to convey interactions between the operation types so don't feel like you need to adopt this format but something like:

Idea for format (I'm not claiming this as being correct):

|        | eq    | is_in                |
| ------ | ----- | -------------------- |
| String | works |                      |
| Number | works | breaks all pushdowns |
| Date   | works | doesn't work         |
Kuinox commented 3 months ago
|        | eq    | is_in                |
| ------ | ----- | -------------------- |
| String | works | breaks all pushdowns |
| Number | works | breaks all pushdowns |
| Date   | works | breaks all pushdowns |
ozen commented 3 months ago

My issue with delta and datetime:

import polars as pl
from datetime import datetime, timezone

df = pl.scan_delta("gs://bucket/path")
df = df.filter(
    pl.col("capture_time") >= datetime(year=2024, month=8, day=13, tzinfo=timezone.utc),
)
print(df.explain(optimized=True))
PYTHON SCAN []
PROJECT */19 COLUMNS
SELECTION: [(col("capture_time")) >= (2024-08-13 00:00:00.dt.replace_time_zone([String(earliest)]))]

If I collect, it downloads the whole table.

Same if I change >= to ==. Same if I remove tzinfo.

This is basically what @ion-elgreco shared above. It's the same in v1.5.0.