apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.48k stars 2.24k forks source link

Spark: MERGE INTO does not push down the predicate of the WHEN NOT MATCHED BY SOURCE … THEN DELETE clause #11248

Open Visorgood opened 1 month ago

Visorgood commented 1 month ago

Apache Iceberg version

1.6.0

Query engine

Spark

Please describe the bug 🐞

MERGE INTO command is doing a full Iceberg table scan, even though the table is well partitioned. The idea of running this command is an idempotent ingestion/reingestion of a "4 hour block" of new data into a (hidden) daily partition.

I’m using Spark 3.5.2 and Iceberg 1.6.0.

Table schema is as such:

CREATE TABLE iceberg_table (
  part_col_1 STRING,
  part_col_2 STRING,
  part_col_3 TIMESTAMP,
  some_col_x INT,
  some_col_y STRING,
  main_value_col BIGINT
)
USING iceberg
PARTITIONED BY (part_col_1, part_col_2, days(part_col_3))

MERGE INTO command I’m running is as follows:

MERGE INTO iceberg_table t
 USING new_data_to_upsert s
 ON
 t.part_col_1 = 'some_part_value_a' AND
 t.part_col_2 = 'some_part_value_b' AND
 t.part_col_3 >= to_timestamp('2024-09-25 00:00:00') AND
 t.part_col_3 < to_timestamp('2024-09-25 04:00:00') AND
 t.part_col_3 = s.part_col_3 AND
 t.some_col_x = s.some_col_x AND
 t.some_col_y = s.some_col_y
 WHEN MATCHED THEN UPDATE SET t.main_value_col = s.main_value_col
 WHEN NOT MATCHED BY TARGET THEN INSERT *
 WHEN NOT MATCHED BY SOURCE AND
 t.part_col_1 = 'some_part_value_a' AND
 t.part_col_2 = 'some_part_value_b' AND
 t.part_col_3 >= to_timestamp('2024-09-25 00:00:00') AND
 t.part_col_3 < to_timestamp('2024-09-25 04:00:00')
 THEN DELETE

View new_data_to_upsert has the same columns as the table, and all its records have

I also .persist() it before doing .createOrReplaceTempView("new_data_to_upsert").

Query plan shows for the iceberg_table scan the following:

(1) BatchScan iceberg_table
Output [8]: [some_col_x#511, part_col_3#512, part_col_1#513, part_col_2#514, some_col_y#515, main_value_col#516L, _file#520]
iceberg_table (branch=null) [filters=true, groupedBy=]

As far as I understand filters=true is the problem - no predicate push-down. When I try a regular SELECT or DELETE, it shows smth like:

filters=[part_col_1 = 'some_part_value_a', part_col_2 = 'some_part_value_b', part_col_3 >= 1727222400000, part_col_3 < 1727236800000]

If I remove WHEN NOT MATCHED BY SOURCE … THEN DELETE clause from the query, the plan is different and the BatchScan steps to the Iceberg table do have filters specified. No matter what I've tried with the predicate in this clause, it never resulted in the predicate push-down.

Maybe this is related to this issue: https://github.com/apache/iceberg/issues/10108

Willingness to contribute

Visorgood commented 1 month ago

I was investigating this more, and tried an approach suggested on this StackOverflow page. In a nutshell, it suggests to use a CTE or a temp view for the target table, to limit the deletions to a needed scope. When I tried this:

WITH t AS (
     SELECT *
     FROM iceberg_table
     WHERE part_col_1 = 'some_part_value_a' AND part_col_2 = 'some_part_value_b' AND part_col_3 >= '2024-09-25 00:00:00' AND part_col_3 < '2024-09-25 04:00:00'
)
MERGE INTO t USING new_data_to_upsert s ON ... {the rest of the query is the same}

I received an error:

Error occurred during query planning:
MERGE INTO TABLE is not supported temporarily.
huaxingao commented 1 month ago

cc @aokolnychyi

Biman54 commented 1 week ago

Any update?