apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0
402 stars 147 forks source link

Merge into / Upsert #402

Open Fokko opened 7 months ago

Fokko commented 7 months ago

Feature Request / Improvement

Have an API to efficiently perform an upsert

corleyma commented 6 months ago

To work well with some of the larger data usecases where folks are using PySpark today, I think this would need to play well with pyarrow streaming read/write functionality, so that one could do atomic upsert of batches without having to read all the data into memory at once.

I call this out because current write functionality works with pyarrow Tables, which are fully materialized in memory. Working with larger data might include making the pyiceberg write APIs work with Iterator[RecordBatch] and friends (as returned by pyarrow Datasets/Scanner) in addition to pyarrow Tables.

sungwy commented 3 months ago

Hi @corleyma - I opened up this PR to address your comment here by introducing a scan API that will return a RecordBatchReader. It's pending some resolutions with related issues, but it's almost complete. Would appreciate your feedback if you are interested in using this API 🙂

Milias commented 2 months ago

Hello, thanks for all the great work!

Now that version 0.7.0 is released, I was wondering where can I find some documentation covering how to write to a partitioned table.

Thanks a lot!

sungwy commented 2 months ago

Hello, thanks for all the great work!

Now that version 0.7.0 is released, I was wondering where can I find some documentation covering how to write to a partitioned table.

Thanks a lot!

Hi @Milias you can create a table with a partition by following the documentation here on Creating a table.

I realize we could have had an explicit section on creating and writing to a partitioned table under Write to a Table section. Currently, we support partitioned writes for IdentityTransform and TimeTransform (Year, Month, Day, Hour) partitions. Please let us know if that works for you!

Sung

Milias commented 1 month ago

Hey @sungwy, thanks for the quick answer!

I was already making use of writing support and indeed had seen that section of the documentation. Right now I prepared a very quick test of writing to a table partitioned with IdentityTransform. Then, .append and .overwrite work as expected, that is, they either append new data to the appropriate partitions and replace the whole table, respectively.

After this I'm left wondering how can individual partitions be replaced. Maybe this functionality is not yet supported, with writes to partitioned table being only the first step. To give an example of what I mean, taking the table from the example in the documentation:

import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297, "index": 1},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989, "index": 2},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014, "index": 2},
    ],
    schema=schema_to_pyarrow(tbl.schema())
)

tbl.overwrite(df)

With table:

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
    NestedField(4, "index", IntegerType(), required=True),
)

partition_spec = PartitionSpec(   
    PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="index_partition"),
)

tbl = catalog.create_table("public.cities", schema=schema, partition_spec=partition_spec)

Then, if we add a few more rows:

df2 = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029, "index": 1},
        {"city": "Null Island", "lat": 0.0, "long": 0.0, "index": 3},
    ],
    schema=schema_to_pyarrow(tbl.schema())
)

Then, when doing tbl.overwrite(df2) I would like to have some way of indicating that partition with index = 2 should be left as-is.

It is very possible that I misunderstood the precise scope of write support to partitioned tables, since this issue #402 is still open. But in case that it is already possible to overwrite specific partitions, that's the piece of information I was searching for.

Thanks a lot again :smiley:

sungwy commented 1 month ago

I think you must be referring to dynamic overwrite / replace partition API that detects the partitions of the given input and replaces it. This feature is actually still in progress on this PR: https://github.com/apache/iceberg-python/pull/931

Milias commented 1 month ago

That PR looks exactly like what I am asking for, yes! Thank you very much for pointing it out. I will keep an eye on it.

ev2900 commented 1 month ago

Any update on if there is an API for merge into / upsert?

sungwy commented 1 month ago

Hi @ev2900 - would using the overwrite feature by specifying the boolean expression on which to upsert work for your use case?

https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L479-L483

I realize we don't have an example of invoking an overwrite without the overwrite_filter specified. I'll raise an issue to track adding this explicitly into our API documentation. https://github.com/apache/iceberg-python/issues/1008

ev2900 commented 1 month ago

Let me take a look at this. It would be very helpful if there was an example

sungwy commented 1 month ago

@ev2900 agreed :) I've added that Issue above (#1008) to address that

Minfante377 commented 4 weeks ago

Any updates on this one? I'm good with overwrite + overwrite filters for now but for tables where columns are populated by different sources it would be awesome to have full MERGE INTO support and to be able to select which columns to update

sungwy commented 5 days ago

Hi @Minfante377 sorry for the delayed response, and thank you for the interest!

Unfortunately, this is still an open issue on PyIceberg with no assignee. MERGE INTO with the column matching semantics like:

MERGE INTO table t using (SELECT ...) s ON t.id = s.id

is unfortunately a bit complicated to support efficiently, so I've been trying to make time to look at it in depth. And unfortunately I haven't had the time for this specific issue.

Would you be interested in making a contribution?