delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

[Spark] Optimize batching / incremental progress #3089

Open Kimahriman opened 2 weeks ago


Which Delta project/connector is this regarding?

Description

Resolves https://github.com/delta-io/delta/issues/3081

Adds support for splitting an optimize run into batches with a new config, spark.databricks.delta.optimize.batchSize. Batches are created by grouping existing bins together until the cumulative size reaches batchSize. The default behavior remains the same; batching is enabled only if batchSize is configured.
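A minimal usage sketch of the new config. The table name and the size value are illustrative, and the accepted value format for batchSize is an assumption on my part:

```python
# Hypothetical example: enable batching before running OPTIMIZE.
# The value format ("100g") and table name are illustrative only.
spark.conf.set("spark.databricks.delta.optimize.batchSize", "100g")
spark.sql("OPTIMIZE my_table")  # with batching, each batch commits separately
```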

This will apply to all optimization paths. I don't see any reason it shouldn't apply to compaction, Z-ordering, clustering, auto-compaction, or reorg/DV rewriting if a user configures it.

The way transactions are handled within the optimize executor had to be updated. Instead of creating a single transaction upfront, we list all the files in the most recent snapshot and then create a separate transaction for each batch.
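The grouping step can be sketched in plain Python. This is not the actual executor code; the function name and the bin representation (a list of file sizes) are hypothetical, but it shows the described behavior: bins accumulate into a batch until batchSize is reached, and an unset batchSize yields a single batch (the default behavior):

```python
def group_bins_into_batches(bins, batch_size=None):
    """Group bins (each a list of file sizes) into batches.

    Hypothetical sketch: a batch is closed once the cumulative size of
    its bins reaches batch_size. batch_size=None means no batching,
    i.e. everything goes into one batch (the existing default).
    """
    if batch_size is None:
        return [bins]
    batches, current, current_size = [], [], 0
    for b in bins:
        current.append(b)
        current_size += sum(b)
        if current_size >= batch_size:
            batches.append(current)
            current, current_size = [], 0
    if current:  # leftover bins form the final, possibly smaller, batch
        batches.append(current)
    return batches
```

In the PR itself, each resulting batch would then get its own transaction, so progress committed by earlier batches survives a failure in a later one.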

This is especially important for clustering, where there is no way to manually optimize a partial subset of the table via partition filtering. Without incremental progress, a failure before the entire table finishes optimizing can waste a lot of execution time and storage space.

How was this patch tested?

A simple new UT is added. I can add others as well; for now I'm mainly looking for feedback on the approach and suggestions for what other tests to add.

Does this PR introduce any user-facing changes?

Yes, this adds a new optimization capability that is disabled by default.