delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.43k stars 1.67k forks source link

[Feature Request] Protocol: clarify immutability guarantees #1975

Open Pluies opened 1 year ago

Pluies commented 1 year ago

Hello there!

Feature request

Which Delta project/connector is this regarding?

Overview

Delta's PROTOCOL.md file does not mention file immutability guarantees, which are very important for caching. It would be great to spell these out as part of the protocol.

Motivation

The Lakehouse white paper and the Delta Lake white paper both emphasises Parquet files immutability as an important feature to allow caching of Delta Lake data files on clients, reducing bandwith and network i/o.

Some Delta files are immutable, like Parquet data files, but some aren't, like the _delta_log/_last_checkpoint file that gets rewritten to point to the most recent checkpoint.

However, what about other file types, such as Deletion Vectors? Or Delta Log entries?

It would be great to explicitly spell out in PROTOCOL.md which files are immutable, and can be safely cached forever by clients, and which ones are not immutable, and should be checked for new modifications.

Further details

n/a

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

tdas commented 1 year ago

This is a great point. Do you want to take a crack at it? Also cc @ryan-johnson-databricks

tdas commented 1 year ago

Just so that we are on the same page, all files other than the _last_checkpoint file are immutable. So its valid to cache all the other files once those files have been completely written out by the process. However, it can be sometimes hard to determine when a file has been complete written out as different files systems / object stores provide different visibility guarantees. For example, S3 make an object visible only when it has been completely written out. But ADLS makes the object visible as soon as it created, thus it will look like the object is mutating until the application-layer output stream has been closed. So there are few rules of thumb you can follow when you are trying to determine when a file is finalized and ready to be cached:

Any more thoughts @ryan-johnson-databricks @prakharjain09

ryan-johnson-databricks commented 1 year ago

Great questions! Unfortunately no great answers IMO because cloud provider docs are not clear.

As far as caching is concerned, the safest approach is to associate the object's eTag (AWS, ADLS) or generation number (GCS) with the cache entry. The commit .json and checkpoint .parquet files are always found through listing, and so it should be straightforward to validate the corresponding cache entries before trusting them.

As for the not-great answers:

it can be sometimes hard to determine when a file has been complete written out as different files systems / object stores provide different visibility guarantees. For example, S3 make an object visible only when it has been completely written out. But ADLS makes the object visible as soon as it created, thus it will look like the object is mutating until the application-layer output stream has been closed.

I had a vague idea that in-progress ADLS writes show up as an empty (0-byte) file until the write completes, at which point the content of the file becomes visible all at once. Almost as if file creation is a separate operation from writing that file. And overwrites truncate the file back to 0 bytes, with the new data becoming visible atomically once the write completes. I've never heard of a partial or torn read before, that returns only some of the new bytes, or that returns a mix of old+new bytes.

If that assumption holds, the system can simply not cache 0-byte files (which are anyway uninteresting).

Unfortunately, I learned all this from various conversations and cryptic code comments I can't find now, so it's incorrect until proven otherwise. Especially because I can't find any official Azure docs that even talk about this issue.

For .json log files: The only hard guarantee is when the next <version+1>.json is available.

These are written by put-if-absent, and I had always naively assumed that any correct put-if-absent operation must not allow partial reads. Otherwise you'd get really weird "I won the race! But hold on while I finish" type situations. Seems messy, but again no docs to forbid it.

The hard guarantee for a checkpoint parquet files to have been completed is when _last_checkpoint has been updated to point to it.

... but there's no guarantee the client managed to update _last_checkpoint file at all. Even if the write did succeed, timing races could mean the file immediately gets overwritten by a newer (or even an older) checkpoint writer.

tdas commented 1 year ago

Agreed to all of the above points. Thanks for adding more color to the conversation. With these complexities, @ryan-johnson-databricks and @Pluies, do you think anything concrete can be added / going to be useful to add to the protocol?

Pluies commented 11 months ago

Well, that opened a can of worms 😅

Thank you for all the details above, this is super useful.

Based on this, I'm not sure anything needs to be added to the protocol itself. This very issue is now in the first page of google results for "delta immutability", so people searching for answers should find them if needed 👍

If anything, maybe a section like:

