apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Flink sink writes duplicate data in upsert mode #10431

Closed zhongqishang closed 2 months ago

zhongqishang commented 5 months ago

Apache Iceberg version

1.2.1

Query engine

Flink 1.14.4

Please describe the bug 🐞

I have a Flink upsert job with parallelism = 1 and a checkpoint interval of 5 minutes. In addition, an external service periodically (every 30 minutes) triggers a savepoint.

5 files were generated in one checkpoint cycle, including two data files, two eq delete files, and one pos delete file. The 2 data files and 2 eq-delete files contained the same data. When I queried, duplicate data appeared. I think it is because the subsequent eq delete is not associated with the first data file.

Flink TM log

2024-05-31 16:10:57.457 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:10:57.459 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:10:57.462 org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Checkpoint 5765 has been notified as aborted, would not trigger any checkpoint.
2024-05-31 16:13:58.455 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:13:58.505 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]
2024-05-31 16:13:58.507 org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.zstd]

JM log

2024-05-31 16:08:12.840 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5764 (type=CHECKPOINT) @ 1717142891998 for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:08:16.239 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering savepoint for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:08:16.242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5765 (type=SAVEPOINT) @ 1717142896239 for job fc721024df3d70e3a1f3a46a63e9635a.
2024-05-31 16:09:41.531 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 5764 for job fc721024df3d70e3a1f3a46a63e9635a (7170 bytes, checkpointDuration=89495 ms, finalizationTime=38 ms).
2024-05-31 16:09:41.532 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 5764 as completed for source Source: TableSourceScan(table=[[default_catalog, default_database, cdc_xxx]], fields=[id, data_status, ...]).
2024-05-31 16:10:46.242 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 5765 of job fc721024df3d70e3a1f3a46a63e9635a expired before completing.
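
To make the overlap between checkpoint 5764 and savepoint 5765 easier to see, here is a minimal sketch that computes the intervals from the timestamps in the JM and TM log lines above. The event names are labels chosen for this illustration. The 150 s lifetime of the savepoint is consistent with a timeout of that length, but the configured timeout is not stated in this thread, so treat that reading as an assumption.

```python
from datetime import datetime

# Timestamps copied from the JM and TM log lines above.
# The dictionary keys are illustrative labels, not Flink identifiers.
FMT = "%Y-%m-%d %H:%M:%S.%f"
events = {
    "checkpoint_5764_triggered": "2024-05-31 16:08:12.840",
    "savepoint_5765_triggered": "2024-05-31 16:08:16.242",
    "checkpoint_5764_completed": "2024-05-31 16:09:41.531",
    "savepoint_5765_expired": "2024-05-31 16:10:46.242",
    "tm_notified_5765_aborted": "2024-05-31 16:10:57.462",
}
times = {name: datetime.strptime(ts, FMT) for name, ts in events.items()}


def seconds_between(start: str, end: str) -> float:
    """Elapsed seconds between two named log events."""
    return (times[end] - times[start]).total_seconds()


# The savepoint was triggered while checkpoint 5764 was still in flight.
overlap = times["savepoint_5765_triggered"] < times["checkpoint_5764_completed"]

# Time from triggering the savepoint until the JM declared it expired.
savepoint_lifetime = seconds_between(
    "savepoint_5765_triggered", "savepoint_5765_expired"
)

# Delay between the JM expiring the savepoint and the TM logging the abort.
abort_delay = seconds_between("savepoint_5765_expired", "tm_notified_5765_aborted")

print(f"savepoint overlapped checkpoint 5764: {overlap}")
print(f"savepoint 5765 lifetime: {savepoint_lifetime} s")
print(f"abort notification delay on TM: {abort_delay} s")
```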

Downloaded files and sizes:

-rw-r--r--@ 1 q  staff   30528 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01926.parquet
-rw-r--r--@ 1 q  staff     701 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01927.parquet
-rw-r--r--@ 1 q  staff  741706 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet
-rw-r--r--@ 1 q  staff   17592 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01929.parquet
-rw-r--r--@ 1 q  staff    1978 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01930.parquet

pvary commented 5 months ago

@zhongqishang: How is your sink/table created? What are the exact records you are sending to the sink? Your issue seems very similar to: https://github.com/apache/iceberg/issues/10076

zhongqishang commented 5 months ago

> @zhongqishang: How is your sink/table created? What are the exact records you are sending to the sink? Your issue seems very similar to: #10076

@pvary Thanks for your reply.

The following is the CREATE TABLE SQL:

CREATE TABLE `iceberg_table` ( 
  `id` String, 
  ... 
  PRIMARY KEY (`id`) NOT ENFORCED  
) WITH ( 
    'connector'='iceberg',
    'catalog-name'='iceberg_catalog',
    'catalog-database'='database',
    'catalog-table'='table',
    'catalog-type'='hive',
    'uri'='xx',
    'hive-conf-dir'='xx',
    'write.format.default'='parquet',
    'format-version'='2',
    'write.upsert.enabled'='true',
    'write.metadata.metrics.default'='full',
    'write.target-file-size-bytes'='268435456',
    'write.parquet.compression-codec'='zstd',
    ...
);

This is not similar to #10061. In my case the problem occurs when a checkpoint and a savepoint run concurrently and the subsequent savepoint is aborted.

pvary commented 5 months ago

@zhongqishang: Do you see anything more in the logs? Exceptions/retries, or something like this?

Also, I don't fully understand your statement here:

> I think it is because the subsequent eq delete is not associated with the first data file.

Could you please elaborate a bit?

zhongqishang commented 5 months ago

> @zhongqishang: Do you see anything more in the logs? Exceptions/retries, or something like this?

I have not found any Exceptions/retries around the wrong snapshot time. To be clear, this is duplicate data that only exists from a certain snapshot onwards.

> I think it is because the subsequent eq delete is not associated with the first data file.
>
> Could you please elaborate a bit?

TM snapshot commit log:

2024-05-31 16:13:58.916 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committing rowDelta for checkpoint 5766 to table iceberg_catalog.ods_iceberg_xxx.xxx branch main with summary: CommitSummary{dataFilesCount=2, dataFilesRecordCount=1016, dataFilesByteCount=772234, deleteFilesCount=3, deleteFilesRecordCount=1016, deleteFilesByteCount=20271}
2024-05-31 16:13:59.254 INFO org.apache.iceberg.metrics.LoggingMetricsReporter            [] - Received metrics report: CommitReport{tableName=iceberg_catalog.ods_iceberg_xxx.xxx, snapshotId=6176224982712390258, sequenceNumber=7987, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.326962754S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=2}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=77}, addedDeleteFiles=CounterResult{unit=COUNT, value=3}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=2}, addedPositionalDeleteFiles=CounterResult{unit=COUNT, value=1}, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=84}, addedRecords=CounterResult{unit=COUNT, value=1016}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=43962360}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=792505}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=16066186371}, addedPositionalDeletes=CounterResult{unit=COUNT, value=50}, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=2796437}, addedEqualityDeletes=CounterResult{unit=COUNT, value=966}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=8596}}, metadata={engine-version=1.14.4, engine-name=flink, iceberg-version=Apache Iceberg 1.2.1 (commit 4e2cdccd7453603af42a090fc5530f2bd20cf1be)}}

The files and data committed by this faulty snapshot are:

-rw-r--r--@ 1 q  staff   30528 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01926.parquet
-rw-r--r--@ 1 q  staff     701 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01927.parquet
-rw-r--r--@ 1 q  staff  741706 May 31 19:36 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet
-rw-r--r--@ 1 q  staff   17592 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01929.parquet
-rw-r--r--@ 1 q  staff    1978 May 31 19:37 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01930.parquet

DATA file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01926.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d", ...}

EQ-DELETE file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01927.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d"}

DATA file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01928.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d", ...}

EQ-DELETE file : 00000-1-64548f8b-a8a9-474f-95f8-a59a30dcce2f-01929.parquet

{"id": "d6957bf663d220cac89a4e780ab8f54d"}

I mean that eq delete file 01929 is not associated with data file 01926.
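
As a sanity check, the file sizes in the listing can be reconciled with the commit summary and the commit report logged above. The sketch below is only arithmetic on numbers quoted in this thread. The role of each file follows the descriptions in the thread (01926 and 01928 are data files, 01927 and 01929 are eq delete files), and 01930 is taken to be the one pos delete file mentioned in the first comment.

```python
# File sizes in bytes from the directory listing, keyed by file-name suffix.
file_sizes = {
    "01926": 30528,
    "01927": 701,
    "01928": 741706,
    "01929": 17592,
    "01930": 1978,
}

# Roles as described in the thread; 01930 is the remaining (pos delete) file.
data_files = ["01926", "01928"]
eq_delete_files = ["01927", "01929"]
pos_delete_files = ["01930"]

# Values copied from the CommitSummary and CommitReport log lines.
commit_summary = {
    "dataFilesCount": 2,
    "dataFilesByteCount": 772234,
    "deleteFilesCount": 3,
    "deleteFilesRecordCount": 1016,
    "deleteFilesByteCount": 20271,
}
commit_report = {
    "addedFilesSizeInBytes": 792505,
    "addedPositionalDeletes": 50,
    "addedEqualityDeletes": 966,
}

data_bytes = sum(file_sizes[f] for f in data_files)
delete_bytes = sum(file_sizes[f] for f in eq_delete_files + pos_delete_files)

checks = {
    "data file count": len(data_files) == commit_summary["dataFilesCount"],
    "data file bytes": data_bytes == commit_summary["dataFilesByteCount"],
    "delete file count": len(eq_delete_files) + len(pos_delete_files)
    == commit_summary["deleteFilesCount"],
    "delete file bytes": delete_bytes == commit_summary["deleteFilesByteCount"],
    "total added bytes": data_bytes + delete_bytes
    == commit_report["addedFilesSizeInBytes"],
    "delete record count": commit_report["addedPositionalDeletes"]
    + commit_report["addedEqualityDeletes"]
    == commit_summary["deleteFilesRecordCount"],
}

for name, ok in checks.items():
    print(f"{name}: {'matches' if ok else 'MISMATCH'}")
```

All five files therefore belong to the single rowDelta commit for checkpoint 5766, which means they share one sequence number.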

pvary commented 5 months ago

Here is how the different deletes work:

Based on the data you have shown, I would guess that there were 2 updates for the given record in the snapshot. Flink does the following:

The commit message you have shown tells us that 3 delete files were written. I expect that one of them is the positional delete file, and that it contains the delete record for the given row.

Could you please check if this is the case? Thanks, Peter

zhongqishang commented 5 months ago

@pvary Yes, the 01930 file is a pos delete file, but the file paths it contains only reference data file 01928.
It should also cover the first update of the record, which was written to the first data file 01926, but it does not.

Sorry, I have corrected the wrong file ID provided above.
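
To illustrate why this produces duplicates, here is a simplified model of how a reader applies delete files, following the Iceberg table spec: an equality delete applies only to data files with a strictly smaller data sequence number, while a position delete also applies to data files with the same sequence number. The classes and the `scan` function are hypothetical helpers written for this sketch, not Iceberg APIs. Partition matching is ignored, and the row contents of 01928, the older data file, and the second key are assumptions made to keep the example small.

```python
from dataclasses import dataclass


@dataclass
class DataFile:
    name: str
    seq: int
    rows: list  # row ids; the list index is the row position


@dataclass
class EqDeleteFile:
    name: str
    seq: int
    ids: set  # ids to delete by equality


@dataclass
class PosDeleteFile:
    name: str
    seq: int
    positions: set  # (data file name, row position) pairs


def scan(data_files, eq_deletes, pos_deletes):
    """Return (file name, id) for every row that survives all deletes."""
    live = []
    for df in data_files:
        for pos, row_id in enumerate(df.rows):
            # Equality deletes: only for strictly older data files.
            if any(d.seq > df.seq and row_id in d.ids for d in eq_deletes):
                continue
            # Position deletes: also for data files of the same sequence number.
            if any(d.seq >= df.seq and (df.name, pos) in d.positions
                   for d in pos_deletes):
                continue
            live.append((df.name, row_id))
    return live


KEY = "d6957bf663d220cac89a4e780ab8f54d"  # id shown in the thread
OTHER = "hypothetical-other-key"           # assumption, not from the thread
SEQ = 7987                                 # sequenceNumber from the commit report

older = DataFile("older-file (hypothetical)", SEQ - 1, [KEY])
f26 = DataFile("01926", SEQ, [KEY])
f28 = DataFile("01928", SEQ, [OTHER, OTHER, KEY])  # assumed contents
eq27 = EqDeleteFile("01927", SEQ, {KEY})
eq29 = EqDeleteFile("01929", SEQ, {KEY, OTHER})

# Observed: the pos delete file only references rows in 01928.
observed_pos = PosDeleteFile("01930", SEQ, {("01928", 0)})
# Expected by the reporter: it also covers the first write in 01926.
expected_pos = PosDeleteFile("01930", SEQ, {("01928", 0), ("01926", 0)})


def count_key(live, key):
    return sum(1 for _, row_id in live if row_id == key)


files = [older, f26, f28]
observed = scan(files, [eq27, eq29], [observed_pos])
expected = scan(files, [eq27, eq29], [expected_pos])

print("copies of KEY with observed pos deletes:", count_key(observed, KEY))
print("copies of KEY with expected pos deletes:", count_key(expected, KEY))
```

In this model the equality deletes remove the copy in the older file but cannot touch 01926 or 01928, because all five files share sequence number 7987. Only a position delete entry pointing at 01926 would remove the first write, so without it the key is read from both data files.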

zhongqishang commented 5 months ago

@pvary I encountered the same problem on another table. This time it was caused by a checkpoint RPC timeout.

JM log

2024-06-07 15:50:10.472 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 86447 (type=CHECKPOINT) @ 1717746610457 for job dd6dc4caf9d34a7d91767420862f4826.
2024-06-07 15:50:20.496 [flink-scheduler-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering Checkpoint 86447 for job dd6dc4caf9d34a7d91767420862f4826 failed due to java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(null.triggerCheckpoint(ExecutionAttemptID, long, long, CheckpointOptions))] at recipient [akka.tcp://flink@hd-097.ld-hadoop.com:36663/user/rpc/taskmanager_0] timed out.
2024-06-07 15:50:32.594 [jobmanager-io-thread-71] WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Received late message for now expired checkpoint attempt 86447 from task a2e42c20a8234511b70c26b835ec5552 of job dd6dc4caf9d34a7d91767420862f4826 at container_e53_1714278166039_374784_01_000004 @ hd-097.ld-hadoop.com (dataPort=33266).
2024-06-07 15:55:10.480 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 86448 (type=CHECKPOINT) @ 1717746910457 for job dd6dc4caf9d34a7d91767420862f4826.
2024-06-07 15:55:13.450 [jobmanager-io-thread-76] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 86448 for job dd6dc4caf9d34a7d91767420862f4826 (7769 bytes, checkpointDuration=2936 ms, finalizationTime=57 ms).

pvary commented 5 months ago

Seems like an issue with checkpoint retry. Will be out of office for a bit, but this needs to be investigated.

pvary commented 5 months ago

@zhongqishang: Seems like an issue with checkpoint retries. Is there any chance you could try to reproduce the issue with a newer version of Flink? The currently supported versions are 1.17, 1.18, 1.19.