apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0
462 stars 167 forks source link

Support setting a snapshot property in same commit as spark.sql #368

Closed brianfromoregon closed 1 month ago

brianfromoregon commented 9 months ago

Feature Request / Improvement

This cookbook has a java snippet to update a snapshot property atomically with a sql MERGE INTO.

// Update the target table and set the watermark in the same commit
CommitMetadata.withCommitProperties(
    Map.of("watermark:raw.log_source", endSnapshotId),
    () -> {
        spark.sql("MERGE INTO ...");
        return 0;
     }, RuntimeException.class);

It would be great if pyiceberg allowed me to similarly wrap my spark.sql call and add snapshot properties.

Fokko commented 9 months ago

Thanks for raising this @brianfromoregon!

I think it would be a great addition. We need to extend the .append and .overwrite API and allow passing in a map. And then it needs to be passed in when constructing the Summary:

https://github.com/apache/iceberg-python/blob/622adb799e1c5acb48be95ba14a670bafef54a61/pyiceberg/table/__init__.py#L2427

Are you interested in contributing this? :)

brianfromoregon commented 9 months ago

Beyond writing snapshot summary fields, this issue is also requesting ability to write those fields in same snapshot as one created by spark.sql. That would take changes beyond what you describe right @Fokko . Ideally id have a single transaction to (1) read a summary field (2) run spark.sql (3) write a summary field

Fokko commented 9 months ago

@brianfromoregon That's not possible. Spark will create the snapshot, and those are immutable. So you cannot update those afterward in PyIceberg.

brianfromoregon commented 9 months ago

@Fokko Interesting, that makes sense, so what does the linked cookbook code mean when it says "in the same commit"?

Fokko commented 9 months ago

@brianfromoregon In the cookbook example it will be in the same commit, which will result into a single snapshot. I was under the impression that you also want to replicate this on the Python side :)

brianfromoregon commented 9 months ago

@Fokko Yes I am using python. So this is possible from java but impossible from python, interesting I wonder why.

Fokko commented 9 months ago

Because it is not part of the API, so we need to extend it :) In Python, you would append an Arrow table to the Iceberg table and set the properties in the same commit (snapshot).

brianfromoregon commented 9 months ago

Ok agreed. So my intention was to have this issue represent extending the API to allow same commit semantics like the java cookbook, and then issue #367 represent the (simpler) change to allow setting snapshot properties in general.

ajosephides commented 9 months ago

As @brianfromoregon has mentioned I also understood the issue raised to " represent extending the API to allow same commit semantics like the java"

Fokko commented 9 months ago

I would love that, and this is what I suggested in https://github.com/apache/iceberg-python/issues/368#issuecomment-1928020308

Gowthami03B commented 9 months ago

@brianfromoregon @Fokko can I take a stab at this?

Gowthami03B commented 8 months ago

https://github.com/apache/iceberg-python/pull/419

corleyma commented 8 months ago

I think there's still some confusion here, since there are two possible interpretations of "represent extending the API to allow same commit semantics like the java":

I think interpretation 2 is what @brianfromoregon was getting at, and I don't know how feasible it is... but I think both capabilities are nice so it's great to have #419, and if interpretation 2 is possible, that would also be really useful. Alternative to interpretation 2 would be some other way to set snapshot properties in PySpark without using pyiceberg, and I don't think that exists either.

brianfromoregon commented 8 months ago

Hi @corleyma, my thinking was that Issue 367 is meant to represent "Interpretation 1" and this issue 368 is meant to represent "Interpretation 2". Fully agreed that both features are useful!

sungwy commented 7 months ago

Hi @brianfromoregon and @corleyma , from my understanding of PyIceberg and PySpark Iceberg, I'm not sure if allowing the two separate clients to participate in the same transaction will be possible any time soon. Currently, Transactions are designed as classes, and they are available only to the specific client that's building it.

This feature request implies that the transaction should be shared between the two separate clients which would need either:

  1. the Transaction class to be exchanged in a way that can be understood by both Spark and Python within the same machine (presumably the Spark driver)
  2. or have Transaction that is sent to an intelligent Catalog backend, that doesn't commit it immediately, but stages the transaction - so that the transaction can be looked up with a unique identifier and built upon by separate clients, until it is committed.

Is there a specific use case you are thinking of that requires both PySpark-Iceberg and PyIceberg? We know PyIceberg is still evolving, but it is growing fast and we will reach somewhat feature parity in the near future. After that, the choice of the client we use would really depend on the use case - would it require the built in distributed capabilities of spark? or do we want to perform simpler transactions through PyIceberg?

@Fokko - do you have any thoughts on this topic?

brianfromoregon commented 7 months ago

Hi @syun64, thanks for chiming in!

My batch app store historical data, there is always a date column. It runs for each date and will insert data for that date. Sometimes there is legitimately no data available for a particular date, no matter how many times it runs there will never be data. Other times the app has an error or fails to run and needs to be re-run for a date. I'm trying to allow my app to differentiate between missing dates and present-but-empty dates so it does not constantly try re-running for dates that will never produce data. When I was using raw parquet files I would simply write an empty file for a date to represent present-but-empty. Asking in Slack I learned that Iceberg does not support this concept (for example no empty partitions allowed) so instead I am aiming to use metadata (snapshot properties) to store the date ranges that are reflected in the stored data.

In order to implement this with snapshot properties I want my writer to do the following transactionally:

  1. Fetch the current snapshot's dateranges property.
  2. Modify that dateranges value to include the dates which are about to be written.
  3. Merge the new data and update the dateranges snapshot property, in the same new snapshot.

If another concurrent writer were to write its own new snapshot between step 1 and 3, I would want my writer to throw an exception and then I'll try again at step 1 starting from the latest snapshot.

Today I use PySpark Iceberg for writing because PyIceberg does not yet support partitioned writes. PyIceberg is getting partitioned writes soon, I am excited to try it! But until then I'm using PySpark for writing and want some way to accomplish steps 1-3 from a python client. I hope this explains my goal and motivation.

Another approach I had in mind was to be able to read and write snapshot properties from PySpark SQL query. That is appealing because it would be a single-client solution which would also allow my non-python clients to perform writes that honor this dateranges property.

sungwy commented 7 months ago

In order to implement this with snapshot properties I want my writer to do the following transactionally:

Fetch the current snapshot's dateranges property. Modify that dateranges value to include the dates which are about to be written. Merge the new data and update the dateranges snapshot property, in the same new snapshot. If another concurrent writer were to write its own new snapshot between step 1 and 3, I would want my writer to throw an exception and then I'll try again at step 1 starting from the latest snapshot.

Another approach I had in mind was to be able to read and write snapshot properties from PySpark SQL query. That is appealing because it would be a single-client solution which would also allow my non-python clients to perform writes that honor this dateranges property.

I think you should be able to do this today by keeping track of the Iceberg table snapshot you are looking at to do task (1), and then writing with snapshot property and then using an isolation property based on the snapshot commit you've started your sequence of operations from, so that your commit fails if there has been a concurrent commit that was made since then.

https://iceberg.apache.org/docs/1.5.0/spark-configuration/#write-options

"isolation-level", "validate-from-snapshot-id" and "snapshot-property" are probably the write options you want to use to achieve your goal in PySpark. Let me know if that works for you!

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

sungwy commented 1 month ago

Snapshot property can now be specified in PyIceberg Table APIs

https://github.com/apache/iceberg-python/pull/419