apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Spark's optimizer mis-merges two different Iceberg refs because their canonicalized forms are the same #9450

Open wooyeong opened 9 months ago

wooyeong commented 9 months ago

Apache Iceberg version

1.4.2 (latest release)

Query engine

Spark

Please describe the bug 🐞

I was trying to build a history table using Iceberg's time-travel feature. For example, to produce the history below, I would only need to insert one row (user c) for 2024-01-02, thanks to time travel.

user  date
a     2024-01-01
b     2024-01-01
a     2024-01-02
b     2024-01-02
c     2024-01-02

However, I hit a bug that prevents me from building a complete set of historical data. Reproducible queries and the likely cause are below.

Queries

-- Create table
CREATE TABLE iceberg_except_test (
    id string,
    a string,
    b timestamp
)
USING ICEBERG
PARTITIONED BY (DATE(b));

-- Insert a row: (1, a, 2023-01-01)
MERGE INTO iceberg_except_test AS t
USING
  (SELECT '1' AS id, 'a' AS a, date '2023-01-01' AS b) AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.a = s.a, t.b = s.b
WHEN NOT MATCHED THEN INSERT *;

-- Set tag `2023-01-01` -> [(1, a, 2023-01-01)]
ALTER TABLE iceberg_except_test CREATE OR REPLACE TAG `2023-01-01`;

-- Update the row: (1, b, 2024-01-01)
MERGE INTO iceberg_except_test AS t
USING
  (SELECT '1' AS id, 'b' AS a, date '2024-01-01' AS b) AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.a = s.a, t.b = s.b
WHEN NOT MATCHED THEN INSERT *;

-- Set tag `2024-01-01` -> [(1, b, 2024-01-01)]
ALTER TABLE iceberg_except_test CREATE OR REPLACE TAG `2024-01-01`;

-- Fetch two tags altogether -> [(1, a, 2023-01-01), (1, b, 2024-01-01)]
SELECT * FROM iceberg_except_test
VERSION AS OF '2023-01-01'

UNION ALL 

SELECT * FROM iceberg_except_test
VERSION AS OF '2024-01-01';

-- Fetch distinct rows from two tags -> returns only [(1, a, 2023-01-01)], but it should return two rows.
SELECT * FROM iceberg_except_test
VERSION AS OF '2023-01-01'

UNION

SELECT * FROM iceberg_except_test
VERSION AS OF '2024-01-01';

Query plan

> == Parsed Logical Plan ==
> 'Distinct
> +- 'Union false, false
>    :- 'Project [*]
>    :  +- 'RelationTimeTravel 'UnresolvedRelation [iceberg_except_test], [], false, 2023-01-01
>    +- 'Project [*]
>       +- 'RelationTimeTravel 'UnresolvedRelation [iceberg_except_test], [], false, 2024-01-01
> 
> == Analyzed Logical Plan ==
> id: string, a: string, b: timestamp
> Distinct
> +- Union false, false
>    :- Project [id#189, a#190, b#191]
>    :  +- SubqueryAlias local.iceberg_except_test
>    :     +- RelationV2[id#189, a#190, b#191] local.iceberg_except_test local.iceberg_except_test
>    +- Project [id#192, a#193, b#194]
>       +- SubqueryAlias local.iceberg_except_test
>          +- RelationV2[id#192, a#193, b#194] local.iceberg_except_test local.iceberg_except_test
> 
> == Optimized Logical Plan ==
> Aggregate [id#189, a#190, b#191], [id#189, a#190, b#191]
> +- RelationV2[id#189, a#190, b#191] local.iceberg_except_test
> 
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[id#189, a#190, b#191], functions=[], output=[id#189, a#190, b#191])
>    +- Exchange hashpartitioning(id#189, a#190, b#191, 200), ENSURE_REQUIREMENTS, [plan_id=574]
>       +- HashAggregate(keys=[id#189, a#190, b#191], functions=[], output=[id#189, a#190, b#191])
>          +- BatchScan local.iceberg_except_test[id#189, a#190, b#191] local.iceberg_except_test (branch=null) [filters=, groupedBy=] RuntimeFilters: []

Cause

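With Spark's RemoveNoopUnion optimizer rule excluded (passed at launch via --conf), the same query returns both rows:

--conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
-- Fetch distinct rows from two tags -> [(1, a, 2023-01-01), (1, b, 2024-01-01)]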
SELECT * FROM iceberg_except_test
VERSION AS OF '2023-01-01'

UNION

SELECT * FROM iceberg_except_test
VERSION AS OF '2024-01-01';

Possible fix

Being able to use this feature would save me a lot of storage. I look forward to your advice. Thanks in advance.

ajantha-bhat commented 9 months ago

@wooyeong: Nice catch. Yes, the optimizer thinks the two are identical queries and merges them, because it is not aware of the snapshot ID/reference information.
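To make the failure mode concrete, here is a toy Java model, not Spark's actual RemoveNoopUnion code. It assumes a hypothetical scan class, `NameOnlyScan`, whose `equals`/`hashCode` look only at the table name, and a hypothetical helper, `dedupeUnionChildren`, that keeps one child per group of equal children. Two scans of different tags then collapse into one, which matches the optimized plan above, where only the first relation survives.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a scan node. The time-travel ref is carried along,
// but equals/hashCode look only at the table name, as described above.
final class NameOnlyScan {
    final String table;
    final String ref; // e.g. a tag name; deliberately ignored by equals/hashCode

    NameOnlyScan(String table, String ref) {
        this.table = table;
        this.ref = ref;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NameOnlyScan other && table.equals(other.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table);
    }
}

final class NoopUnionDemo {
    // Toy version of simplifying a DISTINCT over a UNION: children that compare
    // equal are assumed to return the same rows, so only the first one is kept.
    static <T> List<T> dedupeUnionChildren(List<T> children) {
        return new ArrayList<>(new LinkedHashSet<>(children));
    }
}
```

Passing scans for tags `2023-01-01` and `2024-01-01` of `iceberg_except_test` returns a single-element list that holds only the `2023-01-01` scan, because `LinkedHashSet` keeps the first of two equal elements.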

I confirmed that the query works after changing SparkTable#equals to compare branch and snapshotId as well as name, so that each ref has a unique canonical form.

However, I think this misses an edge case. For example, two time-travel queries on the same table, one using a branch name and one using a snapshot ID, that both resolve to the same snapshot ID should be considered the same. With the change above, they would be considered different.
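The edge case can be sketched with a hypothetical class, `NaiveRef`, that compares the raw `branch` and `snapshotId` fields along with the table name. This is an illustration, not Iceberg's SparkTable. It tells the two tags apart, but it has no way to know that a branch and an explicit snapshot ID point at the same snapshot, so it treats them as different.

```java
import java.util.Objects;

// Hypothetical model of the change described above: equality compares the raw
// branch and snapshotId fields in addition to the table name.
final class NaiveRef {
    final String table;
    final String branch;   // branch or tag name from the query, may be null
    final Long snapshotId; // explicit snapshot id from the query, may be null

    NaiveRef(String table, String branch, Long snapshotId) {
        this.table = table;
        this.branch = branch;
        this.snapshotId = snapshotId;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof NaiveRef other
                && table.equals(other.table)
                && Objects.equals(branch, other.branch)
                && Objects.equals(snapshotId, other.snapshotId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, branch, snapshotId);
    }
}
```

Suppose, hypothetically, that branch `main` currently points at snapshot `2`. `new NaiveRef(t, "main", null)` and `new NaiveRef(t, null, 2L)` still compare as unequal, even though both scans read the same data.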

I think the fix should be to compute an effective snapshot ID (icebergTable.snapshot(branch) if branch is not null, or snapshotId if it is not null) and compare the effective snapshot IDs in equals().
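The suggested fix can be sketched as follows. The sketch resolves each ref to the snapshot it will actually read and compares only that resolved snapshot, along with the table name. `EffectiveRef` and the `refToSnapshot` map are hypothetical. The map stands in for a lookup like icebergTable.snapshot(branch), and the snapshot IDs used below are made up. The sketch does not reproduce Iceberg's real SparkTable.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the suggested fix; not Iceberg's SparkTable.
final class EffectiveRef {
    final String table;
    final String ref;        // branch or tag name from the query, may be null
    final Long snapshotId;   // explicit snapshot id from the query, may be null
    final Long effectiveId;  // snapshot the scan will read; null means "current state"

    EffectiveRef(String table, String ref, Long snapshotId, Map<String, Long> refToSnapshot) {
        this.table = table;
        this.ref = ref;
        this.snapshotId = snapshotId;
        // Prefer the named ref if present, otherwise fall back to the explicit snapshot id.
        this.effectiveId = ref != null ? refToSnapshot.get(ref) : snapshotId;
    }

    // Equality depends only on the table and the resolved snapshot,
    // not on how the query spelled the ref.
    @Override
    public boolean equals(Object o) {
        return o instanceof EffectiveRef other
                && table.equals(other.table)
                && Objects.equals(effectiveId, other.effectiveId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, effectiveId);
    }
}
```

With tags `2023-01-01` and `2024-01-01` mapped to hypothetical snapshots `1` and `2`, the two tag scans stay distinct, while `main` (also at `2`) and an explicit `snapshotId` of `2` compare equal and share a hash code. Keeping `hashCode` consistent with `equals` matters because deduplication by hash-based collections relies on both.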

Would you like to contribute the fix?

wooyeong commented 9 months ago

@ajantha-bhat Thank you for your comment. I'll try it soon.

github-actions[bot] commented 6 days ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.