apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.19k stars 2.16k forks source link

flink iceberg may occur duplication when succeed to write datafile and commit but checkpoint fail #10765

Open maekchi opened 1 month ago

maekchi commented 1 month ago

Apache Iceberg version

1.4.3

Query engine

Flink

Please describe the bug 🐞

It seems like very rare duplicates occur in flink iceberg.

Let me explain the situation I experienced:

result

{
    "sequence-number" : 201719,
    "snapshot-id" : 8203882888081487848,
    "parent-snapshot-id" : 7556868946872881546,
    "timestamp-ms" : 1721764676985,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966880804",
      "total-files-size" : "241007398466",
      "total-data-files" : "774",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-8203882888081487848-1-354fd0bb-38d9-4706-8483-8a4276888dc3.avro",
    "schema-id" : 2
  }, {
    "sequence-number" : 201720,
    "snapshot-id" : 3289453546560274810,
    "parent-snapshot-id" : 8203882888081487848,
    "timestamp-ms" : 1721764798149,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966898358",
      "total-files-size" : "241008063306",
      "total-data-files" : "775",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-3289453546560274810-2-e0983626-a2a5-49f2-988b-dc432f100451.avro",
    "schema-id" : 2
  },

Looking at the situation, the restore was done with the checkpoint ID that was completed up to the checkpoint, and the commit was performed again up to the completed checkpoint. As a result, the commit for checkpoint id 19516 was performed twice, pointing to the same data file.

When I read this in trino, the same data will be read twice and will appear duplicated.

I tried to delete it to resolve duplicate data, but the following error occurred in trino. Found more deleted rows than exist in the file

rewrite_file was also performed

CALL spark_catalog.system.rewrite_data_files(table => 'table', where => 'partition="xxxxxx"');
24/07/24 06:38:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxT
oStringFields'.
24/07/24 06:44:02 WARN ManifestFilterManager: Deleting a duplicate path from manifest hdfs://~~~~~/metadata/f0b56e5f-9b32-48d2-ba77-ddb93081c881-m1.avro: hdfs://~~~~/data/parttiion/00
000-0-01327dd4-6162-483d-b2e0-bb0694402807-00513.parquet
398     1       373764540       0
Time taken: 313.164 seconds, Fetched 1 row(s)

but contrary to the message, data duplication was not actually resolved. and expire snapstot is not worked, too.

Finally, I will try to modify manifest directly.

(Addition) After rewrite file, I found that data can be deleted without error. So i create backup table, and backup other table without duplicated data Then. delete duplicated data in original table and copy from backup table

Is there a better solution in this situation? and please check this situation

Thanks.

Willingness to contribute

pvary commented 1 month ago

Is this similar to #10526?

zhongqishang commented 1 month ago

As a result, the commit for checkpoint id 19516 was performed twice, pointing to the same data file.

~@maekchi I think submitting files repeatedly does not generate duplicate data. Can you check the snapshot information of checkpoint id = 19518. If possible, you can use Time travel to confirm which specific submission caused the duplicate data.~

pvary commented 1 month ago

I have taken another look at the info you have provided. This is definitely different than #10526 as this is append only change.

Do you have any more detailed log, or any way to repro the case?

As you highlighted there are 2 commits with the same data:

      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",

This should be prevented by the maxCommittedCheckpointId check. See: https://github.com/apache/iceberg/blob/apache-iceberg-1.4.3/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L189

maekchi commented 1 month ago

@pvary I also think this is different than #10526. I am only appending data. and perform with exactly once mode in Flink.

Because there were other Flink apps, there were too many logs, so I am attaching additional logs by filtering them by table name among all logs.

