prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0
16.08k stars 5.39k forks source link

[Iceberg] While time travel Iceberg is not using schema respective to a given Snapshot #23553

Open agrawalreetika opened 2 months ago

agrawalreetika commented 2 months ago

If there is any schema evaluation (Ex: add column, delete column), the schema for snapshots could be different. In the case of Time Travel, we are currently returning the current table schema instead of respective to a given Snapshot.

https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java#L769

https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java#L660

Your Environment

Expected Behavior

We should schema wrt to Snapshot instead of always selecting the current table schema

Current Behavior

Currently returning the current table schema instead of respective to a given Snapshot.

Possible Solution

Make changes to https://github.com/prestodb/presto/blob/master/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java#L660 get schema for a given snapshotId

Steps to Reproduce

presto:default> CREATE TABLE test_table_snapshots (id1 BIGINT, id2 BIGINT);
presto:default> INSERT INTO test_table_snapshots VALUES (0, 00), (1, 10), (2, 20);
presto:default> INSERT INTO test_table_snapshots VALUES (3, 30), (4, 40), (5, 50);

presto:default> ALTER TABLE test_table_snapshots DROP COLUMN id2;
presto:default> ALTER TABLE test_table_snapshots ADD COLUMN id2_new BIGINT;

presto:default> INSERT INTO test_table_snapshots VALUES (6, 60), (7, 70), (8, 80);

presto:default> SELECT * FROM "test_table_snapshots$snapshots";
             committed_at             |     snapshot_id     |      parent_id      | operation |                                                                manifest_list                                   >
--------------------------------------+---------------------+---------------------+-----------+---------------------------------------------------------------------------------------------------------------->
 2024-08-30 00:47:35.624 Asia/Kolkata |  934925653344696772 | NULL                | append    | hdfs://localhost:9000/user/hive/warehouse/test_table_snapshots/metadata/snap-934925653344696772-1-8415374a-adc9>
 2024-08-30 00:49:35.149 Asia/Kolkata | 6824984973739429811 |  934925653344696772 | append    | hdfs://localhost:9000/user/hive/warehouse/test_table_snapshots/metadata/snap-6824984973739429811-1-775221c5-2bf>
 2024-08-30 00:51:15.970 Asia/Kolkata |  399531410398340855 | 6824984973739429811 | append    | hdfs://localhost:9000/user/hive/warehouse/test_table_snapshots/metadata/snap-399531410398340855-1-4ced63bc-6cfb>
(3 rows)

// Access current snapshot
presto:default> SELECT * FROM test_table_snapshots;
 id1 | id2_new 
-----+---------
   3 | NULL    
   0 | NULL    
   4 | NULL    
   5 | NULL    
   1 | NULL    
   2 | NULL    
   6 |      60 
   7 |      70 
   8 |      80 
(9 rows)

// Access previous snapshot, which had 2nd column as "id2" . This should have 2nd column as "id2" not "id2_new"
presto:default> SELECT * FROM test_table_snapshots for SYSTEM_VERSION AS OF 6824984973739429811;
 id1 | id2_new 
-----+---------
   0 | NULL    
   1 | NULL    
   2 | NULL    
   4 | NULL    
   5 | NULL    
   3 | NULL    
(6 rows)

Screenshots (if appropriate)

Context

tdcmeehan commented 2 months ago

Can you gather any information on:

1) What does the Iceberg community think we should do? What does the Spark reference implementation do? What do other interactive engines like Trino do? 2) Time travel is a feature that is called out in the SQL spec. What does the SQL spec say we should do?

agrawalreetika commented 2 months ago

As I checked Snapshot has schemaId associated to it. https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/Snapshot.java#L171

And I checked bit on the Iceberg side, it looks like Spark is taking schema based on the snapshotId https://github.com/apache/iceberg/blob/main/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java#L188

Looks like in Trino is using schema based on snapshot while creating TableHandle - https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java#L454

So I thought In Presto also we can snapshotSchema instead of current Schema always? LMK if we should explore more around this?

tdcmeehan commented 2 months ago

I believe the SQL spec might specify we use the current schema. My understanding is:

1) Iceberg community believes it ought to be the schema of the snapshot 2) The SQL spec defines it to be the latest schema

I will check this later. If my understanding is correct, we would need to get to alignment on which spec to follow.

hantangwangd commented 2 months ago

As I understand, if we consider reading a snapshot as snapshot reading in the scope of transaction isolation, then it should not be affected by the schema changes. We should always read the same data through a specified snapshot id in any situation. In this way, it makes sense to use the schema of the snapshot.

agrawalreetika commented 2 months ago

2 open questions on this for Presto Query behaviour -

  1. When we are doing Time Travel, should we consider schema of the snapshot?

    • As I checked Spark & Trino uses the snapshot's schema
  2. When we are Querying current Schema, do we consider the current schema or schema of the snapshot? -- This is to consider the case when schema evaluation has happened but there are no Data modification, so current table snapshot schema would point to the schema which was before schema evaluation when the snapshot was created.

    • As checked Spark & Trino uses current Schema in this case.

So basically, my understanding is other engines are using Snapshot's schema whenever there is a time travel Query. Otherwise uses the current schema.