apache/iceberg (Apache Iceberg, https://iceberg.apache.org/, Apache License 2.0)

flink restore failed with filenotfound #6066

Open · chenwyi2 opened this issue 2 years ago

chenwyi2 commented 2 years ago

Apache Iceberg version

0.14.1 (latest release)

Query engine

Flink

Please describe the bug 🐞

We have a Flink job that writes an upsert stream into a partitioned Iceberg v2 table. When the job fails, we restart it from the latest checkpoint, but the restore fails with:

FileNotFoundException: File does not exist: /rbf/warehouse/cupid_bi.db/ads_qixiao_olap_1min/metadata/c918a379b3cc15d7a8193cf27eb8b473-00000-1-38851-10287.avro

In the task log I see:

2022-10-21 16:57:19,188 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committing append with 2 data files and 0 delete files to table icebergCatalog.xxx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Successfully committed to table icebergCatalog.xx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 7536746147835307981 (MergeAppend)
2022-10-21 16:57:19,594 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Refreshing table metadata from new version: qbfs://online01/warehouse/xx/metadata/99171-45858db6-5917-4397-afcc-76c10ea80305.metadata.json
2022-10-21 16:57:19,750 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed in 562 ms

without flushing snapshot state to the state backend. Is the reason that the metadata checkpoint information differs from the snapshot state, since the job failed without flushing snapshot state to the state backend?
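
For reference, this is roughly the kind of pipeline involved; a minimal sketch assuming a Hive catalog, with placeholder catalog properties, key column, and source stream (only the table identifier and warehouse path are taken from the exception above), not the reporter's actual job:

```java
// Sketch of a checkpointed Flink job writing an upsert stream into a
// partitioned Iceberg v2 table; commits happen when a checkpoint completes.
import java.util.Collections;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L); // IcebergFilesCommitter commits on checkpoint completion

    DataStream<RowData> upserts = buildSourceStream(env); // placeholder source

    CatalogLoader catalogLoader = CatalogLoader.hive(
        "icebergCatalog",
        new Configuration(),
        Map.of("uri", "thrift://metastore:9083",    // placeholder metastore URI
               "warehouse", "/rbf/warehouse"));
    TableLoader tableLoader = TableLoader.fromCatalog(
        catalogLoader, TableIdentifier.of("cupid_bi", "ads_qixiao_olap_1min"));

    FlinkSink.forRowData(upserts)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Collections.singletonList("id")) // placeholder key column
        .upsert(true)                                          // requires a format-version=2 table
        .append();

    env.execute("iceberg-upsert-sink");
  }

  private static DataStream<RowData> buildSourceStream(StreamExecutionEnvironment env) {
    throw new UnsupportedOperationException("placeholder for the real upsert source");
  }
}
```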

stevenzwu commented 2 years ago

.avro might be a manifest file. Do you have the complete stack trace? Which Flink version?

I couldn't find this log line in 1.13 (or 1.14 and 1.15).

2022-10-21 16:57:19,750 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed in 562 ms
without flushing snapshot state to state backend

1.13 has the log line, but without the part after "Committed in 562 ms". https://github.com/apache/iceberg/blob/master/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java

chenwyi2 commented 2 years ago

> .avro might be a manifest file. Do you have the complete stack trace? Which Flink version?
>
> I couldn't find this log line in 1.13 (or 1.14 and 1.15).
>
> 2022-10-21 16:57:19,750 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed in 562 ms
> without flushing snapshot state to state backend
>
> 1.13 has the log line, but without the part after "Committed in 562 ms". https://github.com/apache/iceberg/blob/master/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java

It is my mistake; the right log should be:

2022-10-21 16:57:19,188 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committing append with 2 data files and 0 delete files to table icebergCatalog.xxx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Successfully committed to table icebergCatalog.xx
2022-10-21 16:57:19,573 INFO org.apache.iceberg.SnapshotProducer [] - Committed snapshot 7536746147835307981 (MergeAppend)
2022-10-21 16:57:19,594 INFO org.apache.iceberg.BaseMetastoreTableOperations [] - Refreshing table metadata from new version: qbfs://online01/warehouse/xx/metadata/99171-45858db6-5917-4397-afcc-76c10ea80305.metadata.json
2022-10-21 16:57:19,750 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Committed in 562 ms
2022-10-21 16:57:20,090 INFO org.apache.iceberg.flink.sink.IcebergFilesCommitter [] - Start to flush snapshot state to state backend, table: icebergCatalog.cupid_bi.ads_qixiao_tracking_hawkeye_no_filter_1min, checkpointId: 8227

But in my situation, the task exited without logging "Start to flush snapshot state to state backend" because the NodeManager restarted. Then I restarted the Flink job and it failed with the FileNotFoundException. In the HDFS audit log, I never saw that avro file being created.

stevenzwu commented 2 years ago

> Start to flush snapshot state to state backend

This happens in IcebergFilesCommitter#snapshotState. If checkpoint N didn't complete successfully, the manifest file written for the incomplete checkpoint won't be used, because the last completed checkpoint is N-1.
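
In other words (a simplified sketch of that behavior as I understand it, not the actual IcebergFilesCommitter code; the helper names and the staged-manifest map are illustrative): each commit records the checkpoint id in the snapshot summary under flink.max-committed-checkpoint-id, and on restore only manifests staged for checkpoints newer than that id are replayed.

```java
// Simplified sketch of the restore/commit bookkeeping described above.
import java.util.NavigableMap;

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class RestoreCommitSketch {

  // Summary property the Flink sink writes on every committed snapshot.
  private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

  /** Highest checkpoint id already committed to the table, or -1 if none. */
  static long maxCommittedCheckpointId(Table table) {
    long max = -1L;
    for (Snapshot snapshot : table.snapshots()) {
      String value = snapshot.summary().get(MAX_COMMITTED_CHECKPOINT_ID);
      if (value != null) {
        max = Math.max(max, Long.parseLong(value));
      }
    }
    return max;
  }

  /** On restore, replay only manifests staged for checkpoints the table has not seen yet. */
  static void commitPendingOnRestore(Table table, NavigableMap<Long, byte[]> stagedManifests) {
    long maxCommitted = maxCommittedCheckpointId(table);
    // Checkpoints <= maxCommitted are already in the table. A manifest written for an
    // incomplete checkpoint N never shows up here at all, because Flink restores state
    // only from the last *completed* checkpoint (N-1 in the scenario above).
    stagedManifests.headMap(maxCommitted, true).clear();
    stagedManifests.forEach((checkpointId, serializedManifests) -> {
      // deserialize and commit the staged manifests for this checkpoint (omitted)
    });
  }
}
```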

congd123 commented 1 year ago

@stevenzwu if this scenario happens:

> If checkpoint N didn't complete successfully, the manifest file written for the incomplete checkpoint won't be used, because the last completed checkpoint is N-1.

What is the best approach to recover the job?

I see similar behavior using Flink 1.14.1 and Iceberg 1.0.0 (a v2 table).

jad-grepr commented 4 months ago

Hi! We're running into this issue with Iceberg 1.5.2 and Flink 1.18.1. Seems like it's not yet fixed. We're happy to dedicate resources to fix it if we could get a pointer on resolving it. @stevenzwu would you please point us in the right direction? Thanks!

rahulkr25 commented 6 days ago
  1. Is there any update here? Has a resolution for this bug been planned? @jad-grepr @stevenzwu
  2. Secondly, as @congd123 mentioned, what is the best approach for recovery here? Once I start seeing these errors, the table becomes inaccessible: I can no longer perform any operations through Flink or through Trino.

We are facing this issue with Flink 1.16 and Iceberg 1.3.1; as @jad-grepr reported, it also occurs on recent versions. Attaching the stack trace:

{"flowId":"37d1732c-20d4-49b4-99b3-67d2ca10e492","id":"6","type":"Error","time":{"sec":1731996384,"micros":358067},"title":"exception while executing job","level":"ERROR","attributes":{"stackTrace":"org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: s3a://<>/<>/<>.events/metadata/12454-c7fe536d-cdd8-47b4-93d7-a6b1d915ef4c.metadata.json\n\tat org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:185)\n\tat org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:272)\n\tat org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:266)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$0(BaseMetastoreTableOperations.java:189)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$1(BaseMetastoreTableOperations.java:208)\n\tat org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)\n\tat org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)\n\tat org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)\n\tat org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:208)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:185)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:180)\n\tat org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:178)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97)\n\tat org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80)\n\tat org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47)\n\tat org.apache.iceberg.catalog.Catalog.tableExists(Catalog.java:281)\n\tat in.zeta.perseus.iceberg.operator.sink.IcebergTableLoader.createTable(IcebergTableLoader.java:107)\n\tat in.zeta.perseus.iceberg.operator.sink.IcebergTableLoader.createIcebergTableIfNotExists(IcebergTableLoader.java:54)\n\tat in.zeta.perseus.iceberg.operator.sink.IcebergTableLoader.getTableLoader(IcebergTableLoader.java:31)\n\tat in.zeta.perseus.iceberg.operator.sink.IcebergSinkOperator.sinkStream(IcebergSinkOperator.java:40)\n\tat in.zeta.perseus.core.job.JobBuilder.sinkStream(JobBuilder.java:225)\n\tat in.zeta.perseus.core.job.JobBuilder.lambda$buildSinkOperator$2(JobBuilder.java:182)\n\tat java.base/java.util.Optional.ifPresent(Unknown Source)\n\tat in.zeta.perseus.core.job.JobBuilder.buildSinkOperator(JobBuilder.java:180)\n\tat in.zeta.perseus.core.job.JobBuilder.executeJobGraphTopologically(JobBuilder.java:111)\n\tat in.zeta.perseus.core.job.JobBuilder.buildAndExecuteJobGraph(JobBuilder.java:69)\n\tat in.zeta.perseus.core.job.JobBuilder.buildJob(JobBuilder.java:56)\n\tat in.zeta.perseus.core.JobDriver.validateAndBuildJob(JobDriver.java:44)\n\tat in.zeta.perseus.core.JobDriver.main(JobDriver.java:25)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\n\tat java.base/java.lang.reflect.Method.invoke(Unknown Source)\n\tat org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)\n\tat 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)\n\tat org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)\n\tat org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:107)\n\tat java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: java.io.FileNotFoundException: No such file or directory: s3a://<>/<>/<>/metadata/12454-c7fe536d-cdd8-47b4-93d7-a6b1d915ef4c.metadata.json\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5401)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1465)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1441)\n\tat org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)\n\tat org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)\n\t... 41 more\n","message":"Failed to open input stream for file: s3a://<>/<>/<>_events
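
A diagnostic sketch, not advice from this thread: the trace above shows the catalog refresh failing because the metadata.json the Hive Metastore points at no longer exists on storage. For a Hive-backed Iceberg table that pointer lives in the table's metadata_location parameter (with a previous_metadata_location alongside it), so it can be checked directly. The metastore URI and the database/table names below are placeholders.

```java
// Sketch: read the current metadata pointer for the table straight from the
// Hive Metastore and check whether that file still exists on the object store.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class MetadataPointerCheck {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.set("hive.metastore.uris", "thrift://metastore:9083"); // placeholder URI

    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    Table hmsTable = client.getTable("db_name", "events");      // placeholder identifiers

    // Iceberg's Hive catalog stores the pointer to the current metadata.json here.
    String current = hmsTable.getParameters().get("metadata_location");
    String previous = hmsTable.getParameters().get("previous_metadata_location");

    FileSystem fs = new Path(current).getFileSystem(new Configuration());
    System.out.println("catalog points at: " + current
        + " (exists=" + fs.exists(new Path(current)) + ")");
    System.out.println("previous pointer:  " + previous);

    client.close();
  }
}
```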