apache / iceberg-rust

Apache Iceberg
https://rust.iceberg.apache.org/
Apache License 2.0

EPIC: Rust Based Compaction #624

Open Xuanwo opened 1 month ago

Xuanwo commented 1 month ago

This is an EPIC issue that outlines a direction worth our community's attention. We can use this issue to track the features we want to offer and how close we are to achieving them.


This issue concerns compaction: native compaction, which is to say Rust-based compaction.

We all know that compaction is a resource-intensive task that involves heavy computation, significant I/O, substantial memory consumption, and large-scale resources. I believe compaction is the killer feature that iceberg-rust can provide for the whole community, and I expect iceberg-rust to implement compaction more efficiently in terms of both performance and cost.

In this EPIC, I want iceberg-rust to deliver:

- Compaction API for a table (a rough sketch follows right after this list).
- Bindings for Python and Java.
- Tests (E2E tests, behavior tests, fuzz tests, ...).
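
For illustration, here is a very rough sketch of what the table-level API could look like. Every name below is hypothetical; nothing here is a final design:

```rust
// Purely illustrative sketch; none of these types or traits exist in iceberg-rust yet.

/// Hypothetical configuration, loosely modeled on Spark's rewrite_data_files options.
pub struct CompactionConfig {
    /// Target size for rewritten data files.
    pub target_file_size_bytes: u64,
    /// How many file groups may be rewritten concurrently.
    pub max_concurrent_file_groups: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            target_file_size_bytes: 512 * 1024 * 1024, // 512 MiB
            max_concurrent_file_groups: 4,
        }
    }
}

/// Summary handed back to the caller once the rewrite has committed.
pub struct CompactionReport {
    pub rewritten_data_files: usize,
    pub added_data_files: usize,
    pub rewritten_bytes: u64,
}

/// Hypothetical entry point, e.g. `table.compact(CompactionConfig::default()).await?`.
pub trait Compact {
    type Error;

    /// Rewrite small data files (applying any delete files) into fewer, larger
    /// files, then commit the replacement as a new table snapshot.
    async fn compact(&self, config: CompactionConfig) -> Result<CompactionReport, Self::Error>;
}
```

The Python and Java bindings would then mostly be thin wrappers around this single call.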

Compaction is more complex than just reading. Mistakes we make could break the user's entire table.

We will need various tests, including end-to-end tests, behavior tests, and fuzz tests, to ensure we have done it correctly.

flaneur2020 commented 1 month ago

it makes a lot of sense to use rust for these computation-heavy workloads.

i'm looking forward to saving up to 90% of memory usage after switching to a rust implementation.

keeping a close watch on this issue!

kevinjqliu commented 1 month ago

Thanks for starting this! Linking the relevant issue from the pyiceberg side: iceberg-python/#1092. Would be great to collaborate on this!

kevinjqliu commented 1 month ago

Data compaction is something the BDT at Amazon is heavily invested in and has blogged about: https://aws.amazon.com/blogs/opensource/amazons-exabyte-scale-migration-from-apache-spark-to-ray-on-amazon-ec2/.

Related to the ray-project/deltacat project.

manuzhang commented 1 month ago

Does this depend on a distributed compute engine?

Xuanwo commented 1 month ago

> Does this depend on a distributed compute engine?

I think this can work on a distributed query engine, but one is not required.

hzxa21 commented 1 month ago

I am super excited that the community is considering offering compaction capability in iceberg-rust. Thanks for bringing this up!

There are several things that come to mind which we need to consider in terms of compaction:

1. Compaction API
    i. High-level API: trigger compaction for a table with some configuration, similar to Spark's rewrite_data_files. This is essentially the table.compact() mentioned in the issue description and is useful for small tables.
    ii. Low-level API: given a collection of files of a table (including data, eq-delete, and pos-delete files), output a collection of compacted files (including both data and eq-delete files). This is useful as a building block for more sophisticated compaction strategies.
2. Compaction strategy
    i. When to trigger a compaction.
    ii. Which files to pick to minimize read/write amplification and space amplification, improve sorting, etc.
3. Compaction runtime
    i. Where to schedule a compaction task.
    ii. How to determine the parallelism of a compaction.

I think we need to be clear about what should be done in iceberg-rust and what the non-goals are. IMO, given that iceberg is a spec rather than a service, the Compaction API (1), especially the low-level API, is more essential than the others (2 & 3); a rough sketch of its possible shape is below. With that in place, users of iceberg-rust have the flexibility to implement 2 and 3 themselves based on their workloads, use cases, and environments.
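
To make the low-level API (1.ii) a bit more concrete, here is a purely hypothetical sketch of its shape. None of these types exist in iceberg-rust; planning, scheduling, and the snapshot commit are deliberately left to the caller:

```rust
// Illustrative only: a file-in/file-out building block with no planning,
// no scheduling, and no snapshot commit.

/// One group of files that should be rewritten together.
pub struct FileGroup {
    pub data_files: Vec<String>,       // data files to rewrite
    pub eq_delete_files: Vec<String>,  // equality-delete files that apply to them
    pub pos_delete_files: Vec<String>, // position-delete files that apply to them
}

/// The rewritten files, ready for the caller to commit.
pub struct RewrittenFiles {
    pub data_files: Vec<String>,
    pub eq_delete_files: Vec<String>,
}

/// Options that concern only the rewrite itself, not the strategy around it.
pub struct RewriteOptions {
    pub target_file_size_bytes: u64,
    /// If set, re-sort rows within each output file according to this sort order.
    pub sort_order_id: Option<i64>,
}

/// Hypothetical low-level call: read the inputs, apply the deletes,
/// write compacted outputs, and return them.
pub trait RewriteFiles {
    type Error;

    async fn rewrite_file_group(
        &self,
        group: FileGroup,
        options: RewriteOptions,
    ) -> Result<RewrittenFiles, Self::Error>;
}
```

A compaction strategy (2) and runtime (3) could then be layered on top of this call by users, outside of iceberg-rust.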

sdd commented 1 month ago

I will be working on adding support for positional and equality deletes to the table scans next. Hopefully this will help towards this goal.

liurenjie1024 commented 1 month ago

Thanks @Xuanwo for raising this. I think we have a long way to go; for example, we still need support for deletion files and a transaction API. Also, I'm not sure we can really do without a distributed compute engine for this, since an iceberg table is typically quite huge and a single machine will not be able to serve the workload.

vakarisbk commented 3 weeks ago

> Thanks @Xuanwo for raising this. I think we have a long way to go; for example, we still need support for deletion files and a transaction API. Also, I'm not sure we can really do without a distributed compute engine for this, since an iceberg table is typically quite huge and a single machine will not be able to serve the workload.

DataFusion now has a distributed-mode project, DataFusion Ray. By the time we finish the groundwork for compaction, DataFusion Ray may be mature enough, at least for the compaction use case.

camuel commented 2 weeks ago

Does anyone have any insight into how computation-heavy the compaction workload really is? On a beefy machine, what compaction rate would be possible: 1 GB/s? 10 GB/s? A ballpark figure? Separately, what compaction rate is possible from first principles, and what is the typical per-node compaction rate with today's Iceberg implementations?

I think in the overwhelming majority of use cases, data is primarily partitioned by time, and most data of a huge table is old data of low value that should absolutely not be routinely touched on every compaction job. Am I right, or is my experience not representative? If so, it is not the size of the table that matters, but the rate of new data arrival between compaction jobs, since it is mostly that new data that gets compacted and rewritten, and only once in its lifetime (unless we have DML touching old data, which should be extremely rare compared to routine compaction).

Separately, what compaction exactly means is a bit confusing to me: should data be sorted across the resulting (post-compaction) parquet files in a partition, or is it enough that each resulting parquet file is internally sorted, with the files in a partition allowed to overlap anywhere in the partition's range? As per the specification, it is enough to sort each resulting parquet file internally; moreover, the sort order can be different for each file, which hints that there is no global sort within a partition. I looked into the Java compaction code and it does sort across files, but not across all files in a partition: only across some files in a partition, which it calls "a group". "Group" is not a concept that exists in the specification, which puzzles me as to how the Rust implementation should approach it. This is quite a biggie.

Assuming only a local sort within each resulting parquet file (as per the current spec), a file size of around 100MB (currently the default), partitioning by time, and data arriving more or less chronologically so that compaction is only needed for the data that arrived between compaction jobs (as has always been the case in my experience), a single beefy node running compaction code implemented in rust should not find it infeasible to serve quite a huge table.
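
As a purely illustrative back-of-the-envelope calculation (the 1 GB/s figure is just the hypothetical rate from my question above, and 1 TB of new data per run is an assumption): if a table accumulates 1 TB of new data between compaction runs and a single node sustains 1 GB/s of end-to-end compaction throughput, rewriting that new data takes roughly 1000 seconds, i.e. under 20 minutes per run.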

Of course there are corner cases, like the partition scheme being changed and the next compaction job being asked to re-partition the entire table, but I don't think this will ever happen in practice. If the entire table is to be repartitioned, it won't just be scheduled for the next compaction; it will probably be a custom job on a separate cluster and won't even be called "compaction".

Will appreciate any feedback. Thanks!

liurenjie1024 commented 2 weeks ago

Hi, @camuel

> Does anyone have any insight into how computation-heavy the compaction workload really is? On a beefy machine, what compaction rate would be possible: 1 GB/s? 10 GB/s? A ballpark figure? Separately, what compaction rate is possible from first principles, and what is the typical per-node compaction rate with today's Iceberg implementations?

I think it's hard to say what the compaction rate would be. Compaction is a mixed workload: it involves I/O (reading and writing files) and computation (re-clustering, bin-packing), so it's difficult to estimate a rate. Things get more complicated when we need to handle deletion files.
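
To make the bin-packing part a bit more concrete, here is a minimal, purely illustrative sketch (not iceberg-rust code) of greedy, size-based grouping of small files toward a target group size; a real strategy would also have to consider partitions, delete files, and sort order:

```rust
/// Purely illustrative: greedily pack file sizes (in bytes) into groups whose
/// total size stays at or below `target_group_size` (first-fit decreasing).
fn plan_file_groups(mut file_sizes: Vec<u64>, target_group_size: u64) -> Vec<Vec<u64>> {
    // Largest files first so that small files fill the remaining gaps.
    file_sizes.sort_unstable_by(|a, b| b.cmp(a));

    let mut groups: Vec<(u64, Vec<u64>)> = Vec::new(); // (current total, files)
    for size in file_sizes {
        // Put the file into the first group that still has room, else open a new one.
        match groups
            .iter_mut()
            .find(|(total, _)| *total + size <= target_group_size)
        {
            Some((total, files)) => {
                *total += size;
                files.push(size);
            }
            None => groups.push((size, vec![size])),
        }
    }
    groups.into_iter().map(|(_, files)| files).collect()
}

fn main() {
    // Ten small files between 10 and 40 MiB, packed toward 128 MiB groups.
    let mib: u64 = 1024 * 1024;
    let files: Vec<u64> = [40u64, 35, 30, 25, 20, 15, 12, 10, 10, 10]
        .into_iter()
        .map(|m| m * mib)
        .collect();
    for (i, group) in plan_file_groups(files, 128 * mib).iter().enumerate() {
        let total: u64 = group.iter().sum();
        println!("group {i}: {} files, {} MiB", group.len(), total / mib);
    }
}
```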

As for partition spec changes, they are handled lazily in iceberg: a partition spec change is a metadata-only operation, and it is typically not part of compaction.