hansetag / iceberg-catalog

A Rust implementation of the Iceberg REST Catalog specification.
Apache License 2.0

support purge argument on drop #227

Closed twuebi closed 4 days ago

twuebi commented 1 month ago

Via Spark, DROP ... PURGE is currently failing with:

Py4JJavaError: An error occurred while calling o44.sql.
: org.apache.iceberg.exceptions.NotAuthorizedException: Not authorized: Unauthorized
    at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:210)
    at org.apache.iceberg.rest.ErrorHandlers$DefaultErrorHandler.accept(ErrorHandlers.java:188)
{"timestamp":"2024-08-09T08:35:41.444018Z","level":"DEBUG","message":"finished processing request","latency":"4 ms","status":204,"target":"tower_http::trace::on_response","filename":"/cargo/registry/src/index.crates.io-6f17d22bba15001f/tower-http-0.5.2/src/trace/on_response.rs","line_number":114,"span":{"method":"DELETE","request_id":"01913647-80ff-7810-a3b3-63e76cc87dad","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/namespaces/spark_demo/tables/my_table","version":"HTTP/1.1","name":"request"},"spans":[{"method":"DELETE","request_id":"01913647-80ff-7810-a3b3-63e76cc87dad","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/namespaces/spark_demo/tables/my_table","version":"HTTP/1.1","name":"request"}]}
{"timestamp":"2024-08-09T08:35:41.448217Z","level":"DEBUG","message":"started processing request","target":"tower_http::trace::on_request","filename":"/cargo/registry/src/index.crates.io-6f17d22bba15001f/tower-http-0.5.2/src/trace/on_request.rs","line_number":80,"span":{"method":"POST","request_id":"01913647-8108-73a2-8703-8a6328e66229","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/v1/aws/s3/sign","version":"HTTP/1.1","name":"request"},"spans":[{"method":"POST","request_id":"01913647-8108-73a2-8703-8a6328e66229","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/v1/aws/s3/sign","version":"HTTP/1.1","name":"request"}]}
{"timestamp":"2024-08-09T08:35:41.448629Z","level":"INFO","message":"Error response","error_id":"01913647-8108-73a2-8703-8a8d4a28f411","stack_s":"Unauthorized\nTable not found\n\n","details":"[]","message":"Unauthorized","r#type":"InvalidLocation","code":"401","target":"iceberg_ext::catalog::rest::error","filename":"crates/iceberg-ext/src/catalog/rest/error.rs","line_number":242,"span":{"method":"POST","request_id":"01913647-8108-73a2-8703-8a6328e66229","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/v1/aws/s3/sign","version":"HTTP/1.1","name":"request"},"spans":[{"method":"POST","request_id":"01913647-8108-73a2-8703-8a6328e66229","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/v1/aws/s3/sign","version":"HTTP/1.1","name":"request"}]}
{"timestamp":"2024-08-09T08:35:41.448661Z","level":"DEBUG","message":"finished processing request","latency":"0 ms","status":401,"target":"tower_http::trace::on_response","filename":"/cargo/registry/src/index.crates.io-6f17d22bba15001f/tower-http-0.5.2/src/trace/on_response.rs","line_number":114,"span":{"method":"POST","request_id":"01913647-8108-73a2-8703-8a6328e66229","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/v1/aws/s3/sign","version":"HTTP/1.1","name":"request"},"spans":[{"method":"POST","request_id":"01913647-8108-73a2-8703-8a6328e66229","uri":"/catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/v1/aws/s3/sign","version":"HTTP/1.1","name":"request"}]}

twuebi commented 1 month ago

It seems that Spark does not pass the purgeRequested query parameter on DROP ... PURGE. Instead, it sends:

  1. GET /catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/namespaces/spark_demo/tables/my_table?snapshots=all
  2. DELETE /catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/namespaces/spark_demo/tables/my_table
  3. POST /catalog/v1/62f65c5c-5011-11ef-b2fd-a31c9faeb0d2/v1/aws/s3/sign

Since we drop the table from Postgres on receiving the DELETE, the subsequent sign request fails: C::get_table_metadata_by_s3_location can no longer find the table we just dropped.
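
The ordering problem can be reproduced in a few lines. This is only a minimal sketch: `Catalog`, `SignError`, and all method names are hypothetical stand-ins (an in-memory map instead of Postgres, a prefix match instead of the real location lookup), not the project's actual types.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the Postgres-backed catalog.
#[derive(Default)]
struct Catalog {
    /// Table identifier -> table location (an s3:// prefix).
    tables: HashMap<String, String>,
}

#[derive(Debug, PartialEq)]
enum SignError {
    /// Corresponds to the "Unauthorized / Table not found" response in the log above.
    TableNotFound,
}

impl Catalog {
    fn create_table(&mut self, ident: &str, location: &str) {
        self.tables.insert(ident.to_string(), location.to_string());
    }

    /// Hard delete: the entry is gone as soon as the DELETE request is served.
    fn drop_table(&mut self, ident: &str) -> bool {
        self.tables.remove(ident).is_some()
    }

    /// Stand-in for a lookup by S3 location: find the table whose location
    /// is a directory prefix of the object being signed.
    fn table_by_location(&self, object_url: &str) -> Option<&str> {
        self.tables
            .iter()
            .find(|(_, location)| {
                object_url
                    .strip_prefix(location.as_str())
                    .map_or(false, |rest| rest.starts_with('/'))
            })
            .map(|(ident, _)| ident.as_str())
    }

    /// Signing requires resolving the object to a known table first.
    fn sign(&self, object_url: &str) -> Result<String, SignError> {
        let ident = self
            .table_by_location(object_url)
            .ok_or(SignError::TableNotFound)?;
        Ok(format!("signed request for table {}", ident))
    }
}
```

With this model, `sign` succeeds while the table exists and returns `SignError::TableNotFound` once `drop_table` has run, which is the same sequence Spark produces with DELETE followed by POST .../sign.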

twuebi commented 1 month ago

The POST to sign stems from an attempt to look up the metadata file; within Spark Iceberg it's failing at:

  at software.amazon.awssdk.services.s3.DefaultS3Client.headObject(DefaultS3Client.java:6319)
  at org.apache.iceberg.aws.s3.BaseS3File.getObjectMetadata(BaseS3File.java:85)
  at org.apache.iceberg.aws.s3.BaseS3File.exists(BaseS3File.java:70)
  at org.apache.iceberg.aws.s3.S3InputFile.exists(S3InputFile.java:28)
  at org.apache.iceberg.spark.SparkCatalog.purgeTable(SparkCatalog.java:373)

That's the table.io().newInputFile(metadataFileLocation).exists() within:

org.apache.iceberg.spark.SparkCatalog#purgeTable

  @Override
  public boolean purgeTable(Identifier ident) {
    try {
      org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
      ValidationException.check(
          PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
          "Cannot purge table: GC is disabled (deleting files may corrupt other tables)");
      String metadataFileLocation =
          ((HasTableOperations) table).operations().current().metadataFileLocation();

      boolean dropped = dropTableWithoutPurging(ident);

      if (dropped) {
        // check whether the metadata file exists because HadoopCatalog/HadoopTables
        // will drop the warehouse directly and ignore the `purge` argument
        boolean metadataFileExists = table.io().newInputFile(metadataFileLocation).exists();

        if (metadataFileExists) {
          SparkActions.get().deleteReachableFiles(metadataFileLocation).io(table.io()).execute();
        }
      }

      return dropped;
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      return false;
    }
  }

twuebi commented 1 month ago

Somewhere between org.apache.iceberg.aws.s3.S3FileIO.deleteBatch(S3FileIO.java:279) and org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient.sign(S3V4RestSignerClient.java:340), the URI of the request sent to the signer becomes http://minio:9000/examples?delete. Building Iceberg with some debug log statements shows this. Not sure why the AWS SDK isn't using full paths here:

24/08/13 15:20:51 INFO S3FileIO: Adding object initial-warehouse/dafe0356-5973-11ef-bbe4-5fb98c80e306/01914be5-f665-7b51-8ff2-c00817e6abbe/metadata/snap-3127746933095469562-1-44313a2e-6f70-45d6-bfc8-0d4c22c93612.avro to bucket examples
24/08/13 15:20:51 INFO S3FileIO: Deleting remainder batch for bucket examples [initial-warehouse/dafe0356-5973-11ef-bbe4-5fb98c80e306/01914be5-f665-7b51-8ff2-c00817e6abbe/metadata/snap-3127746933095469562-1-44313a2e-6f70-45d6-bfc8-0d4c22c93612.avro]
24/08/13 15:20:51 INFO S3FileIO: Deleting batch for bucket examples [initial-warehouse/dafe0356-5973-11ef-bbe4-5fb98c80e306/01914be5-f665-7b51-8ff2-c00817e6abbe/metadata/snap-3127746933095469562-1-44313a2e-6f70-45d6-bfc8-0d4c22c93612.avro] DeleteObjectsRequest(Bucket=examples, Delete=Delete(Objects=[ObjectIdentifier(Key=initial-warehouse/dafe0356-5973-11ef-bbe4-5fb98c80e306/01914be5-f665-7b51-8ff2-c00817e6abbe/metadata/snap-3127746933095469562-1-44313a2e-6f70-45d6-bfc8-0d4c22c93612.avro)]))
24/08/13 15:20:51 INFO S3V4RestSignerClient: Signing request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=minio, port=9000, encodedPath=/examples, headers=[amz-sdk-invocation-id, amz-sdk-request, Content-Length, Content-MD5, Content-Type, User-Agent], queryParameters=[delete])
24/08/13 15:20:51 INFO S3V4RestSignerClient: Signing remoteSigningRequest: S3SignRequest{region=local-01, method=POST, uri=http://minio:9000/examples?delete, headers={amz-sdk-invocation-id=[e68f1421-88be-6e08-36a8-939ec20be33d], amz-sdk-request=[attempt=1; max=4], Content-Length=[300], Content-MD5=[aIWGzjPMvVWxkQ2Aji/wLA==], Content-Type=[application/xml], User-Agent=[s3fileio/{iceberg-version=Apache Iceberg 1.6.0-SNAPSHOT (commit 4f796e65afd0982449fbaf83d87a0b2a6f6ffa81)}, aws-sdk-java/2.25.60 Mac_OS_X/14.5 OpenJDK_64-Bit_Server_VM/19.0.2+7-FR Java/19.0.2 scala/2.12.18 vendor/Amazon.com_Inc. io/sync http/Apache cfg/retry-mode/legacy cfg/auth-source#anon]}, properties={}, body=<?xml version="1.0" encoding="UTF-8"?><Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Object><Key>initial-warehouse/dafe0356-5973-11ef-bbe4-5fb98c80e306/01914be5-f665-7b51-8ff2-c00817e6abbe/metadata/snap-3127746933095469562-1-44313a2e-6f70-45d6-bfc8-0d4c22c93612.avro</Key></Object></Delete>}
    at org.apache.iceberg.aws.s3.signer.S3V4RestSignerClient.sign(S3V4RestSignerClient.java:340)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.lambda$signRequest$4(SigningStage.java:154)
    at software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:60)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.signRequest(SigningStage.java:153)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.execute(SigningStage.java:72)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.SigningStage.execute(SigningStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:224)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:80)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
    at software.amazon.awssdk.services.s3.DefaultS3Client.deleteObjects(DefaultS3Client.java:3154)
    at org.apache.iceberg.aws.s3.S3FileIO.deleteBatch(S3FileIO.java:279)
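
For context, S3's DeleteObjects operation is a POST to the bucket with a ?delete query string, and the object keys travel only in the XML body, as the S3SignRequest logged above shows. A signer that wants to know which objects are affected would therefore have to read them from the body. The following is a rough sketch of that extraction using plain string search; `extract_delete_keys` and `object_location` are hypothetical helper names, and a real implementation would more likely use a proper XML parser and handle escaped characters.

```rust
/// Collects the text of every <Key>...</Key> element in a DeleteObjects body.
/// Assumption: keys contain no XML-escaped characters; this is not a full XML parser.
fn extract_delete_keys(body: &str) -> Vec<String> {
    const OPEN: &str = "<Key>";
    const CLOSE: &str = "</Key>";
    let mut keys = Vec::new();
    let mut rest = body;
    while let Some(start) = rest.find(OPEN) {
        let after_open = &rest[start + OPEN.len()..];
        match after_open.find(CLOSE) {
            Some(end) => {
                keys.push(after_open[..end].to_string());
                rest = &after_open[end + CLOSE.len()..];
            }
            // Unterminated element: stop rather than guess at the key.
            None => break,
        }
    }
    keys
}

/// Rebuilds a full object location from the bucket name and one key.
fn object_location(bucket: &str, key: &str) -> String {
    format!("s3://{}/{}", bucket, key)
}
```

For the request above, the bucket would come from the path of the signed URI (examples) and each extracted key would be joined to it to obtain a location that can be matched against known table locations.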

twuebi commented 2 weeks ago

To support client-side deletes, we'll have to use the keys within the delete body to map the sign request to a tabular, which we need in order to determine whether the caller is authorized. For now, we could adopt a soft-delete approach that marks tabulars as deleted on drop; they could then be cleaned up via some other action (#299).
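
One way the soft-delete idea could look, sketched with an in-memory map. Everything here is an assumption made for illustration: `SoftDeleteCatalog`, `TabularState`, and the method names are hypothetical, and the cleanup step simply stands in for whatever action #299 ends up defining.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum TabularState {
    Active,
    /// Dropped by the client, but kept so later sign requests can still be resolved.
    SoftDeleted,
}

struct Tabular {
    location: String,
    state: TabularState,
}

/// Hypothetical in-memory stand-in for the catalog backend.
#[derive(Default)]
struct SoftDeleteCatalog {
    tabulars: HashMap<String, Tabular>,
}

impl SoftDeleteCatalog {
    fn create(&mut self, ident: &str, location: &str) {
        let tabular = Tabular {
            location: location.to_string(),
            state: TabularState::Active,
        };
        self.tabulars.insert(ident.to_string(), tabular);
    }

    /// Drop only flips the state; the entry itself stays.
    fn drop_tabular(&mut self, ident: &str) -> bool {
        if let Some(tabular) = self.tabulars.get_mut(ident) {
            if tabular.state == TabularState::Active {
                tabular.state = TabularState::SoftDeleted;
                return true;
            }
        }
        false
    }

    /// Regular catalog reads only see active tabulars.
    fn load(&self, ident: &str) -> Option<&Tabular> {
        self.tabulars
            .get(ident)
            .filter(|t| t.state == TabularState::Active)
    }

    /// The sign path also resolves soft-deleted tabulars by location prefix.
    fn find_for_signing(&self, object_url: &str) -> Option<&str> {
        self.tabulars
            .iter()
            .find(|(_, t)| {
                object_url
                    .strip_prefix(t.location.as_str())
                    .map_or(false, |rest| rest.starts_with('/'))
            })
            .map(|(ident, _)| ident.as_str())
    }

    /// Separate cleanup step: removes soft-deleted entries and reports how many.
    fn expire_soft_deleted(&mut self) -> usize {
        let before = self.tabulars.len();
        self.tabulars.retain(|_, t| t.state == TabularState::Active);
        before - self.tabulars.len()
    }
}
```

In this sketch a dropped tabular disappears from `load` immediately, stays resolvable through `find_for_signing` so the client can finish deleting its files, and is only removed for good by `expire_soft_deleted`. The authorization check itself is left out.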