pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Pyarrow filter not pushed to scan_ds if datatype is a string #6395

Closed dominikpeter closed 1 year ago

dominikpeter commented 1 year ago

Polars version checks

Issue description

I see significant performance differences when filtering a pyarrow dataset via the scanner method compared to Polars' own filtering. After some testing, I found a strange behavior that could explain the difference: could it be that predicates are not pushed down to the pyarrow dataset if the datatype is a string?

In the output below, the predicate is None for the first query (filtering on a string), while the other two queries, which filter on integers, do receive a predicate:

predicate = None with_columns = ['animal']

predicate = (pa.dataset.field('n_legs') == 2) with_columns = ['animal']

predicate = (pa.dataset.field('year') == 2020) with_columns = ['animal']

Therefore, on a bigger dataset, ds.scanner(filter=ds.field("animal") == "Flamingo") is way faster.

Or am I missing something?
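
For reference, a minimal sketch of the direct pyarrow route mentioned above (filter inside the dataset scanner, then hand the already-filtered table to Polars); it reuses the file.parquet written in the reproducible example below:

import polars as pl
import pyarrow.dataset as ds

dataset = ds.dataset("file.parquet", format="parquet")
# pyarrow applies the projection and filter while scanning the file
table = dataset.scanner(columns=["animal"], filter=ds.field("animal") == "Flamingo").to_table()
df = pl.from_arrow(table)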

Reproducible example

import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

from typing import cast
from functools import partial
import pickle

table = pa.table(
    {
        "year": [2020, 2022, 2021, 2022, 2019, 2021],
        "n_legs": [2, 2, 4, 4, 5, 100],
        "animal": ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"],
    }
)

pq.write_table(table, "file.parquet")

dataset = ds.dataset("file.parquet", format="parquet")

def _scan_ds_impl(ds: pa.dataset.Dataset, with_columns: list[str] | None, predicate: str | None) -> pl.DataFrame:
    # Callback executed by the engine: it receives the projected columns and
    # (ideally) the pushed-down predicate as a string expression to evaluate.
    print(f"predicate = {predicate}")
    print(f"with_columns = {with_columns}")

    _filter = None
    if predicate:
        _filter = eval(predicate)
    return cast(pl.DataFrame, pl.from_arrow(ds.to_table(columns=with_columns, filter=_filter)))

def _scan_ds(ds: pa.dataset.Dataset, allow_pyarrow_filter: bool = True) -> pl.LazyFrame:
    # Mirrors the mechanism behind pl.scan_ds: register a pickled callback as
    # the lazy scan source so projections and predicates can be pushed into it.
    func = partial(_scan_ds_impl, ds)
    func_serialized = pickle.dumps(func)
    return pl.LazyFrame._scan_python_function(ds.schema, func_serialized, allow_pyarrow_filter)

df = (
    _scan_ds(dataset, allow_pyarrow_filter=True)
    .select(pl.col("animal"))
    .filter(pl.col("animal") == "Flamingo")
    .collect()
)
print("----------------")
df = _scan_ds(dataset, allow_pyarrow_filter=True).select(pl.col("animal")).filter(pl.col("n_legs") == 2).collect()

print("----------------")
df = _scan_ds(dataset, allow_pyarrow_filter=True).select(pl.col("animal")).filter(pl.col("year") == 2020).collect()

Expected behavior

Also push down the string predicate:

predicate = (pa.dataset.field('animal') == 'Flamingo') with_columns = ['animal']

predicate = (pa.dataset.field('n_legs') == 2) with_columns = ['animal']

predicate = (pa.dataset.field('year') == 2020) with_columns = ['animal']

Installed versions

---Version info---
Polars: 0.15.16
Index type: UInt32
Platform: macOS-13.1-arm64-arm-64bit
Python: 3.11.1 (v3.11.1:a7a450f84a, Dec 6 2022, 15:24:06) [Clang 13.0.0 (clang-1300.0.29.30)]
---Optional dependencies---
pyarrow: 10.0.1
pandas: 1.5.2
numpy: 1.24.1
fsspec:
connectorx:
xlsx2csv:
deltalake:
matplotlib:

ritchie46 commented 1 year ago

@stinodego that would explain why partitioning must be set in scan_delta.

dominikpeter commented 1 year ago

Could we help here somehow? I think the issue is not in Python but in the Rust backend, though we don't really know where to start debugging. We can't really use Delta Lake with Polars at the moment, because we would have to load far too much of our data into memory. We can filter the data first and then use from_arrow to load it; with a z-ordered table this works well even on large datasets, so we can work around it. However, we would like to use Polars' native filter method, because we love Polars :-).
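
(For context, a minimal sketch of that workaround, assuming a local Delta table and using the item filter that appears later in this thread as an example:)

import polars as pl
import pyarrow.dataset as ds
from deltalake import DeltaTable

# Filter inside pyarrow before Polars ever materializes the data;
# "data/delta/item" and the "item" column are illustrative placeholders.
dt = DeltaTable("data/delta/item")
table = dt.to_pyarrow_dataset().scanner(filter=ds.field("item") == "8043152").to_table()
df = pl.from_arrow(table)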

stinodego commented 1 year ago

I have to look at this more closely - I admit it fell off my radar. Will come back to this later this weekend.

stinodego commented 1 year ago

@stinodego that would explain why partitioning must be set in scan_delta.

That is a different issue. scan_delta does the following:

  1. Map the file to a DeltaTable object (deltalake dependency takes care of this)
  2. Convert to a pyarrow dataset (deltalake dependency takes care of this)
  3. Call pl.scan_ds

Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

The way to handle this right now is to specify filters manually in step 2. I wouldn't know a good way to automate this.
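
As a sketch of what specifying filters manually in step 2 could look like (assuming the deltalake and Polars APIs from the time of this thread, with a hypothetical partition column named year):

from deltalake import DeltaTable
import polars as pl

dt = DeltaTable("path/to/delta_table")
# Restrict the pyarrow dataset to matching partitions before handing it to Polars;
# ("year", "=", "2020") is a hypothetical partition filter for illustration.
dataset = dt.to_pyarrow_dataset(partitions=[("year", "=", "2020")])
lf = pl.scan_ds(dataset)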

On topic

About the original issue from this post: I can confirm that string predicates are not pushed down correctly. Thanks for the clear example. I'm really not very familiar with this scanning code - @ritchie46 could you take a closer look?

dominikpeter commented 1 year ago

That is a different issue. scan_delta does the following: [...] The way to handle this right now is to specify filters manually in step 2. [...] I can confirm that string predicates are not pushed down correctly.

I need to look at this more closely, but I think that if the filters are properly pushed into scan_ds, the partitions are also filtered and file skipping is activated. At least that is what is described here: reading partitioned data

Regarding the scan_ds issue, I can also confirm that none of the predicates are pushed through when a string is involved. In the example below, neither of the two filters is included:

df = (
    _scan_ds(dataset, allow_pyarrow_filter=True)
    .select([pl.col("animal"), pl.col("year")])
    .filter((pl.col("year") == 2020) & (pl.col("animal") == "Flamingo"))
    .collect()
)

predicate = None with_columns = ['animal', 'year']

aersam commented 1 year ago

Something to consider: https://github.com/delta-io/delta-rs/issues/1128. ADBC support would most likely supersede the current implementation?

chitralverma commented 1 year ago

Any subsequent filters are pushed to scan_ds just fine. The problem is that these filters are not pushed further to step 2, where the conversion happens from Delta to pyarrow. This is required to take advantage of on-disk partitioning that Delta uses (if you're unfamiliar with Delta, it's just Parquet files with a layer on top).

@stinodego your explanation above is correct, but won't this affect anything that relies on scan_ds, not just scan_delta?

Let me know if you need some help with this, because I saw it as a potential bottleneck while adding the functionality. I considered using the deltalake lib only to get the paths of the parquet files for a specified table version and then calling scan_parquet internally in scan_delta, but that did not seem like the right way to go either.

dominikpeter commented 1 year ago

I considered using the deltalake lib only to get the paths of the parquet files for a specified table version and then calling scan_parquet internally in scan_delta, but that did not seem like the right way to go either.

I think there are two problems with the scan_parquet approach. First, scan_parquet only accepts a single path or glob pattern, so you would need to merge the per-file scans somehow. Second, you would lose the benefit of the statistics: they are stored in the pyarrow dataset fragments, so file skipping would not be possible.
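
(To make the statistics point concrete, here is a rough sketch, not the pyarrow implementation itself, of manual file skipping based on the parquet footer min/max statistics, assuming a flat schema and local file paths:)

import pyarrow.parquet as pq
from deltalake import DeltaTable

def file_may_contain(path: str, column: str, value) -> bool:
    # Inspect row-group statistics in the parquet footer; a file can only be
    # skipped if no row group can possibly contain the value.
    meta = pq.ParquetFile(path).metadata
    col_idx = meta.schema.to_arrow_schema().get_field_index(column)
    for rg in range(meta.num_row_groups):
        stats = meta.row_group(rg).column(col_idx).statistics
        if stats is None or not stats.has_min_max:
            return True  # no statistics available, cannot skip safely
        if stats.min <= value <= stats.max:
            return True
    return False

# Keep only the files that might contain the value being filtered on
dt = DeltaTable("data/delta/item")  # illustrative path
files = [p for p in dt.file_uris() if file_may_contain(p, "item", "8043152")]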

I believe that if scan_ds passed all predicates to the underlying pyarrow dataset, the problem with the partitions would be solved too.

@chitralverma by the way, I saw that delta-rs 0.7.0 was released, which would allow the DeltaStorageHandler to be picklable (delta-rs issue 1016) - it wouldn't solve this issue though.

chitralverma commented 1 year ago

First, scan_parquet only allows glob pattern, so you would need to merge them somehow.

For lazy dfs, concat is a zero-copy op, so that is easy.

Second, you would lose the benefits of the statistics

Actually, I confirmed with some of the Delta folks for another use case how this works: if I use the deltalake lib to just list the parquet files for a specified version, the delta log is looked up and the file skipping actually happens. The main question is this: does the Polars parquet reader use parquet statistics to optimize IO? @ritchie46 maybe you can clarify this one.
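
(Not an authoritative answer, but one way to check is to inspect the optimized plan and see whether the filter ends up attached to the parquet scan node:)

import polars as pl

lf = pl.scan_parquet("file.parquet").filter(pl.col("animal") == "Flamingo")
# If predicate pushdown applies, the filter shows up on the PARQUET SCAN node
# (as a SELECTION) instead of a separate FILTER step above the scan.
print(lf.explain())  # on older Polars versions: lf.describe_optimized_plan()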

dominikpeter commented 1 year ago

I did some tests on my side on the dataset we are struggling with:

import polars as pl
from deltalake import DeltaTable

file_url = "data/delta/item"

@memory_usage  # custom profiling decorator (memory + timing); see the sketch further below
def scan():
    # Native scan_delta path
    df = pl.scan_delta(file_url)
    print(df.filter(pl.col("item") == "00009501").fetch(5))

@memory_usage
def concat():
    # Concat of per-file scan_parquet scans listed by deltalake
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()])
    print(dfs.filter(pl.col("item") == "00009501").fetch(5))

Function Name : scan
Current memory usage: 0.178867MB
Peak : 3.096506MB
Timing Took : 11.0809 seconds

Function Name : concat
Current memory usage: 0.001219MB
Peak : 0.021559MB
Timing Took : 0.0959 seconds

The parquet concat method is significantly faster in my example.
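
(The @memory_usage decorator used above isn't defined in the thread; a minimal sketch of such a helper, based on tracemalloc and wall-clock timing rather than the author's actual implementation, could look like this:)

import time
import tracemalloc
from functools import wraps

def memory_usage(func):
    # Report current/peak traced memory and wall time of the wrapped call,
    # roughly matching the output format shown above.
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            print(f"Function Name : {func.__name__}")
            print(f"Current memory usage: {current / 1e6:.6f}MB")
            print(f"Peak : {peak / 1e6:.6f}MB")
            print(f"Timing Took : {elapsed:.4f} seconds")
    return wrapper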

@chitralverma thanks a lot! We can already use this internally for our use case.

chitralverma commented 1 year ago

@dominikpeter try setting rechunk to False in pl.concat and see if it gets better

dominikpeter commented 1 year ago
import numpy as np
import polars as pl
from datetime import datetime
from deltalake import DeltaTable

file_url = "data/delta/item"  # same Delta table as in the previous test

times = []

for i in range(10):
    start = datetime.now()
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()], rechunk=False)
    dfs.filter((pl.col("item") == "01523048")).collect(streaming=True)
    end = datetime.now()
    delta = (end - start).total_seconds()
    times.append(delta)

print("rechunk=False")
print(np.array(times).mean())
print(np.array(times).std())

times = []

for i in range(10):
    start = datetime.now()
    dt = DeltaTable(file_url)
    dfs = pl.concat([pl.scan_parquet(i) for i in dt.file_uris()], rechunk=True)
    dfs.filter((pl.col("item") == "01523048")).collect(streaming=True)
    end = datetime.now()
    delta = (end - start).total_seconds()
    times.append(delta)

print("rechunk=True")
print(np.array(times).mean())
print(np.array(times).std())

rechunk=False: mean 2.7260541 s, std 0.08526206672776585 s

rechunk=True: mean 2.8425098 s, std 0.21936449264126584 s

@chitralverma it slightly improved the performance in my example.

chitralverma commented 1 year ago

OK, I expected that. So this can technically work; here are my suggestions for further development of this connector in Polars.

What I am looking for is an argument against this approach. I don't think we are missing out on any capabilities, but I am also not 100% sure.

BTW, sorry for using your time to test these things, but if you have a delta table in an object storage (any one), could you try the scan_delta vs scan_parquet test on that? My guess is that in that case the results are not going to be very different.

dominikpeter commented 1 year ago

I don't think there is a real downside. This approach is also described for Dask dataframes here: https://delta-io.github.io/delta-rs/python/usage.html

I think this could live inside Rust and benefit from the things you mentioned. I would prefer this approach.

No problem, I have to thank you here :-). I will test it tomorrow and give feedback.

dominikpeter commented 1 year ago

I was not able to make it work with an object store (azure storage account).

# tried both of these URL forms
file_url = "abfss://mycontainer@mystorage.dfs.core.windows.net/delta/item"
file_url = "abfss://mycontainer/delta/item"

storage_options = {
    "account_name": "mystorage",
    "account_key": "mykey",
}

@memory_usage
def concat():
    dt = DeltaTable(file_url, storage_options=storage_options)
    print(dt.file_uris())
    dfs = pl.concat([pl.scan_parquet(i, storage_options=storage_options) for i in dt.file_uris()])
    print(dfs.filter(pl.col("item") == "8043152").collect(streaming=True))

delta-rs gives me back the correct parquet files with the correct URIs.

But polars results in an error: FileNotFoundError: No such file or directory: abfss://xxxxxxx

We discussed it a little bit internally. For the moment, we can push down the filters to the pyarrow dataset or use the concat approach. This will work for our use case.

I guess it makes sense to wait for ADBC before putting too much effort into this. One downside of the concat-parquet approach will certainly show up in future releases of delta-rs, when they add support for column mapping, computed columns, and so on.

I still believe it would make sense to fix the scan_ds filter pushdown bug, though :-).

chitralverma commented 1 year ago

The file-not-found issue is because the abfss scheme is not supported by fsspec.

See the supported schemes in the Azure example here:

https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.scan_delta.html#polars.scan_delta

But this can be changed now that the pickling issue is fixed on their side; however, in Polars we would then have to pin the deltalake dependency to 0.7+.

ADBC will require a conversion of the Polars plan to SQL for pushdown, which does not exist in Polars at the moment.

dominikpeter commented 1 year ago

I did some extensive tests with the application we are running, comparing the scan_delta vs concat_parquet approach.

We are using offset, for example, which is way slower with the concat_parquet approach. However, filtering on a string column is of course faster, though I still believe it could be faster by skipping parquet files that don't contain the records I am filtering for.

The results were really mixed. I think for the moment I would keep it as it is. Maybe bump to 0.7 to take advantage of the new pickling.

What we will probably end up doing is something like this:

    if filter:
        # Pre-filter inside pyarrow so only the matching rows reach Polars
        dt = DeltaTable(file_url)
        table = dt.to_pyarrow_dataset().scanner(filter=ds.field("item") == "8043152").to_table()
        df = pl.from_arrow(table)
    else:
        # No filter: use the lazy Delta scan directly
        df = pl.scan_delta(file_url)

ritchie46 commented 1 year ago

The problem seems to be that we don't push the predicate down to _scan_ds. I will take a look.