
Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Flink programs sometimes fail to write to an Iceberg table: the .avro file in metadata cannot be found #9168

Open lpy148145 opened 11 months ago

lpy148145 commented 11 months ago

Apache Iceberg version

1.2.1

Query engine

Flink

Please describe the bug 🐞

2023-11-28 14:11:07,576 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - IcebergFilesCommitter -> Sink: IcebergSink ma_catalog.mz_analytics.ods_ma_base (1/1) (4922d9325c1b9321cc5ef022e8f415f8_6cc2a8ad7e05812a47c2df38c04d53fd_0_2) switched from INITIALIZING to FAILED on container_e21_1658483602855_120032_01_000002 @ bj-admonitor-22 (dataPort=25316).
org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: hdfs://nn-omi/omi/mz_analytics/data/warehouse/mz_analytics/ods_ma_base/metadata/snap-9211943791592287069-1-f37a6485-3806-4ff8-b53c-fd063b5a5bf8.avro
at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:185) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:36) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:337) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:241) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.ManifestLists.read(ManifestLists.java:45) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:148) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:174) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:980) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:216) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:365) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:363) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:407) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:358) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitPendingResult(IcebergFilesCommitter.java:289) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:271) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:193) ~[ma_jingni-flink-1.2.10.jar:?]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_112]
Caused by: java.io.FileNotFoundException: File does not exist: /omi/mz_analytics/data/warehouse/mz_analytics/ods_ma_base/metadata/snap-9211943791592287069-1-f37a6485-3806-4ff8-b53c-fd063b5a5bf8.avro
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:153)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1927)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:738)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:426)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_112]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_112]
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:845) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183) ~[ma_jingni-flink-1.2.10.jar:?]
... 33 more
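
Reading the trace: on restore, the committer has to open the parent snapshot's manifest list, which is exactly the snap-*.avro file the NotFoundException names, so the table's snapshot history itself is damaged, not the current write. A quick way to see how much history is still intact is to walk the table's snapshots and check each manifest list on HDFS. The sketch below is illustrative only: the path-based HadoopTables load and the CheckManifestLists class name are assumptions (a catalog-managed table like ma_catalog.mz_analytics.ods_ma_base may need to be loaded through its catalog instead); the location string is copied from the stack trace.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class CheckManifestLists {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumption: loading the table by location; adjust if the table is
    // managed by a catalog rather than stored as a Hadoop table.
    Table table = new HadoopTables(conf)
        .load("hdfs://nn-omi/omi/mz_analytics/data/warehouse/mz_analytics/ods_ma_base");
    for (Snapshot snapshot : table.snapshots()) {
      Path manifestList = new Path(snapshot.manifestListLocation());
      FileSystem fs = manifestList.getFileSystem(conf);
      // Report which snapshots still have a readable manifest list.
      System.out.printf("snapshot %d (ts=%d): %s exists=%b%n",
          snapshot.snapshotId(), snapshot.timestampMillis(),
          manifestList, fs.exists(manifestList));
    }
  }
}

Any snapshot whose manifest list is missing can no longer be read or committed against; that tells you how far back the deletion reached.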
pvary commented 11 months ago

Is there any other process which might remove these files?

Based on the exception, you got this during a Flink job restart. Either some issue caused the job to restart, in which case you should try to find the error behind the restart, or the job was down for a while and another process removed files from the table directory in the meantime, in which case the HDFS log could help you find the process that removed the files.
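
One plausible sequence, offered as an assumption rather than a confirmed diagnosis for this thread: a snapshot-expiration or cleanup job ran while the Flink job was down and deleted the manifest list of the snapshot that the job's checkpoint still references, so the commit fails on restore. If that turns out to be the cause, widening the retention window gives a restarted job room to catch up. A minimal sketch using the standard Iceberg table properties and ExpireSnapshots API; the seven-day window, the 50-snapshot floor, and the RetentionSettings class name are illustrative values, not recommendations from this thread:

import org.apache.iceberg.Table;

public class RetentionSettings {
  // Illustrative margin: size this to the longest outage a job may have.
  private static final long SEVEN_DAYS_MS = 7L * 24 * 60 * 60 * 1000;

  public static void configure(Table table) {
    // Table properties consulted by standard expire-snapshots maintenance.
    table.updateProperties()
        .set("history.expire.max-snapshot-age-ms", String.valueOf(SEVEN_DAYS_MS))
        .set("history.expire.min-snapshots-to-keep", "50")
        .commit();
  }

  public static void expireWithMargin(Table table) {
    // Manual expiration applying the same safety margin.
    table.expireSnapshots()
        .expireOlderThan(System.currentTimeMillis() - SEVEN_DAYS_MS)
        .retainLast(50)
        .commit();
  }
}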

Zhangg7723 commented 11 months ago

Same problem: https://github.com/apache/iceberg/issues/6066

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale'; commenting on the issue is preferred when possible.