delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python
https://delta.io
Apache License 2.0

[BUG] S3DynamoDBLogStore do not support hadoop-aws s3a SimpleAWSCredentialsProvider. #1235

Closed GrigorievNick closed 2 years ago

GrigorievNick commented 2 years ago

Bug

Describe the problem

When configuring Spark per setup-configuration-s3-multi-cluster, the example sets the S3A credentials with hadoop.fs.s3a.* properties. The documentation also describes another option, spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider, which defaults to DefaultAWSCredentialsProviderChain. Because of the implementation, there is no way to specify org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider, which takes its credentials from the Hadoop configuration: Delta creates the S3DynamoDBLogStore's AWS credentials provider via reflection using the empty constructor.

// From io.delta.storage.S3DynamoDBLogStore: the credentials provider is
// created via reflection using the no-argument constructor.
final AWSCredentialsProvider auth =
    (AWSCredentialsProvider) Class.forName(credentialsProviderName)
        .getConstructor()
        .newInstance();
final AmazonDynamoDBClient client = new AmazonDynamoDBClient(auth);
client.setRegion(Region.getRegion(Regions.fromName(regionName)));
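
Any class supplied via spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider therefore needs a public no-argument constructor. Purely as an illustration (this class is hypothetical and not part of Delta or hadoop-aws), a provider that satisfies that constraint without hard-coding keys could read the standard AWS environment variables:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

// Hypothetical provider with the no-argument constructor required by the
// reflective instantiation above; it reads the standard AWS environment
// variables instead of the Hadoop configuration.
public class EnvVarCredentialsProvider implements AWSCredentialsProvider {

    @Override
    public AWSCredentials getCredentials() {
        return new BasicAWSCredentials(
            System.getenv("AWS_ACCESS_KEY_ID"),
            System.getenv("AWS_SECRET_ACCESS_KEY"));
    }

    @Override
    public void refresh() {
        // getCredentials() re-reads the environment on every call,
        // so there is no cached state to refresh.
    }
}

This only sidesteps the limitation; the request in this issue is to be able to reuse the hadoop.fs.s3a.* credentials, for example via org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider, which the empty-constructor reflection cannot construct.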

Steps to reproduce

Write any Spark application that writes new data to a Delta table, and configure it to use S3DynamoDBLogStore following the official delta.io documentation.

bin/spark-shell \
 --packages io.delta:delta-core_2.12:1.2.1,org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-storage-s3-dynamodb:1.2.1 \
 --conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
 --conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key> \
 --conf spark.delta.logStore.s3a.impl=io.delta.storage.S3DynamoDBLogStore \
 --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=eu-west-1
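
For reference, the setting one would expect to reuse the Hadoop credentials looks like the line below (not part of the original reproduction command; it illustrates the claim above, and fails because the provider is instantiated through the empty constructor):

 --conf spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider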

VERY IMPORTANT NOTE - the Spark cluster nodes must not have any other valid AWS credentials available, otherwise the default provider chain would pick them up and hide the problem.

Observed results

The application fails with an access denied error:

Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: User: arn:aws:iam::my_account_id:table:user/my_iam_name is not authorized to perform: dynamodb:DescribeTable on resource: arn:aws:dynamodb:eu-west-1:my_account_id:table/datalake_delta_log because no identity-based policy allows the dynamodb:DescribeTable action (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: AccessDeniedException; Request ID: 44VVFKCANGN7E9TTV8EJLUMN97VV4KQNSO5AEMVJF66Q9ASUAAJG)

Expected results

The Delta commit completes successfully and the DynamoDB table holding the commit locks is updated.

Further details

Full stacktrace

User class threw exception: java.util.concurrent.ExecutionException: java.lang.reflect.InvocationTargetException
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:289)
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:111)
at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:132)
at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2381)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2351)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
at com.google.common.cache.LocalCache.get(LocalCache.java:3965)
at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4764)
at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:574)
at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:581)
at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:537)
at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:479)
at org.apache.spark.sql.delta.sources.DeltaSink.<init>(DeltaSink.scala:46)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createSink(DeltaDataSource.scala:134)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:317)
at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:432)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:396)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:230)
at com.appsflyer.raw.data.ingestion.KafkaBackupDeltaLakeStreamer$.processStream(KafkaBackupDeltaLakeStreamer.scala:79)
at com.appsflyer.raw.data.ingestion.KafkaBackupDeltaLakeStreamer$.main(KafkaBackupDeltaLakeStreamer.scala:37)
at com.appsflyer.raw.data.ingestion.KafkaBackupDeltaLakeStreamer.main(KafkaBackupDeltaLakeStreamer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:737)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.delta.storage.LogStore$.createLogStoreWithClassName(LogStore.scala:266)
at org.apache.spark.sql.delta.storage.DelegatingLogStore.createLogStore(DelegatingLogStore.scala:48)
at org.apache.spark.sql.delta.storage.DelegatingLogStore.$anonfun$schemeBasedLogStore$2(DelegatingLogStore.scala:66)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.delta.storage.DelegatingLogStore.schemeBasedLogStore(DelegatingLogStore.scala:66)
at org.apache.spark.sql.delta.storage.DelegatingLogStore.getDelegate(DelegatingLogStore.scala:83)
at org.apache.spark.sql.delta.storage.DelegatingLogStore.listFrom(DelegatingLogStore.scala:125)
at org.apache.spark.sql.delta.Checkpoints.findLastCompleteCheckpoint(Checkpoints.scala:251)
at org.apache.spark.sql.delta.Checkpoints.findLastCompleteCheckpoint$(Checkpoints.scala:242)
at org.apache.spark.sql.delta.DeltaLog.findLastCompleteCheckpoint(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$2(Checkpoints.scala:221)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$1(Checkpoints.scala:203)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.DeltaLog.withDmqTag(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.loadMetadataFromFile(Checkpoints.scala:202)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$2(Checkpoints.scala:215)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$1(Checkpoints.scala:203)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.DeltaLog.withDmqTag(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.loadMetadataFromFile(Checkpoints.scala:202)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$2(Checkpoints.scala:215)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$1(Checkpoints.scala:203)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.DeltaLog.withDmqTag(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.loadMetadataFromFile(Checkpoints.scala:202)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$2(Checkpoints.scala:215)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.$anonfun$loadMetadataFromFile$1(Checkpoints.scala:203)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag(DeltaLogging.scala:124)
at org.apache.spark.sql.delta.metering.DeltaLogging.withDmqTag$(DeltaLogging.scala:123)
at org.apache.spark.sql.delta.DeltaLog.withDmqTag(DeltaLog.scala:64)
at org.apache.spark.sql.delta.Checkpoints.loadMetadataFromFile(Checkpoints.scala:202)
at org.apache.spark.sql.delta.Checkpoints.lastCheckpoint(Checkpoints.scala:197)
at org.apache.spark.sql.delta.Checkpoints.lastCheckpoint$(Checkpoints.scala:196)
at org.apache.spark.sql.delta.DeltaLog.lastCheckpoint(DeltaLog.scala:64)
at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:53)
at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:69)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:564)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$2(DeltaLog.scala:564)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:433)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperation$5(DeltaLogging.scala:114)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:433)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:113)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:98)
at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:433)
at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:563)
at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:574)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4767)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
... 22 more
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: User: arn:aws:iam::my_account_ID:table:user/my_IAM_name is not authorized to perform: dynamodb:DescribeTable on resource: arn:aws:dynamodb:eu-west-1:my_account_ID:table/datalake_delta_log because no identity-based policy allows the dynamodb:DescribeTable action (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: AccessDeniedException; Request ID: 44VVFKCANGN7E9TTV8EJLUMN97VV4KQNSO5AEMVJF66Q9ASUAAJG)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1638)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1303)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:2186)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:2162)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1048)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1024)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1060)
at io.delta.storage.S3DynamoDBLogStore.tryEnsureTableExists(S3DynamoDBLogStore.java:227)
at io.delta.storage.S3DynamoDBLogStore.<init>(S3DynamoDBLogStore.java:120)
... 95 more

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

GrigorievNick commented 2 years ago

As a fast and dirty workaround, the user can pass the credentials through Spark's environment-variable pass-through mechanism on YARN, or set the environment variables at the container level on Mesos and Kubernetes clusters, so that DefaultAWSCredentialsProviderChain picks them up. The Spark configuration for the pass-through on YARN is shown below.
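
A sketch of that configuration, assuming DefaultAWSCredentialsProviderChain remains the configured provider and therefore reads the standard AWS environment variables (key values are placeholders):

bin/spark-shell \
 --conf spark.yarn.appMasterEnv.AWS_ACCESS_KEY_ID=<your-s3-access-key> \
 --conf spark.yarn.appMasterEnv.AWS_SECRET_ACCESS_KEY=<your-s3-secret-key> \
 --conf spark.executorEnv.AWS_ACCESS_KEY_ID=<your-s3-access-key> \
 --conf spark.executorEnv.AWS_SECRET_ACCESS_KEY=<your-s3-secret-key>

On Kubernetes the equivalent driver-side property is spark.kubernetes.driverEnv.<NAME>, combined with spark.executorEnv.<NAME> for the executors.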