apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Restoring the Flink streaming job from an older checkpoint/savepoint might trigger silent data loss #10892

Open lkokhreidze opened 1 month ago

lkokhreidze commented 1 month ago

Apache Iceberg version

1.5.2

Query engine

Flink

Please describe the bug 🐞

When Flink's state is restored, the Iceberg File Committer gets the max committed checkpoint ID from the table's metadata, as seen here. When recovering the job from an older savepoint, this behaviour might cause silent data loss with the Kafka Source (the case we have observed). Consider the following case:

We have a Kafka topic which is materialised as an Iceberg table, with a checkpoint interval of 30 seconds. We create job savepoints twice per day so that we can roll the job back to an older state if something bad happens. Snapshots for the Iceberg table are kept for the last 2 days.

Suppose that during normal operation the job is at checkpoint 10000. If we need to restore the job to yesterday's version from the savepoint, the checkpoint would be rolled back to 9000 (as an example). The savepoint also contains the offsets of the Kafka source which were committed when the savepoint was triggered. Upon recovery from the savepoint, the Kafka source seeks each topic-partition to the committed offset in the savepoint. The Iceberg File Committer though, upon recovery, would still get the max checkpoint ID as 10000 from the table's snapshot metadata, and it would skip committing any files to iceberg table due to this check.

The problem is that there's no guarantee that the Kafka source will consume the exact same amount of data per checkpoint as before. It might consume the lag faster now due to increased parallelism, more CPU resources, etc. Therefore in just 500 checkpoints it might consume the whole topic lag, while the Iceberg File Committer would skip committing anything new to the table, since the ongoing checkpoint ID would still be less than the max checkpoint ID stored in the table metadata. Flink itself would notify the Kafka Source that the checkpoint is successful and commit the new offsets to Kafka, so the Kafka offsets advance without anything new being written to the Iceberg table (hence data loss).
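
To make the scenario concrete, here is a minimal toy model in plain Java. It is not Iceberg or Flink code: the class, the method name and the guard `id > max` are assumptions that only mimic the check described above, using the example numbers 10000, 9000 and 500.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the scenario above; hypothetical, not Iceberg or Flink code. */
final class SkippedCommitDemo {

  /** Returns the checkpoint IDs whose files would actually be committed to the table. */
  static List<Long> committedCheckpoints(long maxCommittedInTable, long restoredId, long lastId) {
    List<Long> committed = new ArrayList<>();
    long max = maxCommittedInTable;
    for (long id = restoredId + 1; id <= lastId; id++) {
      // Assumed guard: commit only when the checkpoint ID is above the known maximum.
      if (id > max) {
        committed.add(id);
        max = id;
      }
      // Otherwise the files of this checkpoint are skipped, while the source still
      // advances its Kafka offsets because the checkpoint itself succeeded.
    }
    return committed;
  }

  public static void main(String[] args) {
    // Table says 10000, the savepoint restores 9000, the lag is consumed by checkpoint 9500.
    System.out.println(committedCheckpoints(10_000L, 9_000L, 9_500L).size()); // prints 0
    // Commits only resume once the checkpoint ID passes 10000.
    System.out.println(committedCheckpoints(10_000L, 9_000L, 10_002L)); // prints [10001, 10002]
  }
}
```

In this model every checkpoint between 9001 and 10000 completes from Flink's point of view, yet none of them adds anything to the table.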

lkokhreidze commented 1 month ago

I hope the explanation makes sense and I am not thinking about it in a completely wrong way. If this makes sense, I was wondering, if it would be possible to actually rollback the table snapshot to the version that we get from the Flink's checkpoint/savepoint upon recovery... I realize there might be some reason why it's not done like that, but to me it seems like the logical thing to do?

lkokhreidze commented 1 month ago

This behaviour might also explain the issue reported here.

pvary commented 1 month ago

Maybe, when restoring state, we could collect the commits created by this job/operator with a newer checkpointId, and create a "revert" commit for them...

lkokhreidze commented 1 month ago

Thanks for the reply @pvary! Just for my understanding - is there a technical reason why we can't call the ManageSnapshots#setCurrentSnapshot API when recovery happens?

Flow could be something like this:

  1. When the job state is restored, get the snapshot ID from the Flink state.
  2. If the current snapshot ID of the table and the one restored from the Flink state are the same, do nothing; all seems good.
  3. If the snapshot ID from the Flink state is different, do the following:
    • Get the snapshot history and make sure that the snapshot ID is still in it; if not, fail the recovery with an explicit error message.
    • If the snapshot ID is available in the history, use the ManageSnapshots#setCurrentSnapshot API to set the current snapshot of the table.

Maybe functionality could be hidden under some config flag.
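
The flow above could be sketched roughly as follows. This is a plain-Java model with hypothetical names (`RestoreRollbackSketch`, `onRestore`, `rollbackEnabled`); it only decides what to do and does not touch a real table. It also assumes the snapshot ID is available in the Flink state, which is part of the proposal rather than something confirmed here.

```java
import java.util.List;

/** Toy model of the proposed restore flow; the types are hypothetical, not Iceberg classes. */
final class RestoreRollbackSketch {

  enum Action { NOTHING, ROLLBACK }

  /**
   * Decides what to do when the job state is restored.
   *
   * @param restoredSnapshotId snapshot ID read from the Flink state (assumed to be stored there)
   * @param currentSnapshotId current snapshot ID of the table
   * @param history snapshot IDs still retained in the table history
   * @param rollbackEnabled hypothetical config flag guarding the behaviour
   */
  static Action onRestore(
      long restoredSnapshotId, long currentSnapshotId, List<Long> history, boolean rollbackEnabled) {
    if (!rollbackEnabled || restoredSnapshotId == currentSnapshotId) {
      return Action.NOTHING; // step 2: nothing to do
    }
    if (!history.contains(restoredSnapshotId)) {
      // step 3, first bullet: fail the recovery with an explicit message
      throw new IllegalStateException(
          "Snapshot " + restoredSnapshotId + " from the Flink state is no longer in the table history");
    }
    // step 3, second bullet: a real job would call ManageSnapshots#setCurrentSnapshot here
    return Action.ROLLBACK;
  }
}
```

Keeping the decision separate from the table call makes it easy to unit test the three branches without a catalog.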

Apologies if I'm asking an obvious question here, still new to Iceberg with Flink internals.

pvary commented 1 month ago

For the 3rd point:

  • Flink uses two phase commits. In the 1st phase the data is written to a temp manifest file, and the file path is stored into the state.

So if there is a failure between the 2 commit phases, it could happen that the data is available in the temp manifest file, but not yet committed to the Iceberg table. This should be considered before throwing an error.

But if the current snapshot of the Iceberg table is newer than the checkpoint we restore from, then it would be fine to revert to the given snapshot - we need to throw an exception if there were any concurrent writes to the table in the meantime (some other writers might have written data independently).

Also we should examine what we can do with the new IcebergSink. @rodmeneses could you please chime in?

stevenzwu commented 1 month ago

it would skip committing any files to iceberg table due to this check.

this is indeed a problem.

If this makes sense, I was wondering, if it would be possible to actually rollback the table snapshot to the version that we get from the Flink's checkpoint/savepoint upon recovery.

This is logical thinking, but it may not always be safe, and it has some other implications for downstream.

I prefer this type of Iceberg table state manipulation to be a manual admin action (instead of an automatic one from the Flink writer job).

stevenzwu commented 1 month ago

Let's understand the motivation of maxCommittedCheckpointId: it is used to check if there are any unsuccessful commits from the restored checkpoint state.

Let's also clarify what the expected behavior should be. When users rewind the job to an older checkpoint/savepoint to reprocess older data, it indicates that they are accepting duplicates.

One potential fix is to reset the maxCommittedCheckpointId to the restored checkpointId after the check. Any uncommitted attempts (before or at the restored checkpointId) should be committed as usual. Any uncommitted attempts after the restored checkpointId should be discarded.

Let's assume the previous job completed checkpoint 10, but checkpoint 10 had commit failures (say, a network issue with the catalog service), so the maxCommittedCheckpointId is 9. The job was then rewound to checkpoint 6.

After checking which checkpoints are committed, the committer should reset maxCommittedCheckpointId to the restored checkpointId 6. The uncommitted checkpoint 10 from the previous job should be discarded.

If we look at the table snapshot history, we should see this order (from earliest to latest) of committed Flink checkpoints:

..., 5, 6, 7, 8, 9, (job rewound), 7, 8, ...

The checkpointIds won't be monotonically increasing anymore. That should be fine for the getMaxCommittedCheckpointId method, as the backtracking also checks the Flink jobId.
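
A rough sketch of that reset, as a plain-Java toy committer. Everything here is hypothetical (class name, `restore`, the `pendingFromState` map of checkpointId to manifest); it only models the rule "commit pending attempts at or before the restored checkpointId, discard later ones, then reset the maximum" and is not the actual committer code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy committer modelling the proposed reset; hypothetical, not the Iceberg committer. */
final class ResetMaxCommittedSketch {
  /** Committed checkpoint IDs in table-history order (stand-in for snapshot summaries). */
  final List<Long> tableHistory;
  long maxCommittedCheckpointId;

  ResetMaxCommittedSketch(List<Long> existingHistory) {
    this.tableHistory = new ArrayList<>(existingHistory);
  }

  /** pendingFromState maps checkpointId to the manifest found in the restored state. */
  void restore(long maxCommittedInTable, long restoredCheckpointId,
      SortedMap<Long, String> pendingFromState) {
    // Attempts at or before the restored checkpoint are committed if the table lacks them.
    for (long id : pendingFromState.headMap(restoredCheckpointId + 1).keySet()) {
      if (id > maxCommittedInTable) {
        tableHistory.add(id);
      }
    }
    // Attempts after the restored checkpoint are discarded; then the maximum is reset.
    maxCommittedCheckpointId = restoredCheckpointId;
  }

  void notifyCheckpointComplete(long checkpointId) {
    if (checkpointId > maxCommittedCheckpointId) {
      tableHistory.add(checkpointId);
      maxCommittedCheckpointId = checkpointId;
    }
  }

  public static void main(String[] args) {
    ResetMaxCommittedSketch committer =
        new ResetMaxCommittedSketch(List.of(5L, 6L, 7L, 8L, 9L));
    committer.restore(9L, 6L, new TreeMap<>()); // table max is 9, job rewound to 6
    committer.notifyCheckpointComplete(7L);
    committer.notifyCheckpointComplete(8L);
    System.out.println(committer.tableHistory); // prints [5, 6, 7, 8, 9, 7, 8]
  }
}
```

The printed history reproduces the non-monotonic order from the example, with checkpoints 7 and 8 committed a second time after the rewind.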

stevenzwu commented 1 month ago

I would also like to share how I implemented the Flink Iceberg sink at my previous company (Netflix) before the OSS implementation here. We generated a UUID for each checkpoint entry/commit, which was added to the snapshot metadata (similar to checkpointId). During state restore, the committer used the UUIDs to check whether the restored checkpoints/attempts had been committed or not. There is no maxCommittedCheckpointId in this case.
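
The UUID idea can be illustrated with a small plain-Java model. This is only a sketch of the general technique described above, not the Netflix implementation; the class and method names are made up, and a `Set` stands in for the UUIDs recorded in the table's snapshot metadata.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/** Toy model of UUID-based commit tracking; hypothetical, not a real sink. */
final class UuidCommitSketch {
  /** Stand-in for the UUIDs stored in the snapshot metadata of committed snapshots. */
  private final Set<UUID> committedInTable = new HashSet<>();

  /** Records that the attempt with this UUID reached the table. */
  void commit(UUID attemptId) {
    committedInTable.add(attemptId);
  }

  /** On restore: returns the attempts from the state that still need to be committed. */
  List<UUID> uncommitted(List<UUID> restoredAttempts) {
    List<UUID> result = new ArrayList<>();
    for (UUID id : restoredAttempts) {
      if (!committedInTable.contains(id)) {
        result.add(id);
      }
    }
    return result;
  }
}
```

Because the lookup is by identity rather than by ordering, a rewound job that reuses lower checkpoint IDs does not collide with earlier commits in this model.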

pvary commented 1 month ago

The main question is whether we think that the Iceberg table state is part of the Flink state, or not. How do other sinks behave with this regard?

I have seen requests to help rewind the state of the Iceberg table to a given savepoint, when rolling back was needed. We needed to read the state with custom Java code to extract the checkpointId from the given state. This was hard for such a simple goal. Extracting a UUID would be even harder.

stevenzwu commented 1 month ago

whether we think that the Iceberg table state is part of the Flink state, or not.

I would say no. multiple jobs writing to the same table is a good example.

I have seen requests to help rewind the state of the Iceberg table to a given savepoint, when rolling back was needed. We needed to read the state with custom Java code to extract the checkpointId from the given state. This was hard for such a simple goal. Extracting a UUID would be even harder.

that is a good example where checkpointId is easier than UUID.

BTW, this can be done by snapshots metadata table query with a filter on the summary column, right?
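
As an illustration of that filter, here is a plain-Java sketch over an in-memory stand-in for the snapshots metadata table. The `SnapshotRow` type and `findSnapshot` method are hypothetical, and the summary keys `flink.job-id` and `flink.max-committed-checkpoint-id` are assumed to be the ones the Flink sink writes; verify them against your table before relying on this.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy lookup over snapshot summaries; hypothetical types, not the Iceberg API. */
final class SnapshotLookupSketch {

  /** Minimal stand-in for one row of the snapshots metadata table. */
  static final class SnapshotRow {
    final long snapshotId;
    final Map<String, String> summary;

    SnapshotRow(long snapshotId, Map<String, String> summary) {
      this.snapshotId = snapshotId;
      this.summary = summary;
    }
  }

  /** Finds the snapshot committed by the given job for the given checkpoint, if any. */
  static Optional<Long> findSnapshot(List<SnapshotRow> rows, String jobId, long checkpointId) {
    String wanted = String.valueOf(checkpointId);
    return rows.stream()
        // Assumed summary keys; see the note above.
        .filter(r -> jobId.equals(r.summary.get("flink.job-id")))
        .filter(r -> wanted.equals(r.summary.get("flink.max-committed-checkpoint-id")))
        .map(r -> r.snapshotId)
        .findFirst();
  }
}
```

The lookup itself is straightforward once the job ID and checkpoint ID are known.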

pvary commented 1 month ago

I would say no. multiple jobs writing to the same table is a good example.

Theoretically it is possible to revert only the changes done by the given job (for append only tables). Still not sure we have to do so, but we could.

BTW, this can be done by snapshots metadata table query with a filter on the summary column, right?

That's the easy part. Harder to find the correct checkpointId, operatorId, jobId - this needs state parsing

pvary commented 1 month ago

Discussed the issue with @stevenzwu yesterday in detail.

We agreed that we should separate out the problems above.

  1. We need to prevent the sink from silently skipping the commit when there is already a commit in the table with the same checkpointId. Skipping is only desired when restoring the state (as we can't be sure where the previous deployment failed). In every other case, when there are new incoming records, they need to be committed to the Iceberg table.
  2. We need to find a way for users to easily (manually or automatically) revert the changes done by the given job, back to the state that was created by the given checkpoint/savepoint.

While in this specific case 1. and 2. were correlated, there might be cases when the desired behaviour is different, so we should not tie the 2 problems together.

lkokhreidze commented 1 month ago

@pvary I'd be happy to help if I can be helpful. Can you maybe expand on the 1st point? How do you envision it being implemented? Is it just as simple as when recovery is detected, remove the max checkpoint ID check and let Iceberg committer commit the data?

pvary commented 1 month ago

@pvary I'd be happy to help if I can be helpful. Can you maybe expand on the 1st point? How do you envision it being implemented? Is it just as simple as when recovery is detected, remove the max checkpoint ID check and let Iceberg committer commit the data?

We will still need maxCommittedCheckpointId in initializeState to prevent recommitting data to the Iceberg table on state restore, but in notifyCheckpointComplete we need a different check. We need to correlate the current deployment with the last commit created by this deployment, and we have to be aware of the lack of guarantees on CheckpointListener/notifyCheckpointComplete.

rodmeneses commented 1 month ago

For the 3rd point:

  • Flink uses two phase commits. In the 1st phase the data is written to a temp manifest file, and the file path is stored into the state.

So if there is a failure between the 2 commit phases, it could happen that the data is available in the temp manifest file, but not yet committed to the Iceberg table. This should be considered before throwing an error.

But if the current snapshot of the Iceberg table is newer than the checkpoint we restore from, then it would be fine to revert to the given snapshot - we need to throw an exception if there were any concurrent writes to the table in the meantime (some other writers might have written data independently).

Also we should examine what we can do with the new IcebergSink. @rodmeneses could you please chime in?

Both the current FlinkSink and the incoming IcebergSink share the same logic for getMaxCommittedCheckpointId. Both of them also skip checkpoints before this maxCheckpointId. In the new IcebergSink, we also call signalAlreadyCommitted for each checkpoint that is being skipped. So I'd say that the same issue will also impact the new IcebergSink. @lkokhreidze Thanks for reporting this and volunteering to work on a fix. I'd say the first thing would be to create unit tests to reproduce this issue. Once you have them, let me know and I can port them over to the new IcebergSink implementation. cc: @pvary @stevenzwu