apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

No retries on snapshot commit on eventually consistent file system #1398

Closed davseitsev closed 7 months ago

davseitsev commented 4 years ago

I'm building a Spark Structured Streaming application that writes data to Amazon S3 in Iceberg format. Sometimes the query fails without retries due to S3 eventual consistency.

20/08/28 10:15:08 ERROR MicroBatchExecution: Query event-streaming-query-76-v0 [id = bea88375-dd55-4eb1-bc4a-44ed9f5fdbf9, runId = 7536644f-1ff8-42fd-b9b7-812fcdab1e21] terminated with error
org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:283)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:332)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2788)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
    at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:159)
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:95)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
    at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
    at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
    at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
    at org.apache.iceberg.BaseSnapshot.allManifests(BaseSnapshot.java:141)
    at org.apache.iceberg.FastAppend.apply(FastAppend.java:142)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:149)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:262)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:403)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:188)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:261)
    at org.apache.iceberg.spark.source.Writer.commitOperation(Writer.java:149)
    at org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:93)
    at org.apache.iceberg.spark.source.StreamingWriter.commit(StreamingWriter.java:86)
    at org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter.commit(MicroBatchWriter.scala:31)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:76)
    ... 35 more
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://my_bucket/warehouse.db/table_76/metadata/snap-8502691184173814269-1-8b57c8f4-1f87-4d61-9708-d890f251d101.avro
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:790)
    at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:157)
    ... 56 more

As far as I understand, it happens in SnapshotProducer during the commit operation. FastAppend tries to read the manifest list in its apply() method and gets a java.io.FileNotFoundException from S3AFileSystem.getFileStatus(S3AFileSystem.java:1642), even though the file actually exists.

It's a known issue with S3AFileSystem: it checks whether a file exists before creating it, and that check breaks S3's read-after-write consistency for the new object. When another client then tries to read the newly created file, it can get a FileNotFoundException. The problem is that SnapshotProducer retries only on CommitFailedException and doesn't retry on any IOException.
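The retry behaviour described above can be illustrated with a minimal sketch. This is not Iceberg's Tasks or SnapshotProducer code; CommitConflict, MissingFile, and SelectiveRetry are hypothetical stand-ins invented for the example. It only shows that a loop which retries on one exception type lets any other failure escape on the first attempt.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a commit conflict; not Iceberg's CommitFailedException.
class CommitConflict extends RuntimeException {
  CommitConflict(String message) { super(message); }
}

// Hypothetical stand-in for a "file not visible yet" failure; not Iceberg's NotFoundException.
class MissingFile extends RuntimeException {
  MissingFile(String message) { super(message); }
}

class SelectiveRetry {
  // Runs the task up to maxAttempts times, retrying only when it throws retryOn.
  static <T> T run(Supplier<T> task, Class<? extends RuntimeException> retryOn, int maxAttempts) {
    for (int attempt = 1; ; attempt++) {
      try {
        return task.get();
      } catch (RuntimeException e) {
        // Any other exception type, or an exhausted budget, ends the loop.
        if (!retryOn.isInstance(e) || attempt >= maxAttempts) {
          throw e;
        }
      }
    }
  }

  // Counts how often a task that always fails with `failure` is invoked
  // when only CommitConflict is considered retryable.
  static int attemptsUntilGiveUp(RuntimeException failure, int maxAttempts) {
    AtomicInteger calls = new AtomicInteger();
    try {
      run(() -> { calls.incrementAndGet(); throw failure; }, CommitConflict.class, maxAttempts);
    } catch (RuntimeException expected) {
      // The failure surfaces once retries are exhausted or do not apply.
    }
    return calls.get();
  }

  public static void main(String[] args) {
    // The retryable failure uses the whole budget: prints 4
    System.out.println(attemptsUntilGiveUp(new CommitConflict("concurrent commit"), 4));
    // The non-retryable failure is attempted once: prints 1
    System.out.println(attemptsUntilGiveUp(new MissingFile("manifest list not visible yet"), 4));
  }
}
```

In the stack trace above, the failure raised inside the retry loop is a NotFoundException rather than a CommitFailedException, which is consistent with the second case.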

aokolnychyi commented 4 years ago

I remember we had this issue earlier. Here is the PR with some discussion. The idea suggested there was to either contribute a custom FileIO implementation to Iceberg or use a Hadoop FS implementation that already handles it.

aokolnychyi commented 4 years ago

cc @rdblue

aokolnychyi commented 4 years ago

I feel like we need to agree on the best solution and document it.

aokolnychyi commented 4 years ago

Here is the snippet from create() in S3AFileSystem:

    try {
      // get the status or throw an FNFE
      status = getFileStatus(path);

      // if the thread reaches here, there is something at the path
      if (status.isDirectory()) {
        // path references a directory: automatic error
        throw new FileAlreadyExistsException(path + " is a directory");
      }
      if (!overwrite) {
        // path references a file and overwrite is disabled
        throw new FileAlreadyExistsException(path + " already exists");
      }
      LOG.debug("Overwriting file {}", path);
    } catch (FileNotFoundException e) {
      // this means the file is not found

    }

It looks like the getFileStatus call breaks this logic, and even setting the overwrite flag does not help, at least in S3AFileSystem.
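To make the sequence concrete, here is a toy in-memory model of the probe-then-create pattern. It is an assumption-laden sketch, not S3 or S3A code: ToyObjectStore and CreateDemo are hypothetical, and the rule "one stale miss after a probe" is a deliberate simplification of the caveat S3 documented at the time, namely that a HEAD or GET on a key before the object is created makes the following read only eventually consistent.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: a probed miss is "remembered" and served once more after the write.
class ToyObjectStore {
  private final Map<String, String> objects = new HashMap<>();
  private final Set<String> staleMisses = new HashSet<>();

  // Existence probe (think HEAD). A miss is recorded as a stale negative answer.
  boolean exists(String key) {
    if (objects.containsKey(key)) {
      return true;
    }
    staleMisses.add(key);
    return false;
  }

  void put(String key, String value) {
    objects.put(key, value);
  }

  // The first read after a probed miss still returns null; later reads succeed.
  String get(String key) {
    if (staleMisses.remove(key)) {
      return null;
    }
    return objects.get(key);
  }
}

class CreateDemo {
  // Same shape as the quoted snippet: the probe runs before the overwrite flag is consulted.
  static void createWithProbe(ToyObjectStore store, String key, String value, boolean overwrite) {
    if (store.exists(key) && !overwrite) {
      throw new IllegalStateException(key + " already exists");
    }
    store.put(key, value);
  }

  // Illustrative alternative: no probe at all when the caller asked to overwrite.
  static void createSkippingProbe(ToyObjectStore store, String key, String value, boolean overwrite) {
    if (!overwrite && store.exists(key)) {
      throw new IllegalStateException(key + " already exists");
    }
    store.put(key, value);
  }

  public static void main(String[] args) {
    ToyObjectStore probed = new ToyObjectStore();
    createWithProbe(probed, "metadata/snap.avro", "manifest list", true);
    System.out.println(probed.get("metadata/snap.avro")); // null: the stale miss is served first

    ToyObjectStore unprobed = new ToyObjectStore();
    createSkippingProbe(unprobed, "metadata/snap.avro", "manifest list", true);
    System.out.println(unprobed.get("metadata/snap.avro")); // manifest list
  }
}
```

In this model the overwrite flag makes no difference to the first variant, because the probe has already happened by the time the flag is checked.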

rdblue commented 4 years ago

Our S3 file system doesn't do the getFileStatus check when overwriting, which is how we avoid this. I think the best way to handle this is to add a certain amount of retries at the FileIO layer.

We already have retries for table.refresh() (in BaseMetastoreTableOperations) that take care of most of these issues for the root metadata. The reason why we added the retries there was to avoid adding extra latency when there actually is a problem -- when a data file is missing, for example. But, it looks like it would be better to have retries for at least metadata files when using S3.

I think it makes sense to add retries to HadoopInputFile and to make the number of retries and total retry timeout configurable either in table properties or Hadoop Configuration. The downside to adding retries there is that we'd have retries for data files as well, but that seems like a reasonable trade-off to me.
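A rough sketch of what such a retry could look like, with both a retry count and a total timeout. Everything here is hypothetical: RetryingOpen, Opener, and the parameter names are invented for illustration and are not part of HadoopInputFile or any Iceberg API, and the exponential backoff is just one possible choice.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;

class RetryingOpen {
  // Hypothetical callback standing in for "open the input stream".
  interface Opener<T> {
    T open() throws IOException;
  }

  // Retries only FileNotFoundException, bounded by a retry count and a total time budget.
  static <T> T openWithRetry(Opener<T> opener, int maxRetries, Duration totalTimeout,
                             Duration initialDelay) throws IOException, InterruptedException {
    long deadline = System.nanoTime() + totalTimeout.toNanos();
    long delayMs = initialDelay.toMillis();
    for (int attempt = 0; ; attempt++) {
      try {
        return opener.open();
      } catch (FileNotFoundException e) {
        long remainingMs = (deadline - System.nanoTime()) / 1_000_000;
        if (attempt >= maxRetries || remainingMs <= 0) {
          throw e; // budget exhausted: the file is treated as really missing
        }
        Thread.sleep(Math.min(delayMs, remainingMs));
        delayMs *= 2; // exponential backoff between attempts
      }
    }
  }

  // Demo helper: simulates a file that becomes visible after `misses` failed opens.
  // Returns the number of open() calls made, or -1 if the retries ran out.
  static int simulate(int misses, int maxRetries) {
    int[] calls = {0};
    try {
      openWithRetry(() -> {
        if (calls[0]++ < misses) {
          throw new FileNotFoundException("not visible yet");
        }
        return "stream";
      }, maxRetries, Duration.ofSeconds(5), Duration.ofMillis(1));
      return calls[0];
    } catch (IOException e) {
      return -1;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return -1;
    }
  }

  public static void main(String[] args) {
    System.out.println(simulate(2, 3)); // 3: two misses, then success
    System.out.println(simulate(5, 3)); // -1: still missing after 3 retries
  }
}
```

The second case shows the trade-off mentioned above: a file that is really missing, such as a deleted data file, is reported only after the full retry budget has been spent.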

johnclara commented 3 years ago

Hi @davseitsev, we're also using s3a (version 2.8.4) and I'm wondering if you see this frequently or within a single operation? We're starting to have an issue which we can't identify within a single create() but it sounds like this happens from a create() and then open()?

github-actions[bot] commented 8 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 7 months ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'