pravega / flink-connectors

Apache Flink connectors for Pravega.
Apache License 2.0

Handle expiration of transactions between operator checkpoint and global checkpoint commit #5

Open StephanEwen opened 7 years ago

StephanEwen commented 7 years ago

Problem description

The FlinkExactlyOncePravegaWriter works similarly to a two-phase commit (2PC) protocol:

  1. A transaction is created per checkpoint
  2. When Flink takes a checkpoint of the writer, it flushes the transaction and stores the transaction's UUID in the checkpoint state (vote-to-commit)
  3. When the checkpoint is complete and the sink receives a notification, that checkpoint's transaction is committed (full commit)

That model assumes that a transaction can still be committed when the checkpoint-complete notification arrives (step 3). If a transaction times out between step (2) and step (3), there will be data loss.
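
To make the flow concrete, below is a minimal sketch of the three steps. The Pravega-side types (`Txn`, `TxnWriter`) are simplified stand-ins rather than the real client API, and state persistence and error handling are omitted:

```java
// Sketch of the 2PC-style writer flow. Txn/TxnWriter are simplified
// stand-ins for the Pravega client API, not the actual interfaces.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;

public class TwoPhaseWriterSketch<T> {

    interface Txn<T> {                 // stand-in for a Pravega transaction
        void writeEvent(T event) throws Exception;
        void flush() throws Exception; // make buffered writes durable
        void commit() throws Exception; // publish the transaction
        UUID getTxnId();
    }

    interface TxnWriter<T> {           // stand-in for the Pravega writer
        Txn<T> beginTxn();
    }

    private final TxnWriter<T> writer;
    private Txn<T> currentTxn;         // step 1: one open txn per checkpoint interval
    private final Deque<Txn<T>> pendingCommit = new ArrayDeque<>();

    public TwoPhaseWriterSketch(TxnWriter<T> writer) {
        this.writer = writer;
        this.currentTxn = writer.beginTxn();
    }

    public void invoke(T value) throws Exception {
        currentTxn.writeEvent(value);  // records go into the open transaction
    }

    // Step 2 (vote-to-commit): called when Flink checkpoints this operator.
    // The txn is flushed, its UUID goes into checkpoint state, and a fresh
    // txn is opened for the next checkpoint interval.
    public void snapshotState() throws Exception {
        currentTxn.flush();
        pendingCommit.add(currentTxn); // UUID would be stored in checkpoint state
        currentTxn = writer.beginTxn();
    }

    // Step 3 (full commit): called when the global checkpoint completes.
    // If the txn timed out between snapshotState() and this call, commit()
    // fails and the already-flushed data is lost -- the fragility above.
    public void notifyCheckpointComplete() throws Exception {
        while (!pendingCommit.isEmpty()) {
            pendingCommit.poll().commit();
        }
    }
}
```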

This is an inherent fragility that seems hard to circumvent with the current primitive, and has to do with transaction timeouts. Given sufficiently long timeouts, this may never be a problem in most setups, but it is not nice to have this weak point in the long run.

Problem location

The problem is in the use of Pravega Transactions in the io.pravega.connectors.flink.FlinkExactlyOncePravegaWriter.

Suggestions for an improvement

Off the top of my head, there are three types of solutions to this issue:

  1. Pravega offers a pre-commit / full-commit distinction in transactions, where a pre-commit means the transaction becomes immutable and the usual timeouts do not apply (except possibly a garbage-prevention timeout, which could be very long). The full commit publishes the temporary segment.

  2. Flink makes sure it can recover transactions, for example by persisting the data in the checkpoints as well. If a transaction timed out, it can open a new transaction and re-write the data. The disadvantage is that, effectively, everything is persisted in two distinct systems (each with its own durability/replication). A sketch of this approach follows the list.

  3. Flink adds a way for tasks to complete a checkpoint fully independently of other tasks. Then the transaction could be committed immediately when the checkpoint is triggered. That would require guaranteeing that the sinks are never affected by a recovery of other tasks. To keep the current consistency guarantees, this would require persisting the results flowing from the input to the sink operation (similar to how Samza, for example, persists every shuffle), which is in some sense not too different from approach (2).
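
Here is a minimal sketch of approach (2), again using simplified stand-in types rather than real APIs: the events are duplicated into (what would be) Flink checkpoint state, so a timed-out transaction can be replayed into a fresh one at commit time.

```java
// Sketch of approach (2): keep the events in checkpoint state alongside the
// txn, so an expired txn can be re-written instead of losing data.
// Txn/TxnWriter are simplified stand-ins, not the real Pravega client API.
import java.util.ArrayList;
import java.util.List;

public class ReplayableCommitSketch<T> {

    interface Txn<T> {
        void writeEvent(T event) throws Exception;
        void flush() throws Exception;
        void commit() throws Exception;
        boolean isOpen();              // false once committed, aborted, or timed out
    }

    interface TxnWriter<T> {
        Txn<T> beginTxn();
    }

    private final TxnWriter<T> writer;
    private Txn<T> currentTxn;
    private final List<T> checkpointedEvents = new ArrayList<>(); // would live in Flink state

    public ReplayableCommitSketch(TxnWriter<T> writer) {
        this.writer = writer;
        this.currentTxn = writer.beginTxn();
    }

    public void invoke(T value) throws Exception {
        currentTxn.writeEvent(value);
        checkpointedEvents.add(value); // duplicated into checkpoint state:
                                       // the data now lives in two systems
    }

    public void notifyCheckpointComplete() throws Exception {
        if (!currentTxn.isOpen()) {
            // The txn expired between checkpoint and commit: open a new txn
            // and replay the persisted events instead of losing them.
            currentTxn = writer.beginTxn();
            for (T event : checkpointedEvents) {
                currentTxn.writeEvent(event);
            }
            currentTxn.flush();
        }
        currentTxn.commit();
        checkpointedEvents.clear();
        currentTxn = writer.beginTxn();
    }
}
```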

tkaitchuck commented 7 years ago

In addition to all of the above, I can imagine several possible solutions:

vijikarthi commented 5 years ago

> Pravega offers a pre-commit / full-commit distinction in transactions, where a pre-commit means the transaction becomes immutable and the usual timeouts do not apply (except possibly a garbage-prevention timeout, which could be very long). The full commit publishes the temporary segment.

For applications like Flink that guarantee txn.commit() will be triggered once the pre-commit phase completes successfully, I think providing an API option to ignore the transaction timeout could be one option to consider for mitigating the issue.

For example, during the pre-commit phase, the writer client could invoke txn.flush(boolean ignoreTimeOut), indicating that txn.commit() will eventually be called and hence the transaction timeout interval can be ignored (or defaulted to a much larger value, in case the txn needs to be aborted later). There is also a possibility of the transaction timing out while txn.flush() is being called; in that case, the flush should not happen, allowing Flink to restart the job according to the restart strategy configured for the job.
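
A hypothetical sketch of how the writer's checkpoint hook could use such a call is below. Neither flush(boolean) nor TxnExpiredException exist in the Pravega client today; both illustrate the API extension proposed above.

```java
// Hypothetical pre-commit call per the proposal above. flush(boolean) and
// TxnExpiredException are assumed extensions, not existing Pravega APIs.
import java.util.UUID;

public class PreCommitSketch {

    interface Txn {
        // Proposed: flush and, if ignoreTimeOut is set, stop (or greatly
        // extend) the server-side timeout clock, since commit() is
        // guaranteed to follow once the checkpoint completes.
        void flush(boolean ignoreTimeOut) throws TxnExpiredException;
        void commit() throws Exception;
        UUID getTxnId();
    }

    static class TxnExpiredException extends Exception {}

    // Called from the writer's checkpoint hook (vote-to-commit).
    static UUID preCommit(Txn txn) throws TxnExpiredException {
        // If the txn already timed out, flush(true) must fail rather than
        // silently succeed, so Flink fails the checkpoint and restarts the
        // job per its configured restart strategy.
        txn.flush(true);
        return txn.getTxnId();         // stored in checkpoint state
    }
}
```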

Going by the current defaults, the transaction timeout will be 10 seconds times the multiplier 1000, i.e., 10 seconds * 1000 = 10,000 seconds (~2.8 hours), capped at a max of 24 hours (if a custom configuration value exceeds the default minimum) according to this calculation. So it is likely that we will not hit the timeout issue, but it remains a possibility.