
Support Conditional Transaction Commits #6514


fqaiser94 commented 1 year ago

Feature Request / Improvement

TLDR: I'd like to propose adding a new void commitTransaction(CommitCondition commitCondition) method to the Transaction interface so users can specify the conditions under which a transaction can be safely committed or not, given that other transactions may have changed the table concurrently. This will enable use-cases such as monotonically increasing watermarks in table properties.

General Problem

I want to start by describing the challenge I currently face using the existing Transaction.commitTransaction() API.

Consider the following example situation: two writers each want to append a data file and advance a custom watermark table property by one. Each reads the current watermark, stages watermark + 1 alongside its append, and then commits. When the two transactions race, the losing transaction is retried against the refreshed table metadata, but it re-applies the stale watermark value it computed earlier; the table ends up two versions ahead, yet the watermark has been incremented only once.

This scenario can be demonstrated in code like so:

@Test  
public void watermarkIsNotIncrementedCorrectlyWithConcurrentTransactionCommits() throws Exception {  
  String customWatermarkKey = "custom_watermark";  

  table.updateProperties().set(customWatermarkKey, "0").commit();  
  Integer initialExpectedVersion = 1;  
  Assert.assertEquals("Table should be on version 1", initialExpectedVersion, version());  
  Assert.assertEquals(
    "Initial custom watermark value should be 0",
    "0",
    table.properties().get(customWatermarkKey));

  Supplier<String> nextWatermarkValue = () ->  
    Optional.ofNullable(table.properties().get(customWatermarkKey))  
    .map(Integer::parseInt)  
    .map(x -> x + 1)  
    .map(String::valueOf)  
    .orElse("0");  

  // Each thread stages an append plus a watermark increment, then commits concurrently.
  Function<DataFile, Thread> makeThread = (dataFile) -> {
    Transaction txn = table.newTransaction();  
    txn.newAppend().appendFile(dataFile).commit();  
    txn.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();  
    return new Thread(txn::commitTransaction);  
  };  

  Thread thread1 = makeThread.apply(FILE_A);  
  Thread thread2 = makeThread.apply(FILE_B);  

  thread1.start();  
  thread2.start();  

  thread1.join();  
  thread2.join();  

  Assert.assertEquals(  
    "Table should be on two versions ahead as two transactions have been committed successfully",  
    initialExpectedVersion + 2,  
    (int) version());  
  Assert.assertEquals(  
    "We want custom_watermark to also be incremented twice but in fact it appears to have been incremented only once",  
    "1",  
    table.properties().get(customWatermarkKey));  
}

You might think at this point that a simple solution is to set TableProperties.COMMIT_NUM_RETRIES = 0. Setting this property ensures Iceberg will just throw a CommitFailedException instead of retrying "losing" transactions and putting us in a bad state. In that sense, this is an improvement. Unfortunately, it only works for the specific sequence of events described above and is not a general solution. This is because, before Iceberg even attempts the atomic commit operation the first time (i.e. not on a retry attempt), it first checks whether the TableMetadata is up-to-date and, if not, refreshes the TableMetadata before applying the updates and attempting the commit. Put another way, the automatic TableMetadata refresh is not part of the retry mechanism, so you can still hit this problem even without any retries. This situation can also be reproduced in code, as follows:

@Test  
public void removingRetriesIsNotAGeneralSolution() throws Exception {  
  String customWatermarkKey = "custom_watermark";  

  table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "0").set(customWatermarkKey, "0").commit();  
  Integer initialExpectedVersion = 1;  
  Assert.assertEquals("Table should be on version 1", initialExpectedVersion, version());  
  Assert.assertEquals("Initial custom watermark value is", "0", table.properties().get(customWatermarkKey));  

  Supplier<String> nextWatermarkValue = () ->  
    Optional.ofNullable(table.properties().get(customWatermarkKey))  
    .map(Integer::parseInt)  
    .map(x -> x + 1)  
    .map(String::valueOf)  
    .orElse("0");  

  Transaction txn1 = table.newTransaction();  
  txn1.newAppend().appendFile(FILE_A).commit();  
  txn1.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();  

  // concurrent transaction which is committed before the first transaction ever calls .commit  
  Transaction txn2 = table.newTransaction();  
  txn2.newAppend().appendFile(FILE_B).commit();  
  txn2.updateProperties().set(customWatermarkKey, nextWatermarkValue.get()).commit();  
  txn2.commitTransaction();  
  Assert.assertEquals("Table should be on next version", initialExpectedVersion + 1, (int) version());  
  Assert.assertEquals("Table watermark is incremented to 1", "1", table.properties().get(customWatermarkKey));  

  txn1.commitTransaction();  
  Assert.assertEquals("Table should be on next version", initialExpectedVersion + 2, (int) version());  
  Assert.assertEquals("Table watermark has seemingly not been incremented", "1", table.properties().get(customWatermarkKey));  
}

If anyone has easy ideas for solving this issue, I would love to hear them. Otherwise, please read on for the solution I'm proposing.

Proposed Solution

One way to view this problem is as a case of missing information. While Iceberg does perform some validation checks internally to ensure updates don't conflict (a ValidationException is thrown in these cases), these obviously can't cover use-case-specific conditions such as custom watermarks. The only way Iceberg could know about these conditions is if it is told about them. Hence, I'm proposing we expose an API that allows users to give Iceberg this information.

To me, it made the most sense to add this as an overloaded commitTransaction method to the existing Transaction interface.

interface CommitCondition {
    boolean check(Table baseReadOnlyTable, Table newReadOnlyTable);
}

interface Transaction {
    // ... existing methods ...
    void commitTransaction(CommitCondition commitCondition);
}
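For illustration, the monotonic-watermark use-case might look like this under the proposed API. This is a sketch against the proposal, not existing Iceberg API; it assumes the condition is checked against the freshly refreshed table before the commit is applied:

Transaction txn = table.newTransaction();
txn.newAppend().appendFile(dataFile).commit();
txn.updateProperties().set("custom_watermark", "5").commit();

// Only commit if no concurrent transaction has already advanced the
// watermark to (or past) the value this transaction is about to write.
txn.commitTransaction((base, current) -> {
  int committed = Integer.parseInt(
      current.properties().getOrDefault("custom_watermark", "-1"));
  return committed < 5;
});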

I have a proof-of-concept of this idea as a pull request here: https://github.com/apache/iceberg/pull/6513. There are plenty of design decisions remaining to be discussed.

Some of these are implementation details, but I just want to make clear that I haven't figured all of this out yet. If I can get buy-in that this is an actual problem worth solving, and if the general approach in the PR makes sense, I would be happy to figure out the remaining details and take this draft pull request to the finish line.

Specific Use-case: Committer Fencing to Enable Exactly-Once Commits

I've tried to describe the problem and solution above in as general a fashion as possible because I think this API could be used to enable many and varied use-cases beyond just custom watermarks. Still, it might be helpful to understand the specific use-case I'm trying to solve for. I have a "datafile-committer" application which, roughly speaking, consumes messages from Kafka, each referencing a DataFile, and append-commits those DataFiles to an Iceberg table.

The challenge for us is that we would like to execute this logic in an exactly-once fashion. Unfortunately, "datafile-committer" is a distributed Kafka consumer application, and as a result it's possible for multiple instances of the application to occasionally handle the same message/DataFile in exceptional scenarios, such as consumer-group rebalances or "zombie" instances that keep processing after being superseded.

Currently, in these exceptional scenarios, we can end up append-committing the same DataFile to an Iceberg table multiple times (once for each instance handling the same message/DataFile). Since each DataFile can contain hundreds of thousands of records, the resulting Iceberg tables can have a very high number of duplicated records. This is obviously bad.

While it is possible to minimize how often these events happen, it is nearly impossible to guarantee that they never happen. However, since Kafka messages are associated with monotonically increasing offsets, it's possible to include these as a sort of custom_watermark in the Iceberg table properties and reference them at commit time via a CommitCondition, ensuring that we only ever commit a DataFile whose offset is greater than the last committed offset. In this way, we could achieve effectively-once guarantees (in fact, a little more logic would be needed in the CommitCondition to fence out zombie committers and get the desired exactly-once guarantees).
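Concretely, under the proposed API that check might look like the sketch below; the property keys and the epoch-based fencing are illustrative assumptions, not part of the proposal:

// Offset of the Kafka message that carried this DataFile.
long messageOffset = 12345L;
// Hypothetical committer epoch, bumped on each consumer-group rebalance.
int committerEpoch = 7;

txn.commitTransaction((base, current) -> {
  Map<String, String> props = current.properties();
  long lastOffset = Long.parseLong(props.getOrDefault("kafka.last-committed-offset", "-1"));
  int currentEpoch = Integer.parseInt(props.getOrDefault("committer.epoch", "0"));
  // Commit only if this DataFile advances the offset watermark and this
  // committer has not been fenced out by a newer epoch.
  return messageOffset > lastOffset && committerEpoch >= currentEpoch;
});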

Hopefully that helps explain where I'm coming from.

Query engine

None

rdblue commented 1 year ago

@fqaiser94, in general I think this is a good idea, but I'm not sure there are very many use cases for it besides deduplicating high-level operations. I also think that using this as a way to make table properties transactional is probably a bad idea, but it's been requested in the past so we should probably have an approved way to accomplish it.

Table properties purposely don't have transactional guarantees, to avoid using them to coordinate state. Table properties are supposed to be used to configure the table, not to hold important state. What I recommend to accomplish the use case you're talking about is putting the watermark in snapshot properties instead of table properties. That's what we do for Flink commits and we get exactly-once behavior, although the check for the watermark is done outside of the commit path. Concurrent Flink writes would use different watermark properties because they use watermarks that are job-specific.
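For reference, snapshot summary properties can be attached to any snapshot-producing operation via SnapshotUpdate.set; a minimal sketch of the recommended pattern (the property name is illustrative):

// Record the watermark in the snapshot summary rather than table properties.
table.newAppend()
    .appendFile(dataFile)
    .set("watermark.job-1", "1678000000000")
    .commit();

// Readers can recover the watermark from the current snapshot's summary.
String watermark = table.currentSnapshot().summary().get("watermark.job-1");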

It's a good idea to provide a custom validation that can do any check you want. For example, your Kafka example could create watermarks based on some chunk of time that is being processed and the custom validation could check the last few snapshots to see whether another process has already committed. That's a good use case.

To do this, I'd probably take a slightly different approach than the one you've implemented. I'd add a validate(Predicate<TableMetadata> current) to either SnapshotUpdate, or the more general PendingUpdate. That way each table operation can have its own custom validation against the current table state. Using a transaction would automatically check all of the custom validations for each operation, so there would be no need to alter Transaction. And there's no need for a custom class. We typically use Java function interfaces where possible.
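Sketched as user code, that suggestion might read roughly as follows; note that validate does not exist on PendingUpdate today, so this is purely illustrative of the proposed shape:

long myOffset = 12345L;  // hypothetical application-level watermark

table.newAppend()
    .appendFile(dataFile)
    // Hypothetical hook: abort the commit if the current table state fails the check.
    .validate(current -> {
      Snapshot snap = current.currentSnapshot();
      String committed = snap == null ? null : snap.summary().get("kafka.offset");
      return committed == null || Long.parseLong(committed) < myOffset;
    })
    .commit();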

fqaiser94 commented 1 year ago

Thanks for your response @rdblue!

What I recommend to accomplish the use case you're talking about is putting the watermark in snapshot properties instead of table properties.

Thanks for your advice. It might be an option for us, but I'll have to think some more about it. I had some concerns about losing those watermarks due to snapshot expiration, and I was also mildly concerned about how expensive it might be to search previous snapshots on every commit (assuming that involves multiple network calls). Regardless, we have to get this feature in first!

I'd add a validate(Predicate<TableMetadata> current) to either SnapshotUpdate, or the more general PendingUpdate.

Yea, I was considering this too.

Sounds like we are both more or less in alignment, so I'll try reworking my PR to implement it in the way you suggested. It might take me a few weeks, so please bear with me.

fqaiser94 commented 1 year ago

Created a PR for this here: https://github.com/apache/iceberg/pull/6513

stevenzwu commented 1 year ago

Just to add to @rdblue 's good points above.

Regarding the watermark use cases, we can prefix the watermark (in snapshot metadata) from each writer job to avoid the conflict. The downside is that the consumer of the watermark info needs to aggregate and take the min of the latest value from all writers. If we go with the conditional commit approach and fail the second commit with the lower watermark, how would the second application handle the failure? We can also get into a situation where the second application is never able to commit: if its watermark is forever behind the first application's, the condition check will always be false. Not sure if this is desirable behavior.
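A sketch of what that consumer-side aggregation might look like, assuming per-writer summary keys such as watermark.<job-id> (names are illustrative):

// Scan snapshots oldest-to-newest, keeping each writer's latest watermark,
// then take the min across writers as the table-wide watermark.
Map<String, Long> latestPerWriter = new HashMap<>();
for (Snapshot snap : table.snapshots()) {
  snap.summary().forEach((key, value) -> {
    if (key.startsWith("watermark.")) {
      latestPerWriter.put(key, Long.parseLong(value));
    }
  });
}
long tableWatermark = latestPerWriter.values().stream()
    .mapToLong(Long::longValue)
    .min()
    .orElse(Long.MIN_VALUE);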

Regarding the Kafka data file committer use case, it is undesirable to have all the parallel threads committing to the Iceberg table. If the parallelism is 100 or 1,000, there will be a lot of collisions and retries. A conditional commit can ensure correctness, but it can be inefficient or infeasible with high parallelism. The Flink Iceberg sink coalesces all data files to a single committer task so that there is only one committer thread (in a Flink job) committing to the Iceberg table.

fqaiser94 commented 1 year ago

Hi @stevenzwu,

Great job presenting at the Subsurface conference today :) Regarding your comments:

Regarding the Kafka data file committer use case, it is non-desirable to have all the parallel threads committing to the Iceberg table. If the parallelism is 100 or 1,000, there will be a lot of collisions and retries. The conditional commit can ensure the correctness. But it can be inefficient or infeasible with high parallelism. Flink Iceberg sink coalescing all data files to a single committer task so that there is only one committer thread (in a Flink job) committing to the iceberg table.

I fully agree with you on all the points you've stated here. If the parallelism is high, conditional commits will be extremely inefficient to the point of being infeasible, even though they guarantee "correctness". In general, it is best to have just a single thread committing updates to an Iceberg table, which is what the Flink Iceberg Sink does. This is also exactly what we try to do in our custom system. Our issue, however, is that we cannot guarantee that there will always be exactly one thread committing a given DataFile to the Iceberg table. In rare situations, and for only a brief moment in time, there may be more than one thread attempting to commit the same set of DataFiles to the same Iceberg table at the same time. Due to the distributed nature of our datafile-committer application, it is impossible to completely prevent this type of situation; we can only make it rare. This is where we want to use conditional commits as a last line of defence to ensure that only one of these threads succeeds in committing the DataFile during these rare and brief moments of instability.

I hope this explanation clarifies why conditional commits are still useful for datafile-committer use-cases like ours, where you can guarantee that the majority of the time there will not be any conflicts.

if we go with the conditional commit approach and fail the second commit with lower watermark, how would the second application handle the failure? we can also get into the situation where the second application may never able to commit. if it's watermark is forever behind the first application, the condition check will always be false. not sure if this is a desirable behavior.

The desired behaviour here will likely vary depending on the use-case. For ours, the second application, when hit with a failure, would abandon trying to commit that set of DataFiles and move on to the next set. Due to the nature of our application, we're guaranteed to eventually (and quickly) converge to a stable condition where each instance is left handling messages that cannot conflict.

As far as the Iceberg library is concerned, IMO it should just provide the fundamental building block, i.e. a way for iceberg-api users to express the conditions under which a commit should proceed, and leave failure handling up to the API users.

mahdibh commented 1 year ago

I think one way to approach this problem (which we are also running into) is to expose a new commit API that doesn't automatically refresh the table metadata. This would allow clients to rely on the atomic snapshot CAS to also synchronize state changes to custom properties (whether on the table or the snapshot). This is a very desirable capability, since it allows us to update both the current snapshot and the properties of that snapshot in a single atomic operation.

In our particular case, we would like to store some Kafka offsets as part of the snapshot properties. Even if we had a single writer, there is always a case where we may end up with two writers (for example, due to a network partition), and in that case we don't want the last writer's state to overwrite the first one's. We would like to use the CAS nature of Iceberg as the last resort for detecting concurrent writes.

Another way to think about this is that each Iceberg committer is moving the state of a table from state-A to state-B. Most clients do not care about what state-A is; all they care about is that it gets moved to state-B. So if another client gets in between and moves state-A to state-C, the first client will happily commit by doing the implicit refresh (which picks up state-C) and then committing, effectively moving the system from state-C to state-B.

There are cases where clients want to be explicit about that state transition, for example because there are some attributes in state-A that they use to determine what state-B looks like (i.e. the watermark example in the original description, or Kafka offsets in our case).

Another argument for not doing the refresh is that it adds extra latency (due to the retrieval of the current snapshot). In the single writer case, it is not needed since the common case is that there would be no conflicts and the writer always has the latest snapshot.

I think providing the caller the option to decide on whether they care about the original state or not when they do a commit is a very powerful capability. It also seems like a rather straightforward change that can be applied in a backward compatible way.
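A hypothetical shape for such an API (commitIfUnchanged does not exist in Iceberg; this is purely illustrative of the idea):

// Capture the metadata this writer based its work on.
TableOperations ops = ((BaseTable) table).operations();
TableMetadata base = ops.current();

AppendFiles append = table.newAppend()
    .appendFile(dataFile)
    .set("kafka.offset", "12345");

// Hypothetical: fail fast with a CommitFailedException if the table has
// moved past `base`, instead of silently refreshing and retrying.
append.commitIfUnchanged(base);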

bryanck commented 3 months ago

@mahdibh I also had the same thought. If we have an option or lower-level API that does not refresh or retry, then the caller can handle failures and retries how it sees fit when the commit's base metadata does not match the current table state. This would allow more control by the caller, and we wouldn't need to pass down conditions.

In the Kafka Connect sink, we could use this API or option to add more protections against having two coordinators active at the same time. For example, one coordinator loads a table to check offsets in the snapshot properties, and they match, so the commit proceeds. Concurrently a second coordinator completes a commit, so the metadata has changed. Now the first coordinator attempts a commit.

SnapshotProducer.apply() currently refreshes the table and bases the commit on the refreshed metadata, so this commit will succeed. If we could disable the refresh and base the commit on the originally loaded metadata, then this commit would fail (which is what we want).

@fqaiser94 Did you have any thoughts?

fqaiser94 commented 3 months ago

Apologies @mahdibh for not responding; I suspect I was busy dealing with other things at the time.


@bryanck I did consider this idea in the past and I would be satisfied if we supported such a config option. (Frankly, I would have preferred it if Iceberg never did refreshes internally and left that as a higher-level, explicit, user-facing concern, but that boat has long since sailed.)

However, I think there are two main challenges with this approach.

The first is: where/how do you expose this only_allow_explicit_refresh config option (obviously a bad name; we can bikeshed names another day)?

  1. A table property doesn't seem right, as this might only be necessary for specific processes (e.g. Kafka Connect), and in general Iceberg's philosophy prefers an optimistic concurrency model.
  2. The only other place I can think of to expose such a config option is when initializing a Catalog implementation (see the sketch below). This seems more appropriate, but it does mean each Catalog implementation would need to change to take advantage of the option (we would have to default to false to preserve backwards compatibility). That should be fine IMO.
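For illustration, such an option could be passed like any other catalog property at initialization time (the property name is hypothetical):

Map<String, String> props = new HashMap<>();
props.put("uri", "thrift://metastore:9083");
// Hypothetical option, defaulting to false for backwards compatibility.
props.put("only-allow-explicit-refresh", "true");

Catalog catalog = CatalogUtil.loadCatalog(
    "org.apache.iceberg.hive.HiveCatalog", "prod", props, new Configuration());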

The second challenge is more of an implementation detail, but we need to be able to distinguish between:

  1. user refresh calls, i.e. table.refresh() (which we want to allow regardless of the only_allow_explicit_refresh setting), and
  2. Iceberg-internal refresh calls, e.g. inside the internal PendingUpdate.apply and PendingUpdate.commit methods (which we don't want to allow if only_allow_explicit_refresh == true).

While I'm sure it's doable, I faintly remember trying to prototype this ~1.5 years ago and being either extremely dissatisfied with the resulting code or unable to make it work. I can try to take another stab at it, as my knowledge of Iceberg internals has improved a lot since then XD