pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Query plan bug with partitioned iceberg tables #15196

Open KevinJiao opened 3 months ago

KevinJiao commented 3 months ago

Reproducible example

import datetime

import polars as pl

# catalog: the pyiceberg SqlCatalog created in the full repro file below
tbl = catalog.load_table(("default", "my_table"))
"""
my_table(
  1: id: optional string,
  2: file_date: optional date,
),
partition by: [file_date],
sort order: [],
snapshot: Operation.OVERWRITE: id=3445854501483894624, parent_id=5977889621156563414, schema_id=0
"""

df = pl.scan_iceberg(tbl)
df = df.filter(pl.col("file_date") == datetime.date.fromisoformat("2024-03-01"))
print(df.explain())

Log output

FILTER [(col("file_date")) == (2024-03-01)] FROM

  PYTHON SCAN
  PROJECT */2 COLUMNS

Issue description

It appears that Polars cannot do predicate pushdown for partitioned Iceberg tables when the partition-column filter uses a date value, yet the same query is optimized when the filter uses a string.

Expected behavior

df = pl.scan_iceberg(tbl).filter(pl.col("file_date")  == "2024-03-01")
print(df.explain())

gives the optimized plan

  PYTHON SCAN
  PROJECT */2 COLUMNS
  SELECTION: (pa.compute.field('file_date') == '2024-03-01')

With this query plan, Polars does not load every single Parquet file in the table before running the filter expression.
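A quick way to check from a test whether the predicate was pushed down is plain string inspection of the plan (a hypothetical helper, not polars API):

```
import polars as pl


def predicate_pushed_down(lf: pl.LazyFrame) -> bool:
    # A pushed-down predicate appears as a SELECTION inside the PYTHON SCAN;
    # a separate FILTER node means it will run in memory after the scan.
    plan = lf.explain()
    return "SELECTION" in plan and "FILTER" not in plan
```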

Bug 2?

There is a second, related bug: if the filter on the date column uses string values and is combined with a second expression, some forms end up not being pushed down, while others trigger an error in the planner.

df = pl.scan_iceberg(tbl).filter(
    (pl.col("file_date") > "2023-03-01")
    & (pl.col("id").is_in([str(x) for x in range(5)]))
)
print(df.explain())

gives


  PYTHON SCAN
  PROJECT */2 COLUMNS
  SELECTION: ((pa.compute.field('file_date') > '2023-03-01') & (pa.compute.field('id')).isin(["0","1","2","3","4"]))

While

df = pl.scan_iceberg(tbl).filter(
    (pl.col("file_date") > "2024-03-01")
    & (pl.col("id").is_in([str(x) for x in range(101)]))
)
print(df.explain())

throws

Traceback (most recent call last):
  File "/mnt/home/kevin.jiao/work/riskmodel_loader/bug_repro.py", line 75, in <module>
    print(df.explain())
  File "/mnt/home/kevin.jiao/.cache/pypoetry/virtualenvs/riskmodel-loader-U1GxshH--py3.10/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 1212, in explain
    return ldf.describe_optimized_plan()
polars.exceptions.ComputeError: cannot compare 'date/datetime/time' to a string value (create native python { 'date', 'datetime', 'time' } or compare to a temporal column)

Installed versions

Full repro file:

```
import datetime

import polars as pl
from pyiceberg.schema import Schema
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.types import (
    StringType,
    NestedField,
    DateType,
)

catalog = SqlCatalog(
    "default", uri="sqlite:////tmp/iceberg/catalog.db", warehouse="file:///tmp/iceberg"
)
catalog.create_namespace("default")

schema = Schema(
    NestedField(field_id=1, name="id", field_type=StringType()),
    NestedField(field_id=2, name="file_date", field_type=DateType()),
)

partition_spec = PartitionSpec(source_id=1, field_id=1000)

catalog.create_table(
    identifier="default.my_table",
    schema=schema,
    location="/tmp/iceberg",
    partition_spec=partition_spec,
)

tbl = catalog.load_table(("default", "my_table"))
print(tbl.schema())

print("Plan for scan with string expression.")
df = pl.scan_iceberg(tbl).filter(pl.col("file_date") == "2024-03-01")
print(df.explain())

print("Plan for scan with date expression.")
df = pl.scan_iceberg(tbl).filter(
    pl.col("file_date") == datetime.date.fromisoformat("2024-03-01")
)
print(df.explain())
```
```
--------Version info---------
Polars:               0.20.16
Index type:           UInt32
Platform:             Linux-6.5.0-1014-aws-x86_64-with-glibc2.35
Python:               3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:               2023.12.2
gevent:
hvplot:
matplotlib:
numpy:                1.26.4
openpyxl:
pandas:               2.2.1
pyarrow:              15.0.1
pydantic:             2.6.3
pyiceberg:            0.6.0
pyxlsb:
sqlalchemy:           2.0.28
xlsx2csv:
xlsxwriter:
```
Fokko commented 1 month ago

@KevinJiao I think bug 1 has been fixed already:

>>> df = pl.scan_iceberg(tbl)
>>> import datetime
>>> 
>>> df.filter(pl.col("tpep_pickup_datetime") >= datetime.date.fromisoformat("2022-02-01")).collect()
(pa.compute.field('tpep_pickup_datetime') >= to_py_datetime(1643673600000000,'us'))
shape: (2_979_415, 20)
┌──────────┬─────────────────────┬─────────────────────┬─────────────────┬───┬──────────────┬────────────────────┬─────────────┬──────────────┐
│ VendorID ┆ tpep_pickup_datetim ┆ tpep_dropoff_dateti ┆ passenger_count ┆ … ┆ total_amount ┆ congestion_surchar ┆ airport_fee ┆ tip_per_mile │
│ ---      ┆ e                   ┆ me                  ┆ ---             ┆   ┆ ---          ┆ ge                 ┆ ---         ┆ ---          │
│ i64      ┆ ---                 ┆ ---                 ┆ f64             ┆   ┆ f64          ┆ ---                ┆ f64         ┆ f64          │
│          ┆ datetime[μs]        ┆ datetime[μs]        ┆                 ┆   ┆              ┆ f64                ┆             ┆              │
╞══════════╪═════════════════════╪═════════════════════╪═════════════════╪═══╪══════════════╪════════════════════╪═════════════╪══════════════╡
│ 1        ┆ 2022-02-01 00:06:58 ┆ 2022-02-01 00:19:24 ┆ 1.0             ┆ … ┆ 23.45        ┆ 0.0                ┆ 1.25        ┆ 0.722222     │
│ 1        ┆ 2022-02-01 00:38:22 ┆ 2022-02-01 00:55:55 ┆ 1.0             ┆ … ┆ 30.1         ┆ 0.0                ┆ 1.25        ┆ 0.0          │
│ 1        ┆ 2022-02-01 00:03:20 ┆ 2022-02-01 00:26:59 ┆ 1.0             ┆ … ┆ 44.6         ┆ 0.0                ┆ 1.25        ┆ 0.0          │
│ 2        ┆ 2022-02-01 00:08:00 ┆ 2022-02-01 00:28:05 ┆ 1.0             ┆ … ┆ 34.8         ┆ 2.5                ┆ 0.0         ┆ 0.0          │
│ 2        ┆ 2022-02-01 00:06:48 ┆ 2022-02-01 00:33:07 ┆ 1.0             ┆ … ┆ 48.66        ┆ 2.5                ┆ 1.25        ┆ 0.666941     │
│ …        ┆ …                   ┆ …                   ┆ …               ┆ … ┆ …            ┆ …                  ┆ …           ┆ …            │
│ 2        ┆ 2022-05-18 20:41:57 ┆ 2022-05-18 20:47:45 ┆ 4.0             ┆ … ┆ 11.8         ┆ 2.5                ┆ 0.0         ┆ 0.740741     │
│ 2        ┆ 2022-02-01 00:00:23 ┆ 2022-02-01 00:17:08 ┆ 1.0             ┆ … ┆ 40.36        ┆ 2.5                ┆ 1.25        ┆ 0.659148     │
│ 2        ┆ 2022-02-01 00:01:20 ┆ 2022-02-01 00:24:59 ┆ 1.0             ┆ … ┆ 37.8         ┆ 2.5                ┆ 0.0         ┆ 0.184672     │
│ 2        ┆ 2022-02-01 00:00:46 ┆ 2022-02-01 00:15:43 ┆ 5.0             ┆ … ┆ 21.36        ┆ 2.5                ┆ 0.0         ┆ 0.959569     │
│ 2        ┆ 2022-02-01 00:01:08 ┆ 2022-02-01 00:20:08 ┆ 5.0             ┆ … ┆ 26.8         ┆ 2.5                ┆ 0.0         ┆ 0.0          │
└──────────┴─────────────────────┴─────────────────────┴─────────────────┴───┴──────────────┴────────────────────┴─────────────┴──────────────┘
>>> df.filter(pl.col("tpep_pickup_datetime") >= datetime.date.fromisoformat("2022-02-01")).explain()
'\n  PYTHON SCAN \n  PROJECT */20 COLUMNS\n  SELECTION: [(col("tpep_pickup_datetime")) >= (2022-02-01 00:00:00)]'
tilowiklundSensmetry commented 1 month ago

I can confirm these issues on my end. My table is partitioned by day(<timecol>).

Using <table>.filter(pl.col('<timecol>') >= date.fromisoformat('2024-05-15')) I get

PYTHON SCAN 
PROJECT */93 COLUMNS
SELECTION: [(col("<timecol>")) >= (2024-05-15 00:00:00)]

while using <table>.filter(pl.col('<timecol>') >= datetime.fromisoformat('2024-05-15T00:00:00.000+00:00')) gives

FILTER [(col("<timecol>")) >= (2024-05-15 00:00:00.dt.replace_time_zone([String(earliest)]))] FROM

    PYTHON SCAN 
    PROJECT */93 COLUMNS

Trying to compare to the string '2024-05-15' just yields

cannot compare 'date/datetime/time' to a string value (create native python { 'date', 'datetime', 'time' } or compare to a temporal column)

Using polars==0.20.30

tilowiklundSensmetry commented 1 month ago

The problem seems to be related to the following lines

if time_zone is not None:
    expr = expr.dt.replace_time_zone(
        time_zone, ambiguous="earliest" if value.fold == 0 else "latest"
    )

in https://github.com/pola-rs/polars/blob/py-0.20.30/py-polars/polars/functions/lit.py#L96-L99

If I comment these out I get a plan

PYTHON SCAN 
PROJECT */93 COLUMNS
SELECTION: [(col("<timecol>")) >= (2024-05-15 00:00:00)]

The expression itself then changes from

[(col("<timecol>")) >= (dyn int: 1715731200000000.strict_cast(Datetime(Microseconds, None)).dt.replace_time_zone([String(earliest)]))]

to

[(col("<timecol>")) >= (dyn int: 1715731200000000.strict_cast(Datetime(Microseconds, None)))]
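For reference, a minimal sketch (assuming polars ~0.20.x; the exact repr can differ between versions) that makes this lowering visible directly from Python:

```
import datetime

import polars as pl

naive = datetime.datetime(2024, 5, 15)
aware = datetime.datetime(2024, 5, 15, tzinfo=datetime.timezone.utc)

# The naive literal lowers to a plain cast; the zone-aware one picks up an
# extra .dt.replace_time_zone(...) call that the pushdown cannot translate.
print(pl.lit(naive))
print(pl.lit(aware))
```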
tilowiklundSensmetry commented 1 month ago

I think I've traced down where the predicate push down logic fails: https://github.com/pola-rs/polars/blob/5974ac7afa91d7ab08237b99979b39bdf6c9da2a/crates/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs#L678-L681

For the plan that gets successfully pushed down, predicate_to_pa produces (Some):

"(pa.compute.field('<tscol>') >= to_py_datetime(1715731200000000,'us',UTC))"

while for the other one it fails.
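The SELECTION string emitted by predicate_to_pa appears to be Python source that the Python-side scan later evaluates against pyarrow. A rough, hand-expanded sketch of the successful predicate above (assuming to_py_datetime merely converts epoch microseconds to a datetime.datetime):

```
import datetime

import pyarrow.compute as pc

# Hand-expanded equivalent of:
#   pa.compute.field('<tscol>') >= to_py_datetime(1715731200000000, 'us', UTC)
ts = datetime.datetime.fromtimestamp(
    1715731200000000 / 1_000_000, tz=datetime.timezone.utc
)
predicate = pc.field("<tscol>") >= ts  # a pyarrow dataset filter expression
```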

tilowiklundSensmetry commented 1 month ago

It hits the catch-all failure branch at https://github.com/pola-rs/polars/blob/main/crates/polars-plan/src/logical_plan/pyarrow.rs#L185 on the term

Function {
    input: [
        ExprIR {
            output_name: LiteralLhs(
                "literal",
            ),
            node: Node(
                2,
            ),
        },
        ExprIR {
            output_name: LiteralLhs(
                "literal",
            ),
            node: Node(
                3,
            ),
        },
    ],
    function: TemporalExpr(
        ReplaceTimeZone(
            Some(
                "UTC",
            ),
            Raise,
        ),
    ),
    options: FunctionOptions {
        collect_groups: ElementWise,
        fmt_str: "",
        input_wildcard_expansion: false,
        returns_scalar: false,
        cast_to_supertypes: false,
        allow_rename: false,
        pass_name_to_apply: false,
        changes_length: false,
        check_lengths: UnsafeBool(
            true,
        ),
        allow_group_aware: true,
    },
}
tilowiklundSensmetry commented 1 month ago

Attaching a small PoC hack with which I manage to correctly run my query. That said, correctly implementing the logic seems harder than simply avoiding the time zone coercion in the first place.

poc.patch.txt (from tag py-0.20.30)

tilowiklundSensmetry commented 1 month ago

The time zone conversion logic (mentioned above) for Datetime literals seems a bit strange: https://github.com/pola-rs/polars/blob/py-0.20.30/py-polars/polars/functions/lit.py#L93-L100

If I understand it correctly, it:

1. converts the datetime value to an integer timestamp,
2. casts that integer to a time-zone-naive Datetime literal, and
3. symbolically re-attaches the original time zone via dt.replace_time_zone.

Any datetime literal with a time zone therefore ends up as a symbolic function application expression of the form replace_time_zone(zone_unaware_datetime_literal, original_time_zone) instead of a plain datetime literal. This breaks the predicate pushdown logic because it does not know how to translate the symbolic function application, and instead falls back to a post-scan FILTER.

The purpose of these time zone gymnastics is unclear to me. My guess is that it intends to achieve something equivalent to

if time_zone is not None:
    time_zone = disambiguate_time_zone(time_zone, ambiguous="earliest" if value.fold == 0 else "latest")
return lit(dt_int).cast(Datetime(time_unit, time_zone = time_zone))

where disambiguate_time_zone would implement the same time zone disambiguation logic as pl.Series.dt.replace_time_zone.
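A rough sketch of that idea, using a hypothetical helper (not an existing polars function) and relying on the stdlib's PEP 495 fold semantics, which correspond to ambiguous="earliest"/"latest":

```
import datetime
from zoneinfo import ZoneInfo


def epoch_us_in_zone(value: datetime.datetime, time_zone: str) -> int:
    """Resolve a naive wall-clock datetime in time_zone to epoch microseconds.

    fold == 0 picks the earlier of two ambiguous DST instants, fold == 1 the
    later, mirroring ambiguous="earliest"/"latest" in replace_time_zone.
    """
    localized = value.replace(tzinfo=ZoneInfo(time_zone))
    return int(localized.timestamp() * 1_000_000)
```

With the instant resolved eagerly, lit() could emit a plain integer literal cast to Datetime(time_unit, time_zone), a shape the pushdown already knows how to translate (as the to_py_datetime(...) output above shows).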

tilowiklundSensmetry commented 1 week ago

I might be able to write a proper patch, but I'd need some input from a maintainer about which (if any) of the strategies outlined above to pursue (try to avoid the conversion to begin with or implement the necessary expression parsing and conversion).