Jul 24, 2024 @ 04:57:32.562 Committing append for checkpoint 19516 to table hive.table branch main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=17554, dataFilesByteCount=664840, deleteFilesCount=0, deleteFilesRecordCount=0, deleteFilesByteCount=0}   90  task_manager
Jul 24, 2024 @ 04:57:32.562 Start to flush snapshot state to state backend, table: hive.table, checkpointId: 19516  90  task_manager
Jul 24, 2024 @ 04:59:00.153 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (a035c92a2f05258b27a3986798fcde66) switched from CANCELING to CANCELED. 25,369  job_manager
Jul 24, 2024 @ 04:59:00.153 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from CREATED to SCHEDULED.  25,371  job_manager
Jul 24, 2024 @ 04:59:00.153 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (a035c92a2f05258b27a3986798fcde66) switched from RUNNING to CANCELING.  25,369  job_manager
Jul 24, 2024 @ 04:59:19.156 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from SCHEDULED to DEPLOYING.    25,360  job_manager
Jul 24, 2024 @ 04:59:19.156 Deploying freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (attempt #1) with attempt id eb7fcaa1da87cd760c8e489b36528ac7 and vertex id 9135501d46e54bf84710f477c1eb5f38_0 to 172.24.71.24:6122-16ff02 @ 172.24.71.24 (dataPort=45067) with allocation id fa75b71da4f9b96484bc5435b5f55b00    25,360  job_manager
Jul 24, 2024 @ 04:59:20.156 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from DEPLOYING to INITIALIZING. 25,366  job_manager
Jul 24, 2024 @ 04:59:20.780 Loading JAR files for task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) [DEPLOYING].    3,208   task_manager
Jul 24, 2024 @ 04:59:20.780 Received task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7), deploy into slot with allocation id fa75b71da4f9b96484bc5435b5f55b00.   3,196   task_manager
Jul 24, 2024 @ 04:59:20.780 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) switched from CREATED to DEPLOYING.    3,208   task_manager
Jul 24, 2024 @ 04:59:20.781 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) switched from DEPLOYING to INITIALIZING.   3,208   task_manager
Jul 24, 2024 @ 04:59:22.783 Table loaded by catalog: hive.table 3,208   task_manager
Jul 24, 2024 @ 04:59:22.783 Committing append for checkpoint 19516 to table hive.table branch main with summary: CommitSummary{dataFilesCount=1, dataFilesRecordCount=17554, dataFilesByteCount=664840, deleteFilesCount=0, deleteFilesRecordCount=0, deleteFilesByteCount=0}   3,208   task_manager
Jul 24, 2024 @ 04:59:45.158 Failed to trigger checkpoint for job ba65ea243c487f4f0fd52c158e4ed985 since Checkpoint triggering task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) of job ba65ea243c487f4f0fd52c158e4ed985 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.. 173 job_manager
Jul 24, 2024 @ 04:59:59.159 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) (eb7fcaa1da87cd760c8e489b36528ac7) switched from INITIALIZING to RUNNING.   25,397  job_manager
Jul 24, 2024 @ 05:00:00.788 freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1)#1 (eb7fcaa1da87cd760c8e489b36528ac7) switched from INITIALIZING to RUNNING. 3,208   task_manager
Jul 24, 2024 @ 05:00:00.788 Successfully committed to table hive.table in 130 ms    3,208   task_manager
Jul 24, 2024 @ 05:00:00.788 Committed to table hive.table with the new metadata location hdfs://~~~~~/metadata/202296-54e38fd0-c3c5-4431-b072-962402eedb7b.metadata.json    3,208   task_manager
Jul 24, 2024 @ 05:00:00.788 Committed append to table: hive.table, branch: main, checkpointId 19516 in 36033 ms 3,208   task_manager
Jul 24, 2024 @ 05:00:00.788 Received metrics report: CommitReport{tableName=hive.table, snapshotId=3289453546560274810, sequenceNumber=201720, operation=append, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT36.012583152S, count=1}, attempts=CounterResult{unit=COUNT, value=2}, addedDataFiles=CounterResult{unit=COUNT, value=1}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=775}, addedDeleteFiles=null, addedEqualityDeleteFiles=null, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=2}, addedRecords=CounterResult{unit=COUNT, value=17554}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=3966898358}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=664840}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=241008063306}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=18608}, addedEqualityDeletes=null, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=1.15.2, engine-name=flink, iceberg-version=Apache Iceberg 1.4.3 (commit 9a5d24fee239352021a9a73f6a4cad8ecf464f01)}}    3,208   task_manager
Jul 24, 2024 @ 05:00:09.567 Committed to table hive.table with the new metadata location hdfs://~~~~~/metadata/202295-d906b795-214d-46a6-b2dd-5353f134a9a6.metadata.json    90  task_manager
Jul 24, 2024 @ 05:00:45.789 Committed append to table: hive.table, branch: main, checkpointId 19518 in 1112 ms  3,208   task_manager
Jul 24, 2024 @ 05:00:45.789 Table loaded by catalog: hive.table 3,208   task_manager
Jul 24, 2024 @ 05:00:45.789 Start to flush snapshot state to state backend, table: hive.table, checkpointId: 19518  3,208   task_manager

