pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Performance degradation in scan_parquet() vs. scan_pyarrow_dataset() #13908

Closed lmocsi closed 3 months ago

lmocsi commented 4 months ago

Reproducible example

!pip install --upgrade polars==0.20.3
!pip install --upgrade pyarrow==15.0.0

import polars as pl
import pyarrow.dataset as ds
tr = pl.scan_pyarrow_dataset(ds.dataset(parq_path+"my_table", partitioning='hive')) # Wall time: 6.14 s
#tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet") # Wall time: 1min 59s

df = (tr.filter((pl.col('CALENDAR_DATE').is_between(pl.lit('2023-07-21'), pl.lit('2024-01-22'))) &
                     (pl.col('CRE_FL') == 'I') &
                     (pl.col('SEC_ID') > 3) &
                     (pl.col('LE_ID') == 2905) &
                     (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
                     (pl.col('TR_ID') != 7010)
                     )
             .select('PARTY_ID').unique()
             .rename({'PARTY_ID':'PART_ID'})
             .with_columns(pl.lit(1).alias('NEXT_FL'))
             .collect()
         )

Log output

No log.

Issue description

The above code with scan_pyarrow_dataset() runs in about 6 seconds. The same code with scan_parquet() runs in 1 min 52 secs! But both of them do this in less than 1.5 GB of RAM.

If I upgrade polars to the current 0.20.5 version, then the scan_pyarrow_dataset() way runs out of 32 GB of memory, as if it were not able to filter on the partition column CALENDAR_DATE. The scan_parquet() version runs for roughly the same time as it does on polars 0.20.3.

Expected behavior

scan_parquet() should complete in about the same time as scan_pyarrow_dataset() does. scan_pyarrow_dataset() should be able to filter on the partition column on polars 0.20.5 as well.

Installed versions

```
--------Version info---------
Polars: 0.20.3
Index type: UInt32
Platform: Linux-4.18.0-372.71.1.el8_6.x86_64-x86_64-with-glibc2.28
Python: 3.9.13 (main, Oct 13 2022, 21:15:33) [GCC 11.2.0]
----Optional dependencies----
adbc_driver_manager:
cloudpickle: 2.0.0
connectorx:
deltalake:
fsspec: 2022.02.0
gevent:
hvplot:
matplotlib: 3.8.0
numpy: 1.23.5
openpyxl: 3.0.9
pandas: 2.1.0
pyarrow: 15.0.0
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy: 1.4.27
xlsx2csv:
xlsxwriter: 3.1.3
```

ritchie46 commented 4 months ago

Can you make a reproducible example? There is nothing here that we can test.

lmocsi commented 4 months ago

I guess I cannot create / upload several GBs of partitioned parquet files here...

deanm0000 commented 4 months ago

Run the examples with the trace log on please. Which column(s) are part of the hive partition?

lmocsi commented 4 months ago

Partitioned only on calendar_date. How do I turn on trace logging?

ritchie46 commented 4 months ago

> I guess I cannot create / upload several GBs of partitioned parquet files here...

You can create dummy data in your script. And you likely don't need several GB to show the effect. Please make an effort to produce something we can reproduce if you want it fixed.

deanm0000 commented 4 months ago

before your import polars line, run these, then do everything else

import os
os.environ['POLARS_VERBOSE']='1'
os.environ["RUST_BACKTRACE"]="1"

You probably don't need the rust one here but just for reference.

lmocsi commented 4 months ago

With logging turned on, I cannot see more details:

Traceback (most recent call last):
  File "/tmp/1000920000/ipykernel_1125/610731593.py", line 2, in
    df = (tr.filter((pl.col('CALENDAR_DATE').is_between(pl.lit('2023-07-21'), pl.lit('2024-01-22')) &
  File "/opt/conda/envs/Python-3.9-Premium/lib/python3.9/site-packages/polars/lazyframe/frame.py", line 1730, in collect
    return wrap_df(ldf.collect())
polars.exceptions.ComputeError: ArrowMemoryError: malloc of size 2490368 failed

deanm0000 commented 4 months ago

Are you saying that before it was just slow but now it's generating an error?

lmocsi commented 4 months ago

scan_parquet() was slow in 0.20.3 and is slow in 0.20.4/0.20.5.
scan_pyarrow_dataset() was fast in 0.20.3 and is running out of memory in 0.20.4/0.20.5 (as if unable to filter on hive-partitioned columns).

lmocsi commented 4 months ago

The weird thing is that with synthetic data it is vice versa.

ion-elgreco commented 4 months ago

@lmocsi can you share the plan with .explain() with older and new Polars version?

lmocsi commented 4 months ago

With a synthetic dataset (created for 100000 customers, see here: https://github.com/apache/arrow/issues/39768):

Polars==0.20.6 + scan_pyarrow_dataset:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]
        FILTER [(col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)])) & ([(col("CREDIT_FL")) == (String(Y))])] FROM
          PYTHON SCAN 
          PROJECT 3/4 COLUMNS

df.shape: (100_000, 2)
Max memory occupation: ~13 GB
Final memory occupation: 9.7 GB


Polars==0.20.6 + scan_parquet:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 186 files: first file: my_table\\CALENDAR_DATE=2023-07-21%2000%3A00%3A00\\part-0.parquet
          PROJECT 3/4 COLUMNS
          SELECTION: [(col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)])) & ([(col("CREDIT_FL")) == (String(Y))])]

df.shape: (100_000, 2)
Max memory occupation: ~4.9 GB
Final memory occupation: 1.7 GB


Polars==0.20.3 + scan_pyarrow_dataset:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          PYTHON SCAN 
          PROJECT 3/4 COLUMNS
          SELECTION: (((pa.compute.field(\'CALENDAR_DATE\') >= \'2023-07-21\') & (pa.compute.field(\'CALENDAR_DATE\') <= \'2024-01-22\')) & (pa.compute.field(\'CREDIT_FL\') == \'Y\'))

df.shape: (100_000, 2)
Max memory occupation: ~10.1 GB
Final memory occupation: 7 GB


Polars==0.20.3 + scan_parquet:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 185 files: first file: my_table\\CALENDAR_DATE=2023-07-21%2000%3A00%3A00\\part-0.parquet
          PROJECT 3/4 COLUMNS
          SELECTION: [([([(col("CALENDAR_DATE")) >= (String(2023-07-21))]) & ([(col("CALENDAR_DATE")) <= (String(2024-01-22))])]) & ([(col("CREDIT_FL")) == (String(Y))])]

df.shape: (100_000, 2)
Max memory occupation: ~6.4 GB
Final memory occupation: 6.4 GB

lmocsi commented 4 months ago

With the real dataset:

Polars==0.20.6 + scan_pyarrow_dataset:

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]
        FILTER col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)]) FROM

          PYTHON SCAN 
          PROJECT 2/27 COLUMNS

df.shape: -
Max memory occupation: <runs out of 32 GB>
Final memory occupation: <runs out of 32 GB>
Wall time: -


Polars==0.20.6 + scan_parquet:

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 1846 files: first file: /mypath/mytable/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00000-35c18ead-ef01-4535-a3df-f9e09af6c12b.c000.snappy.parquet
          PROJECT 2/27 COLUMNS
          SELECTION: col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)])

df.shape: (1_075_661, 2)
Max memory occupation: 5.1 GB
Final memory occupation: 2.2 GB
Wall time: 53 s


Polars==0.20.3 + scan_pyarrow_dataset:

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          PYTHON SCAN 
          PROJECT 2/27 COLUMNS
          SELECTION: ((pa.compute.field(\'CALENDAR_DATE\') >= \'2023-07-21\') & (pa.compute.field(\'CALENDAR_DATE\') <= \'2024-01-22\'))

df.shape: (1_075_661, 2)
Max memory occupation: 13.1 GB
Final memory occupation: 2.4 GB
Wall time: 14.1 s


Polars==0.20.3 + scan_parquet:

<no plan here since df.explain() is running for more than 1h 43 minutes!!!>

df.shape: (1_075_661, 2)
Max memory occupation: 11.9 GB
Final memory occupation: 2.2 GB
Wall time: 14.9 s

Lots of warnings here. First, lots of lines like this: "parquet file can be skipped, the statistics were sufficient to apply the predicate."

Then lots of lines like this: "parquet file must be read, statistics not sufficient for predicate."

Then mixed lines like: "parquet file must be read, statistics not sufficient for predicate. parquet file can be skipped, the statistics were sufficient to apply the predicate."

And finally: "hive partitioning: skipped 1661 files, first file : /mypath/mytable/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00000-35c18ead-ef01-4535-a3df-f9e09af6c12b.c000.snappy.parquet"

ion-elgreco commented 4 months ago

@lmocsi it looks like the filter in scan_pyarrow_dataset is not pushed down to pyarrow, the selection should show the pyarrow.compute expressions in v0.20.6 but it doesn't.

@ritchie46 seems like there is no more filter pushdown to the pyarrow dataset, and the filter only happens in polars after everything is loaded into memory.
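A rough way to tell the two cases apart from the .explain() text alone, as a plain-Python sketch. It assumes the plan format pasted in this thread (a SELECTION: line under the scan when the predicate is pushed down, a separate FILTER ... FROM node when it is not). The helper name predicate_pushed_down is made up for illustration and is not part of Polars; other versions may print plans differently.

```python
def predicate_pushed_down(plan: str) -> bool:
    """Heuristic: True if the scan carries a SELECTION line and no FILTER node.

    Assumes the plan text format shown in this thread.
    """
    lines = [line.strip() for line in plan.splitlines()]
    has_selection = any(line.startswith("SELECTION:") for line in lines)
    has_filter_node = any(line.startswith("FILTER ") for line in lines)
    return has_selection and not has_filter_node


# Plan fragments taken from the real-dataset comment above
# (quote escaping removed for readability).
plan_0_20_6 = """
FILTER col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)]) FROM
  PYTHON SCAN
  PROJECT 2/27 COLUMNS
"""
plan_0_20_3 = """
PYTHON SCAN
PROJECT 2/27 COLUMNS
SELECTION: ((pa.compute.field('CALENDAR_DATE') >= '2023-07-21') & (pa.compute.field('CALENDAR_DATE') <= '2024-01-22'))
"""

print(predicate_pushed_down(plan_0_20_6))  # False
print(predicate_pushed_down(plan_0_20_3))  # True
```

This only inspects text, so it is a quick sanity check before running collect(), not a guarantee about what the engine does.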

deanm0000 commented 4 months ago

Is your calendar date stored as a string and not date?

lmocsi commented 4 months ago

Calendar date is returned as a string column, since that is the partition column. Files look like this in this partitioned parquet file/table: /mypath/mytable/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00-35c18ead-ef01.c000.snappy.parquet
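Because the partition value is a string, a range filter on it is a string comparison. The sketch below uses plain Python strings only (not Polars) and assumes Polars orders strings the same lexicographic way; the sample values are made up in the same shape as the partition values here.

```python
low, high = "2023-07-21", "2024-01-22"

# Hypothetical partition values with the " 00:00:00" suffix seen in this table.
values = [
    "2023-07-20 00:00:00",
    "2023-07-21 00:00:00",
    "2024-01-21 00:00:00",
    "2024-01-22 00:00:00",
]

# Python compares str values character by character; a longer string that
# starts with a shorter one sorts after it.
in_range = [v for v in values if low <= v <= high]
print(in_range)
# ['2023-07-21 00:00:00', '2024-01-21 00:00:00']
```

Under that assumption, the last day of the range drops out unless the upper bound carries a time part (for example '2024-01-22 23:59:59'). That is an inference from string ordering, not something confirmed in this thread.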

deanm0000 commented 4 months ago

if you do scan_parquet on just the one file, does it give you a column with

2019-01-01 00%3A00%3A00 or 2019-01-01 00:00:00

Try doing something like:

tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet")

df = (
    pl.select(
        CALENDAR_DATE=pl.date_range(pl.date(2023,7,21),pl.date(2024,1,11),'1d').dt.strftime("%Y-%m-%d") + "%2000%3A00%3A00"
        ).lazy()
    .join(tr, on='CALENDAR_DATE')
    .filter(         (pl.col('CRE_FL') == 'I') &
                     (pl.col('SEC_ID') > 3) &
                     (pl.col('LE_ID') == 2905) &
                     (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
                     (pl.col('TR_ID') != 7010)
                     )
             .select('PARTY_ID').unique()
             .rename({'PARTY_ID':'PART_ID'})
             .with_columns(pl.lit(1).alias('NEXT_FL'))
             .collect()
)

lmocsi commented 4 months ago

<polars==0.20.6>

This:

tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet")
tr.limit(1).select(pl.col('CALENDAR_DATE')).collect().rows()

Gives me: [('2019-01-01 00:00:00',)]

But the actual directory looks like this: CALENDAR_DATE=2019-01-01 00%3A00%3A00

So there must be some conversion somewhere in between...
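The difference between the two spellings looks like URL percent-encoding (%3A is the encoded form of ":"). A minimal sketch with the standard library, assuming that this is the conversion involved; how Polars itself decodes the directory name is not shown in this thread.

```python
from urllib.parse import quote, unquote

dir_name = "CALENDAR_DATE=2019-01-01 00%3A00%3A00"

# Split "key=value" and percent-decode the value part.
key, _, raw_value = dir_name.partition("=")
value = unquote(raw_value)
print(key, value)  # CALENDAR_DATE 2019-01-01 00:00:00

# Going the other way, leaving the space unescaped as in the directory above.
print(quote(value, safe=" "))  # 2019-01-01 00%3A00%3A00
```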

The above join statement kills the kernel somewhere above 17 GB of RAM usage (in a 32 GB environment).

deanm0000 commented 4 months ago

What about...

df = (
    tr
    .filter(
        # Exact-match list of partition values, one per day at midnight.
        (pl.col('CALENDAR_DATE').is_in(
            pl.select(
                CALENDAR_DATE=pl.date_range(pl.date(2023,7,21),pl.date(2024,1,11),'1d').dt.strftime("%Y-%m-%d") + " 00:00:00"
            ).to_series().to_list()
        )) &
        (pl.col('CRE_FL') == 'I') &
        (pl.col('SEC_ID') > 3) &
        (pl.col('LE_ID') == 2905) &
        (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
        (pl.col('TR_ID') != 7010)
    )
    .select('PARTY_ID').unique()
    .rename({'PARTY_ID':'PART_ID'})
    .with_columns(pl.lit(1).alias('NEXT_FL'))
    .collect()
)

Ultimately, I think there's a good amount of room for improvement in polars' hive partitioning optimization. While the current state isn't as robust as pyarrow's hive dataset scanner, I think the effort is going to go into making those improvements rather than into strengthening the pyarrow linkages.

As such, assuming the last snippet doesn't work, I think you probably want to just do more of the filtering directly in pyarrow.

So something like

df = (
    pl.scan_pyarrow_dataset(
        ds.dataset(parq_path+"my_table", partitioning='hive')
        .filter(
            (ds.field('CALENDAR_DATE')>='2023-07-21') &
            (ds.field('CALENDAR_DATE')<='2024-01-22') &
            (ds.field('CRE_FL') == 'I') &
            (ds.field('SEC_ID') > 3) &
            (ds.field('LE_ID') == 2905) &
            (~ds.field('PARTY_ID').isin([5086634, 2149316, 6031676])) &
            (ds.field('TR_ID') != 7010)
        )
    )
    .select('PARTY_ID').unique()
    .rename({'PARTY_ID':'PART_ID'})
    .with_columns(pl.lit(1).alias('NEXT_FL'))
    .collect()
    )

lmocsi commented 4 months ago

The .to_series().to_list() version runs fast using scan_parquet():

<polars==0.20.6>

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 175 files: first file: /mypath/myfile/CALENDAR_DATE=2023-07-21 00%3A00%3A00/part-00000-015dea05.c000.snappy.parquet
          PROJECT 3/27 COLUMNS
          SELECTION: [(col("CALENDAR_DATE").is_in([Series])) & ([(col("CRE_FL")) == (String(I))])]

Max memory occupation: 2.7 GB
Final memory occupation: 2.1 GB
Wall time: 15.1 s

But it is cumbersome to filter like that... :(

deanm0000 commented 4 months ago

yeah, it only seems to know how to skip files with exact matches; otherwise it wants to read the file.

It also only works if all the files are 00:00:00, if you've got some 00:12:00, etc then it'd miss those entirely.
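To make that limitation concrete, here is a small standard-library sketch (no Polars involved). The two partition values are hypothetical; the date range is the one used in the is_in() snippet above.

```python
from datetime import date, datetime, timedelta

start, end = date(2023, 7, 21), date(2024, 1, 11)
n_days = (end - start).days + 1

# Exact-match set, as in the is_in() workaround: one midnight string per day.
exact = {
    (start + timedelta(days=i)).strftime("%Y-%m-%d") + " 00:00:00"
    for i in range(n_days)
}

# Hypothetical partition values: one at midnight, one at 00:12:00.
partitions = ["2023-08-01 00:00:00", "2023-08-01 00:12:00"]

matched_exact = [p for p in partitions if p in exact]

# Range check on parsed datetimes instead of exact strings.
lo = datetime.combine(start, datetime.min.time())
hi = datetime.combine(end, datetime.max.time())
matched_range = [
    p for p in partitions
    if lo <= datetime.strptime(p, "%Y-%m-%d %H:%M:%S") <= hi
]

print(len(exact))      # 175
print(matched_exact)   # ['2023-08-01 00:00:00']
print(matched_range)   # ['2023-08-01 00:00:00', '2023-08-01 00:12:00']
```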

Another thing you can do, which isn't any less code but it is more resilient:

tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet")
import json
df=(
    pl.scan_parquet(
        pl.select(paths=json.loads(tr.serialize())['Scan']['paths'])
        .explode('paths')
        .with_columns(
            CALENDAR_DATE=pl.col('paths').str.extract("CALENDAR_DATE=(.+?)/")
            .str.replace_all("%3A",":")
            .str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"),
            )
        .filter(pl.col('CALENDAR_DATE').is_between(
            pl.datetime(2023,7,21), pl.datetime(2024,1,22)
            ))
        .get_column('paths')
        .to_list()
    )
    .filter(
        (pl.col('CRE_FL') == 'I') &
        (pl.col('SEC_ID') > 3) &
        (pl.col('LE_ID') == 2905) &
        (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
        (pl.col('TR_ID') != 7010)
        )
    .select('PARTY_ID').unique()
    .rename({'PARTY_ID':'PART_ID'})
    .with_columns(pl.lit(1).alias('NEXT_FL'))
    .collect()
    )

This way you do a first scan just to get the file list. You have to use serialize and json.loads to extract the file list, then use a regex to extract CALENDAR_DATE from each path and strptime it to Datetime, which you can then filter. That gives you the file list that satisfies your date filter, and you then do another scan which only includes those files.
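The path-filtering step on its own can also be sketched with the standard library, without going through serialize(). The helper names partition_datetime and select_paths and the sample paths are hypothetical (modelled on the paths in this thread); how you obtain the path list, for example by globbing, is left out.

```python
import re
from datetime import datetime
from urllib.parse import unquote

# Same pattern as in the snippet above: capture the partition value.
PARTITION_RE = re.compile(r"CALENDAR_DATE=(.+?)/")


def partition_datetime(path: str) -> datetime:
    """Parse the CALENDAR_DATE partition value out of a file path."""
    match = PARTITION_RE.search(path)
    if match is None:
        raise ValueError(f"no CALENDAR_DATE partition in {path!r}")
    # Percent-decode (%3A -> ":") before parsing.
    return datetime.strptime(unquote(match.group(1)), "%Y-%m-%d %H:%M:%S")


def select_paths(paths, start: datetime, end: datetime):
    """Keep only the paths whose partition value lies in [start, end]."""
    return [p for p in paths if start <= partition_datetime(p) <= end]


# Made-up sample paths in the same shape as the real ones.
paths = [
    "/mypath/mytable/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-0.parquet",
    "/mypath/mytable/CALENDAR_DATE=2023-07-21 00%3A00%3A00/part-0.parquet",
    "/mypath/mytable/CALENDAR_DATE=2024-01-22 00%3A00%3A00/part-0.parquet",
    "/mypath/mytable/CALENDAR_DATE=2024-01-23 00%3A00%3A00/part-0.parquet",
]
kept = select_paths(paths, datetime(2023, 7, 21), datetime(2024, 1, 22))
print(len(kept))  # 2
```

The resulting list plays the same role as the list passed to the outer scan_parquet() call in the snippet above.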

ion-elgreco commented 4 months ago

@deanm0000 these are all workarounds though. Polars should be able to parse those characters properly

deanm0000 commented 4 months ago

> @deanm0000 these are all workarounds though. Polars should be able to parse those characters properly

agreed, I'm not saying otherwise.

deanm0000 commented 4 months ago

I think what it needs would go here... https://github.com/pola-rs/polars/blob/608d6ac349411aea1a2ca785483aa4a288873ad2/crates/polars-lazy/src/physical_plan/expressions/apply.rs#L489-L528

We need another match on FunctionExpr::Boolean(BooleanFunction::IsBetween)
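As a conceptual model only (plain Python, not the Rust code linked above and not how Polars implements it), this is the kind of decision such a match arm would have to make: given min/max statistics for a file, decide whether an is_between predicate can rule the file out. The ColumnStats class and can_skip_is_between function are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    """Min/max statistics of one column in one file (hypothetical model)."""
    min_value: str
    max_value: str


def can_skip_is_between(stats: ColumnStats, low: str, high: str) -> bool:
    """True if no value in [min_value, max_value] can satisfy low <= x <= high."""
    return stats.max_value < low or stats.min_value > high


# For a hive partition column every row in a file shares one value, so min == max.
old_file = ColumnStats("2019-01-01 00:00:00", "2019-01-01 00:00:00")
hit_file = ColumnStats("2023-07-21 00:00:00", "2023-07-21 00:00:00")

print(can_skip_is_between(old_file, "2023-07-21", "2024-01-22"))  # True
print(can_skip_is_between(hit_file, "2023-07-21", "2024-01-22"))  # False
```

This is equivalent to splitting is_between into a >= and a <= comparison and checking each against the statistics, which is the form the 0.20.3 plans above show.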

deanm0000 commented 4 months ago

@ritchie46 see my previous comment. I think that's the fix but I don't know how to implement it.