Eventual-Inc / Daft

Distributed data engine for Python/SQL designed for the cloud, powered by Rust
https://getdaft.io
Apache License 2.0

Partition filter is not applied to a bucketed partition column. #3183

Open jaehyeon-kim opened 5 hours ago

jaehyeon-kim commented 5 hours ago

Describe the bug

I have an Iceberg table that is partitioned by three columns. The first column (id) is bucketed.

spark.sql(f"""
  CREATE TABLE {CATALOG_NAME}.demo.sample1
  USING iceberg
  PARTITIONED BY (bucket(16, id), days(ts), category)
  AS SELECT id, data, category, to_timestamp(ts) AS ts
    FROM {CATALOG_NAME}.demo.staging""")

When I filter on the partition column (id), I see a warning indicating that no partition filter was specified. If the column is not bucketed, I don't see the warning.

iceberg_table1 = catalog.load_table("demo.sample1")
df1 = daft.read_iceberg(iceberg_table1).where(col("id") <= 3)
df1.explain(show_all=True)
== Unoptimized Logical Plan ==

* Filter: col(id) <= lit(3)
|
* PythonScanOperator: IcebergScanOperator(more.demo.sample1)
|   File schema = id#Int64, data#Utf8, category#Utf8, ts#Timestamp(Microseconds, Some("UTC"))
|   Partitioning keys = [PartitionField(id_bucket#Int32, src=id#Int64, tfm=IcebergBucket(16)) PartitionField(ts_day#Int32, src=ts#Timestamp(Microseconds, Some("UTC")), tfm=Day)
|     PartitionField(category#Utf8, src=category#Utf8, tfm=Identity)]
|   Output schema = id#Int64, data#Utf8, category#Utf8, ts#Timestamp(Microseconds, Some("UTC"))

== Optimized Logical Plan ==

* PythonScanOperator: IcebergScanOperator(more.demo.sample1)
|   File schema = id#Int64, data#Utf8, category#Utf8, ts#Timestamp(Microseconds, Some("UTC"))
|   Partitioning keys = [PartitionField(id_bucket#Int32, src=id#Int64, tfm=IcebergBucket(16)) PartitionField(ts_day#Int32, src=ts#Timestamp(Microseconds, Some("UTC")), tfm=Day)
|     PartitionField(category#Utf8, src=category#Utf8, tfm=Identity)]
|   Filter pushdown = col(id) <= lit(3)
|   Output schema = id#Int64, data#Utf8, category#Utf8, ts#Timestamp(Microseconds, Some("UTC"))

== Physical Plan ==

WARNING:root:IcebergScanOperator(more.demo.sample1) has Partitioning Keys: [PartitionField(id_bucket#Int32, src=id#Int64, tfm=IcebergBucket(16)), PartitionField(ts_day#Int32, src=ts#Timestamp(Microseconds, Some("UTC")), tfm=Day), PartitionField(category#Utf8, src=category#Utf8, tfm=Identity)] but no partition filter was specified. This will result in a full table scan.
* TabularScan:
|   Num Scan Tasks = 7
|   Estimated Scan Bytes = 7983
|   Clustering spec = { Num partitions = 7 }
|   Pushdowns: {filter: col(id) <= lit(3)}
|   Schema: {id#Int64, data#Utf8, category#Utf8, ts#Timestamp(Microseconds, Some("UTC"))}
|   Scan Tasks: [
|   {File {/home/jaehyeon/projects/general-demos/daft-quickstart/more_warehouse/demo/sample1/data/id_bucket=3/ts_day=2023-12-31/category=C1/00000-9-eda2c1e9-e068-4907-b777-
|     bbb6cb6b2d55-0-00005.parquet}}
|   {File {/home/jaehyeon/projects/general-demos/daft-quickstart/more_warehouse/demo/sample1/data/id_bucket=4/ts_day=2023-12-31/category=C1/00000-9-eda2c1e9-e068-4907-b777-
|     bbb6cb6b2d55-0-00001.parquet}}
|   {File {/home/jaehyeon/projects/general-demos/daft-quickstart/more_warehouse/demo/sample1/data/id_bucket=7/ts_day=2023-12-31/category=C2/00000-9-eda2c1e9-e068-4907-b777-
|     bbb6cb6b2d55-0-00002.parquet}}
|   ...
|   {File {/home/jaehyeon/projects/general-demos/daft-quickstart/more_warehouse/demo/sample1/data/id_bucket=4/ts_day=2024-01-01/category=C2/00000-9-eda2c1e9-e068-4907-b777-
|     bbb6cb6b2d55-0-00006.parquet}}
|   {File {/home/jaehyeon/projects/general-demos/daft-quickstart/more_warehouse/demo/sample1/data/id_bucket=4/ts_day=2024-01-01/category=C3/00000-9-eda2c1e9-e068-4907-b777-
|     bbb6cb6b2d55-0-00007.parquet}}
|   {File {/home/jaehyeon/projects/general-demos/daft-quickstart/more_warehouse/demo/sample1/data/id_bucket=15/ts_day=2024-01-02/category=C1/00000-9-eda2c1e9-e068-4907-b777-
|     bbb6cb6b2d55-0-00004.parquet}}
|   ]

To Reproduce

The Iceberg table is created as follows.

spark.sql(f"""
    CREATE TABLE {CATALOG_NAME}.demo.staging (
        id bigint,
        data string,
        category string,
        ts string)
    USING iceberg;
  """)

spark.sql(f"""
  INSERT INTO {CATALOG_NAME}.demo.staging VALUES
    (1, 'A', 'C1', '2024-01-01'),
    (3, 'B', 'C2', '2024-01-02'),
    (8, 'C', 'C1', '2024-01-03'),
    (9, 'D', 'C2', '2024-01-01'),
    (2, 'E', 'C3', '2024-01-02'),
    (7, 'F', 'C1', '2024-01-01'),
    (12, 'G', 'C2', '2024-01-02')
    """)

spark.sql(f"""
  CREATE TABLE {CATALOG_NAME}.demo.sample1
  USING iceberg
  PARTITIONED BY (bucket(16, id), days(ts), category)
  AS SELECT id, data, category, to_timestamp(ts) AS ts
    FROM {CATALOG_NAME}.demo.staging""")

Expected behavior

The filter predicate on id should be translated into a partition filter, so that only the matching files are scanned instead of the full table.

Component(s)

Other

Additional context

No response

samster25 commented 4 hours ago

Hello @jaehyeon-kim,

I believe that the bucket transform assigns rows to buckets by hashing the id, so bucket numbers don't preserve the ordering of the ids. That means the only predicate we can push down onto the bucket column is equality; id <= 3 can't be pushed down because of the less-than component.

Can you try the following query with an equality filter instead?

iceberg_table1 = catalog.load_table("demo.sample1")
df1 = daft.read_iceberg(iceberg_table1).where(col("id") == 3)
df1.explain(show_all=True)
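To make the equality-vs-range distinction concrete, here is a minimal sketch of why a range predicate can't be projected through a bucket transform. The `toy_bucket` function is a deliberately simple stand-in hash invented for this illustration (the real Iceberg bucket transform is a 32-bit Murmur3 hash mod N, per the Iceberg spec); only the scattering behavior matters.

```python
# Toy stand-in for Iceberg's bucket transform. The real transform is a
# 32-bit Murmur3 hash mod N; this affine toy only illustrates that the
# mapping from id to bucket is not order-preserving.
def toy_bucket(id_, n=16):
    return (id_ * 7 + 5) % n

# Equality projects cleanly: id == 3 touches exactly one bucket, so the
# engine can rewrite it as a partition filter on id_bucket.
print(toy_bucket(3))                                 # -> 10

# A range does not: the ids satisfying id <= 3 scatter across unrelated
# buckets, so no predicate on id_bucket is equivalent to the range filter
# and every bucket must be considered.
print(sorted({toy_bucket(i) for i in range(1, 4)}))  # -> [3, 10, 12]
```

This is why `col("id") == 3` can be turned into a partition filter while `col("id") <= 3` only survives as a row-level filter pushdown, triggering the full-scan warning above.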