Checking the log, it seems that there is a "commiting" log at 04:57, but there is no "commited" log for this.

pvary commented 1 month ago

Which version of Flink do you using btw?

I see this in the logs Committing append for checkpoint 19516 - this is started, but can't see the corresponding Committed {} to table: {}, branch: {}, checkpointId {} in {} ms

This means that Flink doesn't know if 19516 (snapshot 8203882888081487848) was successful or not. Let's assume it was successful behind the scenes.

So when it recovers it will find the metadata in the state for checkpoint 19516.

So we have to check the recovery codepath to understand what is happening. https://github.com/apache/iceberg/blob/apache-iceberg-1.4.3/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L144

maxCommittedCheckpointId (https://github.com/apache/iceberg/blob/apache-iceberg-1.4.3/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L493) should return 19516 if the previous commit was successful, and the recovery code should prevent it to be committed again.

What Catalog are you using? Is there any cache, or something which might return wrong data for the table?

Thanks, Peter

maekchi commented 1 month ago

@pvary

Which version of Flink do you using btw?

I use flink version 1.15.4

What Catalog are you using? Is there any cache, or something which might return wrong data for the table?

We use hive catalog, and set up 'engine.hive.lock-enabled=true' and no cache and no wrong data. (we are filtering every wrong data)


By the way, there seems to be one more strange thing in the log.

After processing the checkpointId for 19516, there should be a log somewhere for processing the 19517 checkpoint ID, but there is no log at all.

Because our system performs checkpoints every minute, the checkpoint for 19517 should be performed around 04:58:32 after the 19516 checkpoint performed at 04:57:32.

However, there is no related log. This is because the taskmanager was shut down around that time.

However, an attempt was made for 19516 at 04:59:22, and if you look at the log at 05:00:00, the 19516 snapshot was successfully committed. But there is no log for 19517 anywhere.

There is no record for 19517 even when looking at metadata.json! Is it normal situation during recovery?

{
    "sequence-number" : 201719,
    "snapshot-id" : 8203882888081487848,
    "parent-snapshot-id" : 7556868946872881546,
    "timestamp-ms" : 1721764676985,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966880804",
      "total-files-size" : "241007398466",
      "total-data-files" : "774",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-8203882888081487848-1-354fd0bb-38d9-4706-8483-8a4276888dc3.avro",
    "schema-id" : 2
  }, {
    "sequence-number" : 201720,
    "snapshot-id" : 3289453546560274810,
    "parent-snapshot-id" : 8203882888081487848,
    "timestamp-ms" : 1721764798149,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19516",
      "added-data-files" : "1",
      "added-records" : "17554",
      "added-files-size" : "664840",
      "changed-partition-count" : "1",
      "total-records" : "3966898358",
      "total-files-size" : "241008063306",
      "total-data-files" : "775",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-3289453546560274810-2-e0983626-a2a5-49f2-988b-dc432f100451.avro",
    "schema-id" : 2
  }, {
    "sequence-number" : 201721,
    "snapshot-id" : 3232659717465048464,
    "parent-snapshot-id" : 3289453546560274810,
    "timestamp-ms" : 1721764843143,
    "summary" : {
      "operation" : "append",
      "flink.operator-id" : "9135501d46e54bf84710f477c1eb5f38",
      "flink.job-id" : "ba65ea243c487f4f0fd52c158e4ed985",
      "flink.max-committed-checkpoint-id" : "19518",
      "added-data-files" : "1",
      "added-records" : "56759",
      "added-files-size" : "2237712",
      "changed-partition-count" : "1",
      "total-records" : "3966955117",
      "total-files-size" : "241010301018",
      "total-data-files" : "776",
      "total-delete-files" : "2",
      "total-position-deletes" : "18608",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://~~~~~/metadata/snap-3232659717465048464-1-8c5a3ab7-9303-45e5-910c-41d47be08142.avro",
    "schema-id" : 2
  },

Thanks!

pvary commented 1 month ago

I think this is the log for the 19517:

Jul 24, 2024 @ 04:59:45.158 Failed to trigger checkpoint for job ba65ea243c487f4f0fd52c158e4ed985 since Checkpoint triggering task freezer-IcebergFilesCommitter -> Sink: freezer-IcebergSink hive.table (1/1) of job ba65ea243c487f4f0fd52c158e4ed985 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running..

The job was recorvering, and not all of the operators were running, so checkpoint 19517 failed.