Open marvinlanhenke opened 7 months ago
cc @liurenjie1024 @Xuanwo @viirya @sdd @ZENOTME @Fokko
Thanks for spending the time thinking about this and putting your thoughts into words. I need to spend some time re-reading the associated parts of the spec and looking through the Java and possibly python implementations before being able to comment. I should get chance tomorrow.
I'm not sure whether my understanding is correct:
The target of table.append()
is used to insert a batch of data into the table. It's seems like a high level API which will use two lower API:
To separate these two interfaces, I think we don't need to delegate the conversion between RecordBatch
and DataFile
in the transaction.
I'm not sure whether my understanding is correct: The target of
table.append()
is used to insert a batch of data into the table. It's seems like a high level API which will use two lower API:
- writer API for convert RecordBatch to DataFile
- transaction API for commit the DataFile(update the table metadata)
To separate these two interfaces, I think we don't need to delegate the conversion between
RecordBatch
andDataFile
in the transaction.
I think your understanding is correct - and I agree if the writer API already does the conversion from RecordBatch to DataFile, the Transaction shouldn't be concerned with this issue, since it is a higher-level API. However, the Transaction calls the writer that writes the actual DataFile, which seems reasonable.
So the Transaction append
(if I understand the py impl correctly) does all of those things:
@ZENOTME Where would the writer API (which I only know from the design spec in #34) fit best here? Should a Transaction create a new writer everytime a new transaction is created? Or should the Table itself hold a ref to a writer?
calling the writer to write the DataFile create an instance of MergingSnapshotProducer -> responsible for writing the manifest, manifest_list, snapshot_update commit -> update_table() on the Catalog with TableUpdate & TableRequirements
If any error happens during generating metadata relation info like manifest etc., as the writer already wrote DataFiles, should we go to delete the written DataFiles?
I think your understanding is correct - and I agree if the writer API already does the conversion from RecordBatch to DataFile, the Transaction shouldn't be concerned with this issue, since it is a higher-level API. However, the Transaction calls the writer that writes the actual DataFile, which seems reasonable.
I think this is also what the python implementation does. In Transaction.append
, it calls _dataframe_to_data_files
to generate DataFiles based on the pa.Table
.
we create a Transaction that basically does two things: 2.1. It creates a _MergingSnapshotProducer which is (on a high-level) responsible for writing a new ManifestList, creating a new Snapshot (returned as AddSnaphotUpdate)
Yea, specifically, it is a FastAppendFiles
for appending files. Although the manifest commit logic is actually implemented in _MergingSnapshotProducer
.
This should probably accept a RecordBatch as a param, create a new Transaction, and delegates further action to the transaction.
Is there a reason why append wouldn't take a RecordBatchStream
? It would permit us to make appends that are larger than would fit into memory, if the underlying IO method (eg multipart upload) supported it. I for one would find this useful.
If any error happens during generating metadata relation info like manifest etc., as the writer already wrote DataFiles, should we go to delete the written DataFiles?
I think that this becomes the responsibility of the https://iceberg.apache.org/docs/latest/maintenance/#delete-orphan-files maintenance task, rather than the writer. If we decide that the writer could attempt to do this, it should be optional. This would slow down writes in the case where there is a lot of write contention.
@marvinlanhenke Sorry for being late to the party here. Appending a file is rather straightforward, but all the conditions must be met. This is the high-level way of appending a file:
calling the writer to write the DataFile
I think this is also what the python implementation does. In Transaction.append, it calls _dataframe_to_data_files to generate DataFiles based on the pa.Table.
In PyIceberg we have _dataframe_to_data_files
that writes out the Arrow table to one or more Parquet files. Then we collect all the statistics and return a Datafile that can be appended to the table. I hope in the future that we can push this down to iceberg-rust :)
If any error happens during generating metadata relation info like manifest etc., as the writer already wrote DataFiles, should we go to delete the written DataFiles?
Iceberg Java does this best effort. If it fails, it tries to clean it up, but it is always possible that this won't happen (Looking at you OOMs). This is where the maintenance tasks kick in, as @sdd already pointed out.
Talking about prioritization: Things can happen in parallel. For example, something simpler like updating table properties will make sure that the commit path is in place. The Snapshot summary generation can be a PR. The same goes for collecting the column metrics.
I think to implement appending data file, there are two main tasks:
Currently there is no design or plan for 1, and @ZENOTME is working on 2.
I've compiled a doc for discussing roadmaps and features for iceberg-rust, welcome to share you thoughts and feel free to add what's in your mind. cc @viirya @marvinlanhenke
Thanks @liurenjie1024. The roadmaps doc looks good to me. I added a few items under DataFusion integration. Feel free to modify it. Thanks.
...out of curiosity, I took a closer look at the pyiceberg impl and how the
Table.append()
works.Now, I would like to pick your brain, in order to understand and track the next steps we have to take to support
append
as well (since we should be getting close to having write support). The goal here is, to extract and create actionable issues.Here is what I understand from the python impl so far (high-level):
append()
on the Table class with our DataFrame: pa.Table and the snaphot_properties: Dict[str, str]Transaction
that basically does two things: 2.1. It creates a_MergingSnapshotProducer
which is (on a high-level) responsible for writing a new ManifestList, creating a new Snapshot (returned as AddSnaphotUpdate) 2.2 It callsupdate_table
on the respective Catalog which creates a new metadata.json and returns the new metadata as well as the new metadata_locationpyiceberg-link
Here is what I think we need to implement (rough sketch):
fn append(...)
onstruct Table
: This should probably accept a RecordBatch as a param, create a newTransaction
, and delegates further action to the transaction.fn append(...)
onstruct Transaction
: Receives RecordBatch and snapshot_properties. Performs validation checks. Converts the RecordBatch to a collection ofDataFiles
and creates a_MergingSnapshotProducer
with the collection._MergingSnapshotProducer
: :: write manifests (added, deleted, existing) :: get next_sequence_number fromTableMetadata
:: update snapshot summaries :: generate manifest_list_path :: write manifest_list :: create a new Snapshot :: return TableUpdate: AddSnapshotupdate_table
on the concrete Catalog implementationsWhat could be possible Issues here? I think we need to start with the
_MergingSnapshotProducer
(possibly split into mutliple parts) and work our way up the list? Once we have the MergingSnapshotProducer, we can implement the append function on Transaction which basically orchestrates?