apache / iceberg-rust

Apache Iceberg
https://rust.iceberg.apache.org/
Apache License 2.0

Add apply interface in transaction #596

Open ZENOTME opened 2 months ago

ZENOTME commented 2 months ago

This will work for now but might get problematic later on. Just a heads up.

An important concept for Iceberg is to stack snapshots in a single commit. For example, now with append being added in this PR, we can easily add support for truncate. This would be a delete operation where all the data is being dropped, and then just an append.

_Originally posted by @Fokko in https://github.com/apache/iceberg-rust/pull/349#discussion_r1579449633_

The transaction should be able to reflect updates in time. Following pyiceberg, we can provide an apply interface to update the table metadata.
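A minimal sketch of what such an apply interface could look like (all types and names below are hypothetical, not the actual iceberg-rust API): each apply mutates a local copy of the table metadata and queues the update for a single commit, so later actions see earlier changes.

```rust
// Hypothetical sketch, not the real iceberg-rust API: a transaction that
// keeps a locally updated copy of the table metadata, so later actions
// can observe changes applied by earlier ones before anything is committed.

#[derive(Clone, Debug, PartialEq)]
struct TableMetadata {
    format_version: u8,
}

#[derive(Debug)]
enum TableUpdate {
    UpgradeFormatVersion { version: u8 },
}

struct Transaction {
    current_metadata: TableMetadata,   // local, up-to-date view
    pending_updates: Vec<TableUpdate>, // sent to the catalog on commit
}

impl Transaction {
    fn new(metadata: TableMetadata) -> Self {
        Transaction { current_metadata: metadata, pending_updates: Vec::new() }
    }

    // apply: update the local metadata immediately and queue the update.
    fn apply(&mut self, update: TableUpdate) {
        match &update {
            TableUpdate::UpgradeFormatVersion { version } => {
                self.current_metadata.format_version = *version;
            }
        }
        self.pending_updates.push(update);
    }

    fn upgrade_table_version(&mut self, version: u8) {
        // Because the local metadata is current, a duplicate upgrade is
        // detected here instead of being sent to the catalog twice.
        if self.current_metadata.format_version < version {
            self.apply(TableUpdate::UpgradeFormatVersion { version });
        }
    }
}

fn main() {
    let mut tx = Transaction::new(TableMetadata { format_version: 1 });
    tx.upgrade_table_version(2);
    tx.upgrade_table_version(2); // second call sees the local update: no-op
    assert_eq!(tx.pending_updates.len(), 1);
    assert_eq!(tx.current_metadata.format_version, 2);
    println!("pending updates: {}", tx.pending_updates.len());
}
```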

ZENOTME commented 2 months ago

Exactly. Internally you want to stack the changes together. For example, within a single transaction, you add a new field and then write the data, then the latest schema should be taken into account. We do this in PyIceberg here: https://github.com/apache/iceberg-python/blob/03a0d65ac05d556d0815e61a016effc2b8993702/pyiceberg/table/__init__.py#L715

We can implement the update_table_metadata based on https://github.com/apache/iceberg-rust/pull/587. cc @liurenjie1024 @Xuanwo @Fokko @c-thiel

liurenjie1024 commented 2 months ago

Hi, @ZENOTME Could you elaborate on this? I'm kind of confused about the proposal.

ZENOTME commented 2 months ago

Hi, @ZENOTME Could you elaborate on this? I'm kind of confused about the proposal.

For now, the transaction can't reflect updates in time, so we can't stack them together. e.g.

// table is a v1 table
let tx = Transaction::new(&table);
// This will end up sending two UpgradeFormatVersion updates to the catalog
tx.upgrade_table_version().unwrap()
   .upgrade_table_version().unwrap().commit()

But in pyiceberg, the behaviour above will only send one UpgradeFormatVersion, and the second call will see that the table metadata has already been updated. The update is applied to the local metadata first, so it reflects the change.

liurenjie1024 commented 2 months ago

We have checks to avoid such duplicated cases. For metastore tables, transaction actions are supposed to be applied locally, and then the metastore pointer is updated. For the rest catalog, they should be sent to the rest catalog server.

ZENOTME commented 2 months ago

We have checks to avoid such duplicated cases. For metastore tables, transaction actions are supposed to be applied locally, and then the metastore pointer is updated. For the rest catalog, they should be sent to the rest catalog server.

Why should they be sent to the rest catalog server for the rest catalog? 🤔 According to the pyiceberg API, it seems possible to create a transaction without autocommit, which means we can also apply transaction actions locally for the rest catalog (am I missing something here?)

liurenjie1024 commented 1 month ago

Why should they be sent to the rest catalog server for the rest catalog? 🤔 According to the pyiceberg API, it seems possible to create a transaction without autocommit, which means we can also apply transaction actions locally for the rest catalog (am I missing something here?)

Sorry, I don't quite get the point. If updates are sent to the rest catalog server, why do we need to apply them locally first?

ZENOTME commented 1 month ago

Sorry, I don't quite get the point. If updates are sent to the rest catalog server, why do we need to apply them locally first?

E.g. the user wants to batch multiple updates and commit them once. In the following, after tx.action2().apply(), the update is not sent to the catalog, but it should be applied to the local table metadata so that action3 can see it.

let tx = table.transaction();
tx.action1().apply(); // Update table version
tx.action2().apply(); // Append new data file
tx.action3().apply();
tx.commit();

liurenjie1024 commented 1 month ago

Sorry, I don't quite get the point. If updates are sent to the rest catalog server, why do we need to apply them locally first?

E.g. the user wants to batch multiple updates and commit them once. In the following, after tx.action2().apply(), the update is not sent to the catalog, but it should be applied to the local table metadata so that action3 can see it.

let tx = table.transaction();
tx.action1().apply(); // Update table version
tx.action2().apply(); // Append new data file
tx.action3().apply();
tx.commit();

This still doesn't answer in which case we need to apply the updates locally before sending them to the rest catalog server; the rest catalog spec allows sending multiple updates as one transaction: https://github.com/apache/iceberg/blob/6319712b612b724fedbc5bed41942ac3426ffe48/open-api/rest-catalog-open-api.yaml#L705

ZENOTME commented 1 month ago

Sorry, I don't quite get the point. If updates are sent to the rest catalog server, why do we need to apply them locally first?

E.g. the user wants to batch multiple updates and commit them once. In the following, after tx.action2().apply(), the update is not sent to the catalog, but it should be applied to the local table metadata so that action3 can see it.

let tx = table.transaction();
tx.action1().apply(); // Update table version
tx.action2().apply(); // Append new data file
tx.action3().apply();
tx.commit();

This still doesn't answer in which case we need to apply the updates locally before sending them to the rest catalog server; the rest catalog spec allows sending multiple updates as one transaction: https://github.com/apache/iceberg/blob/6319712b612b724fedbc5bed41942ac3426ffe48/open-api/rest-catalog-open-api.yaml#L705

Let's use the following pyiceberg example to illustrate that multiple updates can go wrong if we don't apply each update locally before sending them to the rest catalog server.

...
# 1. Insert data and delete it in the same transaction; no updates are sent to the rest catalog server until tx.commit_transaction()
>>> tx = tbl.transaction()
>>> df = pa.Table.from_pylist([{"key": 1, "value": 1}], schema=schema)
>>> tx.append(df)          # 2. pyiceberg applies the update locally in https://github.com/apache/iceberg-python/blob/e891bcddb1584c6b7a35b61537ab5802b514ec6d/pyiceberg/table/__init__.py#L276
>>> tx.delete("value==1")  # 3. so that the delete can see there is data to delete
>>> tx.commit_transaction()

# 4. Expected result: the scan returns no data
>>> tbl.scan().to_arrow()
pyarrow.Table
key: int32
value: int32
----
key: []
value: []

If we comment out https://github.com/apache/iceberg-python/blob/e891bcddb1584c6b7a35b61537ab5802b514ec6d/pyiceberg/table/__init__.py#L276 to disable the local update, we can see an unexpected result:

>>> tx.append(df)
>>> tx.delete("value=1")  # 1. the delete can't see that there is data to delete
/Users/ze/Project/iceberg-python/pyiceberg/table/__init__.py:622: UserWarning: Delete operation did not match any records
  warnings.warn("Delete operation did not match any records")
>>> tx.commit_transaction()

# 2. We can see that the data is not deleted
>>> table.scan().to_arrow()
/Users/ze/Project/iceberg-python/pyiceberg/avro/decoder.py:185: UserWarning: Falling back to pure Python Avro decoder, missing Cython implementation
  warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation")
pyarrow.Table
key: int32
value: int32
----
key: [[1]]
value: [[1]]
>>> 
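The contrast above can be sketched in a few lines of Rust (types and names are illustrative, not the real iceberg-rust or pyiceberg API): a delete matches against the transaction's local view of the table, so it only finds the appended file if the append updated that view.

```rust
// Hypothetical sketch: why a delete inside a transaction must see files
// appended earlier in the same transaction.

struct Transaction {
    // The transaction's local view of the table's data files.
    visible_files: Vec<String>,
    // Whether actions update the local view immediately (the "apply" model).
    apply_locally: bool,
}

impl Transaction {
    fn append(&mut self, file: &str) {
        if self.apply_locally {
            // Like pyiceberg, reflect the change in local metadata at once.
            self.visible_files.push(file.to_string());
        }
        // (the update is also queued for commit; omitted in this sketch)
    }

    // A delete matches records against the local view.
    fn delete_matches(&self) -> usize {
        self.visible_files.len()
    }
}

fn main() {
    // With local application, the delete sees the appended file.
    let mut tx = Transaction { visible_files: vec![], apply_locally: true };
    tx.append("data-1.parquet");
    assert_eq!(tx.delete_matches(), 1);

    // Without it, the delete matches nothing, as in the example above.
    let mut tx = Transaction { visible_files: vec![], apply_locally: false };
    tx.append("data-1.parquet");
    assert_eq!(tx.delete_matches(), 0);
}
```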

This is my understanding of what @Fokko originally posted in https://github.com/apache/iceberg-rust/pull/349#discussion_r1579449633. Please correct me if anything here is wrong.

liurenjie1024 commented 1 month ago

Thanks for @ZENOTME's explanation. I think I've got your point: we need something like a commit inside each transaction action so that later actions can take previously applied changes into account. This requires being able to build stage-only snapshots.
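A sketch of that stage-only idea (hypothetical names, not the actual API): each action appends a staged snapshot to the local metadata, later actions in the same transaction see it as the current snapshot, and commit sends the whole stack to the catalog in one request.

```rust
// Hypothetical sketch of stage-only snapshots: actions stack snapshots in
// local metadata; nothing reaches the catalog until commit.

struct TableMetadata {
    snapshots: Vec<u64>,           // all snapshot ids, including staged ones
    current_snapshot: Option<u64>, // what later actions in this tx will read
}

struct Transaction {
    metadata: TableMetadata,
    staged: Vec<u64>, // snapshots to send to the catalog on commit
}

impl Transaction {
    fn stage_snapshot(&mut self, id: u64) {
        // Visible to later actions in this transaction, but not committed.
        self.metadata.snapshots.push(id);
        self.metadata.current_snapshot = Some(id);
        self.staged.push(id);
    }

    // Commit returns the whole stack, to be sent as one request.
    fn commit(self) -> Vec<u64> {
        self.staged
    }
}

fn main() {
    let mut tx = Transaction {
        metadata: TableMetadata { snapshots: vec![], current_snapshot: None },
        staged: vec![],
    };
    tx.stage_snapshot(1); // e.g. a delete snapshot
    tx.stage_snapshot(2); // e.g. an append that builds on snapshot 1
    assert_eq!(tx.metadata.current_snapshot, Some(2));
    let committed = tx.commit();
    assert_eq!(committed, vec![1, 2]);
    println!("committed {} snapshots", committed.len());
}
```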