All Delta files besides _last_checkpoint are guaranteed to be immutable once they've been fully written. Different filesystems and object stores have varying visibility guarantees for files, so refer to their documentation for details.

Which would lay out Delta's point of view while passing the hot potato of "implementation details" down to the filesystem or object store.

findepi commented 11 months ago

The commit .json and checkpoint .parquet files are always found through listing,

Is it written in Delta specification? For example, Trino Delta connector doesn't use listings at all.

I had a vague idea that in-progress ADLS writes show up as an empty (0-byte) file until the write completes, at which point the content of the file becomes visible all at once.

That would violate the Delta specification. Delta table format requires transaction file to either not exist, or be readable/usable & complete. From Spark implementation perspective, doesn't ADLS LogStore use renames? Renames is at least what Trino Delta's AzureTransactionLogSynchronizer uses to "appear" files atomically.

For .json log files: The only hard guarantee is when the next <version+1>.json is available. But that can be arbitrary time period later, so the waiting a few minutes (say, 10 minutes) after a file becomes visible should be good enough.

I did not understand this part. What do this 10 minutes serve?

Which would lay out Delta's point of view while passing the hot potato of "implementation details" down to the filesystem or object store.

I don't think such loose specification is needed. For example Iceberg spec says "file are immutable" and this seems to work for them.

cc @ebyhr @findinpath

ryan-johnson-databricks commented 11 months ago

I had a vague idea that in-progress ADLS writes show up as an empty (0-byte) file until the write completes, at which point the content of the file becomes visible all at once.

That would violate the Delta specification.

For data and commit files, you are correct.

The visible-but-empty issue applies mainly to the _last_checkpoint file, which will be repeatedly overwritten (so can't do rename-if-absent). I don't think Azure supports an atomic rename that replaces an existing file? Meanwhile, the last checkpoint format includes a checksum that can at least be used to detect an incomplete write.

For .json log files: The only hard guarantee is when the next <version+1>.json is available. But that can be arbitrary time period later, so the waiting a few minutes (say, 10 minutes) after a file becomes visible should be good enough.

I did not understand this part. What do this 10 minutes serve?

This was my error, overthinking the unrealistically general case. The ten minutes would be a hack to decide whether the file size is done changing --- if somebody actually wrote in a way that allows the file size to change. As you pointed out tho, this is not an issue for commit and data files. Google and AWS don't make an upload visible before it completes, and Azure allows atomic rename. Even if somebody wrote directly with Azure, I've never heard of a partial write becoming visible. The file always appears to be either complete, or empty.

Trino Delta connector doesn't use listings at all.

@findepi Could you elaborate? I don't have an immediate intuition of how one could (efficiently) discover commit versions and checkpoints without listing? Even when _last_checkpoint file is accurate (it can be stale in practice), and even assuming blind reads (don't need file size, etag, etc first), it would still take a multiple get-status calls to binary search for the latest commits after that checkpoint?

ryan-johnson-databricks commented 11 months ago

@Pluies coming back to the original request:

Delta's PROTOCOL.md file does not mention file immutability guarantees, which are very important for caching. It would be great to spell these out as part of the protocol.

The protocol doesn't call it out on in the intro matter, but it does require immutable data files and immutable commit files. Specifically:

Data files MUST be uniquely named and MUST NOT be overwritten.

and

Writers MUST never overwrite an existing log entry. When ever possible they should use atomic primitives of the underlying filesystem to ensure concurrent writers do not overwrite each others entries.

The original checkpoint file spec was not very clear, but v2 checkpoints should address the issue in practice, because both the v2 checkpoint itself, and the sidecar files it references, are uniquely named. Specifically:

UUID-named Checkpoints: These follow V2 spec which uses the following file name: n.checkpoint.u.{json/parquet}, where u is a UUID and n is the snapshot version that this checkpoint represents.

and

A sidecar file contains file actions. These files are in parquet format and they must have unique names.

If all goes well, v2 checkpoints should be available with the next release of Delta, e.g. https://github.com/delta-io/delta/pull/1983, https://github.com/delta-io/delta/pull/2031, https://github.com/delta-io/delta/pull/2056, and https://github.com/delta-io/delta/pull/2102.