
manifest list missing error after commit failed exception #9406

Open waichee opened 8 months ago

waichee commented 8 months ago

Apache Iceberg version

1.3.1

Query engine

Spark

Please describe the bug 🐞

Setup

We use the following Spark libraries to write to Iceberg on EMR:

    org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1
    org.apache.iceberg:iceberg-spark-extensions-3.4_2.12:1.3.1

We have set up the DynamoDB lock manager with the Glue catalog for our Iceberg table.

Spark config

      sparkConf
        .set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
        .set("spark.sql.catalog.iceberg.lock-impl", "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager")
        .set("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .set("spark.sql.catalog.iceberg.warehouse", warehousePath)
        .set("spark.sql.catalog.iceberg.lock.table", locktableName)
        .set("spark.sql.defaultCatalog","iceberg");

Table properties

    Map<String, String> tableOptions = Map.of(
      "provider", CATALOG_PROVIDER,
      "write.parquet.compression-codec", "snappy",
      "write.delete.mode", "copy-on-write",
      "write.update.mode", "merge-on-read",
      "write.merge.mode", "merge-on-read",
      "write.spark.accept-any-schema", "true"
    );

Java code to write to the Iceberg table

    dataframe
        .writeTo(String.format("%s.%s", database, tablename))
        .option("mergeSchema", "true")
        .append();

We have up to 8 concurrent writers writing to the table every 15 minutes or so.
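
For context, how hard Iceberg retries a contended commit before surfacing CommitFailedException is controlled by table-level retry properties. Below is a minimal sketch of how these could be added alongside the table options above; the property names are standard Iceberg table properties, but the values are illustrative assumptions rather than our actual configuration:

    // Hypothetical addition to the table options above: commit retry tuning for
    // tables with many concurrent writers (requires java.util.Map, as above).
    // Values are illustrative only, not our real configuration.
    Map<String, String> commitRetryOptions = Map.of(
      "commit.retry.num-retries", "10",           // attempts before CommitFailedException is surfaced (default 4)
      "commit.retry.min-wait-ms", "250",          // initial backoff between retry attempts (default 100)
      "commit.retry.max-wait-ms", "60000",        // upper bound on the backoff (default 60000)
      "commit.retry.total-timeout-ms", "1800000"  // overall time budget for a single commit (default 1800000)
    );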

What happened

We noticed that the .append() write to the table failed with the following exception:

Caused by: org.apache.iceberg.exceptions.CommitFailedException: Cannot commit iceberg.db.table because base metadata location 's3://bucket/iceberg/db/tableName/metadata/metadata1.json' is not same as the current Glue location 's3://bucket/iceberg/db/tableName/metadata/metadata2.json'
    at org.apache.iceberg.aws.glue.GlueTableOperations.checkMetadataLocation(GlueTableOperations.java:272)
    at org.apache.iceberg.aws.glue.GlueTableOperations.doCommit(GlueTableOperations.java:158)

When we restart the job, the application throws a 404 Not Found when looking up the manifest list file for the Iceberg table:

Caused by: software.amazon.awssdk.services.s3.model.NoSuchKeyException: null (Service: S3, Status Code: 404, Request ID: BGEGZNRNM1YR7MGW, Extended Request ID: qTaJ1r8Ky1vMKOAU5+r4djAQY9r05jGqvFFGLJz9WD6upWROlpIroSEefjGsw2OQI41k/pHKRRI=) (Service: S3, Status Code: 404, Request ID: BGEGZNRNM1YR7MGW)
    at software.amazon.awssdk.services.s3.model.NoSuchKeyException$BuilderImpl.build(NoSuchKeyException.java:126)
    at software.amazon.awssdk.services.s3.model.NoSuchKeyException$BuilderImpl.build(NoSuchKeyException.java:80)
    at software.amazon.awssdk.services.s3.internal.handlers.ExceptionTranslationInterceptor.modifyException(ExceptionTranslationInterceptor.java:63)
    at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.modifyException(ExecutionInterceptorChain.java:202)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.ExceptionReportingUtils.runModifyException(ExceptionReportingUtils.java:54)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.ExceptionReportingUtils.reportFailureToInterceptors(ExceptionReportingUtils.java:38)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
    at software.amazon.awssdk.services.s3.DefaultS3Client.headObject(DefaultS3Client.java:5495)
    at org.apache.iceberg.aws.s3.BaseS3File.getObjectMetadata(BaseS3File.java:85)
    at org.apache.iceberg.aws.s3.S3InputFile.getLength(S3InputFile.java:77)
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:76)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:36)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:333)
    at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:241)
    at org.apache.iceberg.ManifestLists.read(ManifestLists.java:45)
    at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:146)
    at org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:172)
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:826)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:226)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:376)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:374)
    at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:222)
    at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:84)
    at org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:285)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:422)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:382)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:248)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:360)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:359)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:248)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:123)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:195)
    at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:149)

Inspecting Glue and S3 we see the following:

We are unable to use the table after this without manually rolling back the metadata file. Is there a bug in the optimistic locking with the Glue catalog in this case? We would appreciate some pointers on what could have caused this.
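
For illustration, one way such a manual rollback can be done is by pointing the Glue table's metadata_location parameter back at the last known-good metadata JSON. A rough sketch with the AWS SDK for Java v2 follows; the database name, table name, and target metadata file are placeholders, not our real values:

    // Hypothetical recovery sketch: rewind the Glue table pointer to a known-good metadata file.
    // Database/table names and the metadata path below are placeholders.
    import software.amazon.awssdk.services.glue.GlueClient;
    import software.amazon.awssdk.services.glue.model.Table;
    import software.amazon.awssdk.services.glue.model.TableInput;

    import java.util.HashMap;
    import java.util.Map;

    public class ResetMetadataLocation {
      public static void main(String[] args) {
        try (GlueClient glue = GlueClient.create()) {
          // Read the current Glue table entry, including its Iceberg parameters.
          Table table = glue.getTable(r -> r.databaseName("db").name("tableName")).table();

          // Copy the parameters and set metadata_location back to the last good metadata JSON.
          Map<String, String> params = new HashMap<>(table.parameters());
          params.put("metadata_location", "s3://bucket/iceberg/db/tableName/metadata/metadata1.json");

          TableInput input = TableInput.builder()
              .name(table.name())
              .tableType(table.tableType())
              .parameters(params)
              .storageDescriptor(table.storageDescriptor())
              .build();

          glue.updateTable(r -> r.databaseName("db").tableInput(input));
        }
      }
    }

Whether rewinding to that particular file is safe depends on why the newer manifest list went missing, so treat this purely as an illustration of the mechanism rather than a recommended fix.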

flisboac commented 5 months ago

I've also come across this error. The difference is that, AFAICT, I don't need to roll back the table when that error happens.

Traceback (most recent call last):
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1712706581772_0133/container_1712706581772_0133_01_000001/REDACTED.py", line 679, in _do_run
    self.save_cdc_history(
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1712706581772_0133/container_1712706581772_0133_01_000001/REDACTED.py", line 1010, in save_cdc_history
    df.writeTo(full_table_name).append()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1712706581772_0133/container_1712706581772_0133_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 2107, in append
    self._jwriter.append()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1712706581772_0133/container_1712706581772_0133_01_000001/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1712706581772_0133/container_1712706581772_0133_01_000001/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1712706581772_0133/container_1712706581772_0133_01_000001/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o174.append.
: org.apache.iceberg.exceptions.CommitFailedException: Cannot commit FULL_TABLE_NAME_REDACTED because base metadata location 's3://REDACTED/metadata/08121-ff854d63-3fff-4d64-b456-aa8d3e8850da.metadata.json' is not same as the current Glue location 's3://REDACTED/metadata/08122-e77b0e4e-aa50-4346-a578-9eb0b917c097.metadata.json'
    at org.apache.iceberg.aws.glue.GlueTableOperations.checkMetadataLocation(GlueTableOperations.java:272)
    at org.apache.iceberg.aws.glue.GlueTableOperations.doCommit(GlueTableOperations.java:158)
    at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:135)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:400)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:374)
    at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:233)
    at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:84)
    at org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:296)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:399)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:225)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337)
    at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336)
    at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:113)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:479)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:101)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:151)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
    at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:150)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

Perhaps the lock is not being held long enough, so a writer runs into conflicts with other writers mid-commit?
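
If the DynamoDB lock manager from the original report is involved, its timing can be tuned through Iceberg catalog properties. A rough sketch extending the sparkConf block from the first post; the property names are Iceberg's standard lock settings, but the values are illustrative guesses, not a verified fix:

    // Hypothetical lock tuning, assuming the DynamoDB lock manager configured as in the
    // original report. Values are illustrative only.
    sparkConf
        .set("spark.sql.catalog.iceberg.lock.acquire-timeout-ms", "300000")    // wait up to 5 minutes to acquire the lock
        .set("spark.sql.catalog.iceberg.lock.acquire-interval-ms", "5000")     // poll every 5 seconds while waiting
        .set("spark.sql.catalog.iceberg.lock.heartbeat-interval-ms", "3000")   // renew the held lock every 3 seconds
        .set("spark.sql.catalog.iceberg.lock.heartbeat-timeout-ms", "20000");  // treat the lock as lost after 20 seconds without a heartbeat

That said, a CommitFailedException from checkMetadataLocation is also the normal optimistic-concurrency signal when another writer commits first, so lock timing is only one possible explanation.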

This is how my table was created:

CREATE TABLE __REDACTED__ (
    -- Many columns here; two array<string>, the rest are strings, booleans, bigints and timestamps
    target_database_name string,
    target_table_name string,
    generated_at string,
    started_at timestamp
)
USING iceberg
LOCATION 's3://REDACTED'
PARTITIONED BY (month(started_at))
TBLPROPERTIES (
    'table_type'='iceberg',
    'format-version'='2',
    'write.format.default'='parquet',
    'write.parquet.compression-codec'='snappy',
    'write.avro.compression-codec'='snappy',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.delete.mode'='copy-on-write',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read',
    'write.metadata.metrics.column.started_at'='full',
    'write.metadata.metrics.column.target_table_name'='full'
    -- More columns are configured as "full" metrics; 9 "full" in total, including the ones above
);

ALTER TABLE inf_datalake_pipelines_pub.cdc_manifest_history WRITE ORDERED BY
    started_at ASC,
    target_database_name ASC,
    target_table_name ASC,
    generated_at ASC;

I'm using EMR release 7.0.0, using only what AWS makes available in terms of Spark, libraries, etc.