
Accessing AWS Glue in different AWS Account #6215

Closed: grbinho closed this issue 1 year ago

grbinho commented 1 year ago

Apache Iceberg version

0.14.0

Query engine

Other

Please describe the bug 🐞

Hi

We are using AWS Glue 3.0 jobs (Spark 3.1) with Iceberg 0.14.0 (through the Glue Marketplace connector).

We have a hub-and-spoke setup for the Glue catalog, where a central AWS account hosts the catalog and the data, and multiple data processing accounts run Glue jobs.
We do this by delegating access from the central account to the root principal of the processing accounts and then giving the Glue jobs an IAM role with cross-account access.
This works fine when we write non-Iceberg tables.
When we try to write Iceberg tables, we get this error:

software.amazon.awssdk.services.glue.model.EntityNotFoundException: Database my_database not found.

This would normally indicate a permissions issue, but since it works without Iceberg, that should not be the case.
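
For context, the call that fails is a plain DataFrameWriterV2 write (copied from the traceback further down; df, catalog, database and table_name are variables from our job script):

df.writeTo(f"{catalog}.{database}.{table_name}").tableProperty("format-version", "2").createOrReplace()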

From the stack trace, we noticed that the failure comes from org.apache.iceberg.aws.glue.GlueCatalog.defaultWarehouseLocation (GlueCatalog.java:226; in master this is now at
https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java#L259).

To me it looks like this method is missing .catalogId(awsProperties.glueCatalogId()) on the GetDatabaseRequest.builder() call.

Our current workaround is to use the assume-role feature, but we would prefer not to need that.
I see that all other calls in the GlueCatalog code use the glue.id catalog property.
Can this also be added to the defaultWarehouseLocation method? I'm happy to open a PR, but wanted to check first if there is a specific reason this setting is omitted.
My expectation is that once glue.id is set, that catalog should be used for all Glue requests.

Thanks for the time and the great library!

For reference, here is our setup with the workaround:

from pyspark import SparkConf
from pyspark.sql import SparkSession

# catalog, catalog_account_id and warehouse_path are defined earlier in the job
conf = SparkConf()
# Use the Glue Data Catalog as the Hive metastore, pointed at the central catalog account
conf.set("spark.hadoop.hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
conf.set("spark.hadoop.hive.metastore.glue.catalogid", f"{catalog_account_id}")
# https://aws.amazon.com/blogs/big-data/use-the-aws-glue-connector-to-read-and-write-apache-iceberg-tables-with-acid-transactions-and-perform-time-travel/
conf.set(f"spark.sql.catalog.{catalog}.glue.id", f"{catalog_account_id}")
conf.set(f"spark.sql.catalog.{catalog}.glue.account-id", f"{catalog_account_id}")
conf.set(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
conf.set(f"spark.sql.catalog.{catalog}.warehouse", f"s3://{warehouse_path}")
conf.set(f"spark.sql.catalog.{catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
# Workaround: assume a role in the catalog account so Glue/S3 calls run against that account
conf.set(f"spark.sql.catalog.{catalog}.client.factory", "org.apache.iceberg.aws.AssumeRoleAwsClientFactory")
conf.set(f"spark.sql.catalog.{catalog}.client.assume-role.arn", f"arn:aws:iam::{catalog_account_id}:role/my_role")
conf.set(f"spark.sql.catalog.{catalog}.client.assume-role.region", "eu-west-1")
conf.set(f"spark.sql.catalog.{catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.defaultCatalog", f"{catalog}")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
spark_session = SparkSession.builder.appName("gdp").config(conf=conf).enableHiveSupport().getOrCreate()
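
For comparison, this is a sketch of what we would prefer to run once glue.id is honored by all Glue calls: the same catalog definition, but without the assume-role client factory, so the Glue job's own cross-account role is used directly (same placeholder names as above):

conf = SparkConf()
conf.set(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
conf.set(f"spark.sql.catalog.{catalog}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set(f"spark.sql.catalog.{catalog}.glue.id", f"{catalog_account_id}")  # central catalog account
conf.set(f"spark.sql.catalog.{catalog}.warehouse", f"s3://{warehouse_path}")
conf.set(f"spark.sql.catalog.{catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.defaultCatalog", f"{catalog}")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")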

And here is the stack trace:

22/11/17 20:54:07 ERROR GlueExceptionAnalysisListener: [Glue Exception Analysis] {
    "Event": "GlueETLJobExceptionEvent",
    "Timestamp": 1668718447148,
    "Failure Reason": "Traceback (most recent call last):\n  File \"/tmp/test_ivan.py\", line 42, in <module>\n    df.writeTo(f\"{catalog}.{database}.{table_name}\").tableProperty(\"format-version\", \"2\").createOrReplace()\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py\", line 1556, in createOrReplace\n    self._jwriter.createOrReplace()\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n    answer, self.gateway_client, self.target_id, self.name)\n  File \"/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py\", line 111, in deco\n    return f(*a, **kw)\n  File \"/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 328, in get_return_value\n    format(target_id, \".\", name), value)\npy4j.protocol.Py4JJavaError: An error occurred while calling o158.createOrReplace.\n: software.amazon.awssdk.services.glue.model.EntityNotFoundException: Database my_database not found. (Service: Glue, Status Code: 400, Request ID: 0147b316-8f67-4947-a929-0787b9ccb908)\n\tat software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)\n\tat software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)\n\tat software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)\n\tat software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)\n\tat software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)\n\tat software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)\n\tat software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)\n\tat software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)\n\tat 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)\n\tat software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)\n\tat software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)\n\tat software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)\n\tat software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)\n\tat software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)\n\tat software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167)\n\tat software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)\n\tat software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)\n\tat software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)\n\tat software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)\n\tat software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)\n\tat software.amazon.awssdk.services.glue.DefaultGlueClient.getDatabase(DefaultGlueClient.java:5173)\n\tat org.apache.iceberg.aws.glue.GlueCatalog.defaultWarehouseLocation(GlueCatalog.java:226)\n\tat org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.newReplaceTableTransaction(BaseMetastoreCatalog.java:215)\n\tat org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.createOrReplaceTransaction(BaseMetastoreCatalog.java:200)\n\tat org.apache.iceberg.CachingCatalog$CachingTableBuilder.createOrReplaceTransaction(CachingCatalog.java:285)\n\tat org.apache.iceberg.spark.SparkCatalog.stageCreateOrReplace(SparkCatalog.java:206)\n\tat org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:197)\n\tat org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)\n\tat org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)\n\tat org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)\n\tat 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)\n\tat org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)\n\tat org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)\n\tat org.apache.spark.sql.DataFrameWriterV2.$anonfun$runCommand$1(DataFrameWriterV2.scala:196)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)\n\tat org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)\n\tat org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)\n\tat org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)\n\tat org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:213)\n\tat org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:133)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat java.lang.Thread.run(Thread.java:750)\n",
    "Stack Trace": [
        {
            "Declaring Class": "get_return_value",
            "Method Name": "format(target_id, \".\", name), value)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
            "Line Number": 328
        },
        {
            "Declaring Class": "deco",
            "Method Name": "return f(*a, **kw)",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
            "Line Number": 111
        },
        {
            "Declaring Class": "__call__",
            "Method Name": "answer, self.gateway_client, self.target_id, self.name)",
            "File Name": "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
            "Line Number": 1305
        },
        {
            "Declaring Class": "createOrReplace",
            "Method Name": "self._jwriter.createOrReplace()",
            "File Name": "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py",
            "Line Number": 1556
        },
        {
            "Declaring Class": "<module>",
            "Method Name": "df.writeTo(f\"{catalog}.{database}.{table_name}\").tableProperty(\"format-version\", \"2\").createOrReplace()",
            "File Name": "/tmp/test_ivan.py",
            "Line Number": 42
        }
    ],
    "Last Executed Line number": 42,
    "script": "test_ivan.py"
}
singhpk234 commented 1 year ago

> Can this also be added to the defaultWarehouseLocation method? I'm happy to open a PR, but wanted to check first if there is a specific reason this setting is omitted.

I think we should add catalogId here as well. I went through the PR which introduced it and can't find any objection to it. Please feel free to open a PR to get more feedback.

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.