apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.52k stars 2.25k forks source link

Flink Job failed to restore because of downstream table changed #4137

Closed coolderli closed 2 years ago

coolderli commented 2 years ago

My Flink Job Failed to restore from the checkpoint, and throw the exception as below:

2022-02-15 23:38:13.944 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - IcebergFilesCommitter -> Sink: IcebergSink iceberg_zjyprc_hadoop.tmp.dws_diagnostice_flink_spark_test_copy (1/1) (521e1e169598446f0353ceafaec499fa) switched from RUNNING to FAILED on container_e36_1638340655551_39490_01_000002 @ zjy-hadoop-prc-streaming165.bj (dataPort=46101).
org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: hdfs://zjyprc-hadoop/user/h_data_platform/datalake/tmp.db/dws_diagnostice_flink_spark_test/metadata/bac976d7f048dd39440ab9155dd5fd67-fbb4ef531e002f8fb3a2052db255adf5-00000-0-12-00011.avro
    at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:177)
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
    at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
    at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:103)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:93)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:77)
    at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:59)
    at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:106)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:243)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:179)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:325)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: /user/h_data_platform/datalake/tmp.db/dws_diagnostice_flink_spark_test/metadata/bac976d7f048dd39440ab9155dd5fd67-fbb4ef531e002f8fb3a2052db255adf5-00000-0-12-00011.avro
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
    at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:155)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2140)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:845)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:489)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:532)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1125)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1053)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1805)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3052)

After the verification, I found the downstream table has changed. In the current implementation, we query the history of snapshots to find the max committed checkpoint-id: https://github.com/apache/iceberg/blob/master/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L369 If the downstream table has changed, the value of getMaxCommittedCheckpointId is unpredictable. I think we can store the table UUID on the checkpoint. When restoring, we can use the table UUID to validate the downstream, throw an exception if the UUIDs are inconsistent. What do you think about this? @stevenzwu @rdblue

stevenzwu commented 2 years ago

@coolderli is my understanding below correct?

Regarding getMaxCommittedCheckpointId, I guess you are saying that the snapshot containing the last commited checkpointId expired and hence getMaxCommittedCheckpointId returns -1. I agree that in this case we shouldn't try to execute commitUpToCheckpoint since we don't safely know what is the last committed checkpoint id.

It can result in duplicates in this case if the manifest file wasn't cleaned up after successful commit. Because the checkpointed manifest file was deleted, we can also conclude that the last Iceberg commit succeeded already.

github-actions[bot] commented 2 years ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 2 years ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

chenwyi2 commented 2 years ago

this problem has been solved? i also meet this problem, when iceberg was commited sucessfully but flink flush snapshot state to state backend was failed, then i restart task, it can be failed: FileNotFoundException: File does not exist