delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Don't commit if commit has transaction identifier that has been committed already #2580

Open vegarsti opened 3 weeks ago

vegarsti commented 3 weeks ago

I'm not sure if this is a bug or a feature request. It might be that I'm using the library incorrectly or misunderstanding how this should work!

I want to do idempotent commits using transaction identifiers. I have seen that some support for this was added recently, e.g. 88ea11027eb841628b8c94225181407cb00bdd25, 54cfa0638e6861d56efe3aa57680a961ef571381.

What I'm seeing is that if I make multiple commits with the same app_transaction (same ID and version), they will all be written. The same goes for any other combination of ID and version. Is that the correct behavior? This commit seems to me to indicate that delta-rs should only keep the latest version per txn app ID, so maybe I'm doing something wrong? 54cfa0638e6861d56efe3aa57680a961ef571381

For comparison, Databricks SQL will only commit the above scenario once.

In case I am onto something, here is a branch I've pushed with a test that fails on main, but which passes with my modification: It writes two commits where both have the same app ID + version. We verify that the second write did not cause a new active file. https://github.com/vegarsti/delta-rs/commit/4e52f4f8f52a1e336a4697b83e0606e3bd7d34aa

There's some nuance here, though: it's allowed for a commit to have multiple app transactions. I have no idea how to handle that, please advise 😅

Also, if the commit fails because of a concurrent update, I would also need to do the same check after the snapshot update.

Blajda commented 3 weeks ago

If the commits are performed in series then the behavior you are experiencing is expected. See the below quote from the Delta protocol.

> The semantics of the application-specific version are left up to the external system. Delta only ensures that the latest version for a given appId is available in the table snapshot. The Delta transaction protocol does not, for example, assume monotonicity of the version and it would be valid for the version to decrease, possibly representing a "rollback" of an earlier transaction.
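To make the quoted semantics concrete, here is a minimal sketch of "latest version per appId" as a last-write-wins replay of the log. The `Txn` struct and `latest_app_versions` function are hypothetical names for illustration only and are not delta-rs APIs.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a Delta `txn` action.
#[derive(Debug, Clone, PartialEq)]
struct Txn {
    app_id: String,
    version: i64,
}

/// Replay `txn` actions in commit order; the last one seen for each app ID wins.
fn latest_app_versions(log: &[Txn]) -> HashMap<String, i64> {
    let mut latest = HashMap::new();
    for txn in log {
        // No monotonicity check: a lower version simply overwrites a higher one,
        // which is how a "rollback" would show up in the snapshot.
        latest.insert(txn.app_id.clone(), txn.version);
    }
    latest
}
```

Under this model, replaying versions 1, 3, 2 for the same app ID leaves 2 in the snapshot, and nothing stops two commits from carrying the same ID and version.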

I don't think the current changes in your branch should be merged into delta-rs, since they are application-specific. The enforcement of monotonicity should be performed in application code, e.g. in this case you could write a wrapper around the operations for your application's needs.

An alternative is to generalize your changes in transaction/mod.rs to use some hook interface instead, and then provide a specific monotonicity hook implementation that others can use. If we go with the hook approach, then maybe a new custom delta-rs config could be added, e.g. delta-rs.transaction-id, that the hook would then use. We just need to be careful here, since there might be a few pitfalls (e.g. rollbacks).
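As a rough sketch of what such a hook could look like: the `PreCommitHook` trait, `AppTxn` struct, and `MonotonicTxnHook` below are hypothetical and do not exist in delta-rs. The sketch assumes the hook is given the latest version per app ID from the snapshot being committed against, and it picks one possible policy for commits with several app transactions: reject the whole commit if any of them is not newer.

```rust
use std::collections::HashMap;

/// Hypothetical application transaction attached to a commit.
struct AppTxn {
    app_id: String,
    version: i64,
}

/// Hypothetical hook run before a commit is written; not an existing delta-rs trait.
trait PreCommitHook {
    /// `latest` holds the newest version per app ID in the snapshot being committed against.
    fn check(&self, latest: &HashMap<String, i64>, txns: &[AppTxn]) -> Result<(), String>;
}

/// Rejects a commit if any of its app transactions is not newer than the recorded one.
struct MonotonicTxnHook;

impl PreCommitHook for MonotonicTxnHook {
    fn check(&self, latest: &HashMap<String, i64>, txns: &[AppTxn]) -> Result<(), String> {
        for txn in txns {
            if let Some(&seen) = latest.get(&txn.app_id) {
                // Equal versions are treated as "already committed"; lower ones as stale.
                if txn.version <= seen {
                    return Err(format!(
                        "app {} already committed version {} (attempted {})",
                        txn.app_id, seen, txn.version
                    ));
                }
            }
        }
        Ok(())
    }
}
```

Note that this policy also rejects intentional rollbacks to a lower version, which the protocol allows, so it would have to be opt-in.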

vegarsti commented 3 weeks ago

Thanks for your answer @Blajda!

> I don't think the current changes in your branch should be merged into delta-rs, since they are application-specific. The enforcement of monotonicity should be performed in application code, e.g. in this case you could write a wrapper around the operations for your application's needs.

I see! Given what the protocol document says, I understand that we can't discard transactions with lower versions.

What I'm worried about with a wrapper is that it's likely subject to a race condition, since the check happens outside the write and thus outside the commit retry logic. But I guess that's hard to avoid!

> An alternative is to generalize your changes in transaction/mod.rs to use some hook interface instead, and then provide a specific monotonicity hook implementation that others can use. If we go with the hook approach, then maybe a new custom delta-rs config could be added, e.g. delta-rs.transaction-id, that the hook would then use. We just need to be careful here, since there might be a few pitfalls (e.g. rollbacks).

That sounds like a good idea! I see you mention it in #2130 as well. Is there any scaffolding for something like that already? Would it be similar to the post-commit hooks, but as a pre-commit hook?

To clarify: PR #2539, which was merged, does all the work of inserting and keeping track of the identifiers, but there is no reconciliation yet, correct? I was a bit confused by "fix: only keep latest txn per app".

vegarsti commented 2 weeks ago

I found the check_for_updated_application_transaction_ids_that_current_txn_depends_on function, which does what I expect! https://github.com/delta-io/delta-rs/blob/0a44a0def977392aa25c9384c954d53f81eab6ed/crates/core/src/operations/transaction/conflict_checker.rs#L575-L594

But it looks like it only kicks in when there is a commit conflict, i.e. not when the commits are sequential and commit 2 has already read commit 1.
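A toy model of the behavior described here, assuming the check only inspects commits that landed after the version the writer read. `Commit`, `Log`, and `try_commit` are hypothetical names, and this in-memory log is not how delta-rs implements its conflict checker.

```rust
/// Hypothetical commit: the table version it was prepared against, plus an optional app txn ID.
struct Commit {
    read_version: usize,
    app_id: Option<String>,
}

/// Toy in-memory log; entry `i` holds the app ID (if any) of table version `i + 1`.
struct Log {
    entries: Vec<Option<String>>,
}

impl Log {
    /// Returns the new table version, or an error on an app-transaction conflict.
    /// Panics if `read_version` is greater than the current table version.
    fn try_commit(&mut self, commit: Commit) -> Result<usize, String> {
        // Only commits that landed after our read version are inspected.
        let winners = &self.entries[commit.read_version..];
        if let Some(app_id) = &commit.app_id {
            if winners.iter().any(|w| w.as_ref() == Some(app_id)) {
                return Err(format!("concurrent transaction for app {}", app_id));
            }
        }
        // No conflicting winner: the commit is appended, even if an older
        // commit (at or before `read_version`) used the same app ID.
        self.entries.push(commit.app_id);
        Ok(self.entries.len())
    }
}
```

In this model, a second commit with the same app ID succeeds when it has already read the first one (nothing landed in between), but fails when it was prepared against the older version.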

https://github.com/delta-io/delta-rs/blob/0a44a0def977392aa25c9384c954d53f81eab6ed/crates/core/src/operations/transaction/mod.rs#L570-L580

test_app_txn_conflict shows that it fails in that case: https://github.com/delta-io/delta-rs/blob/0a44a0def977392aa25c9384c954d53f81eab6ed/crates/core/src/operations/transaction/application.rs#L85-L86

In summary, my understanding of how this behaves:

Those two scenarios should cover it, though!