delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive.
https://delta.io
Apache License 2.0

Race when saving .checkpoint.parquet sometimes writes the file with the wrong S3 permissions when assuming a role. #231

Closed: ghost closed this issue 3 years ago

ghost commented 5 years ago

Hello,

My organization runs an EMR cluster in one Amazon account, but we save data to an S3 bucket (through EMRFS) in another account. To accomplish this, we configure the SparkSession as follows:

// Override the S3A credentials at runtime so writes through EMRFS use the
// assumed role's keys rather than the EMR account's environment credentials.
val FS_S3A_ACCESS_KEY = "spark.hadoop.fs.s3a.access.key"
val FS_S3A_SECRET_KEY = "spark.hadoop.fs.s3a.secret.key"
val FS_S3A_SESSION_TOKEN = "spark.hadoop.fs.s3a.session.token"

builder
  .config(FS_S3A_ACCESS_KEY, s3Credentials.getAccessKeyId)
  .config(FS_S3A_SECRET_KEY, s3Credentials.getSecretAccessKey)
  .config(FS_S3A_SESSION_TOKEN, s3Credentials.getSessionToken)

This configuration works fine; however, very occasionally a race occurs when saving the .checkpoint.parquet file, and the file is written with the EMR account's privileges instead of the assumed role's privileges configured above. I should note that the AWS environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, etc.) ARE set to the EMR account's credentials; this is why we override the S3 keys at runtime, since by default Spark retrieves S3 credentials from the environment.

After investigating the checkpoint logic, I found that this Parquet file is not written using the Spark configuration. Instead, the Checkpoints class calls Job.getInstance() and retrieves a configuration from that. This is a brand-new configuration that does not contain any of the settings defined on the SparkSession. In the logs we can see odd things happening, such as the ParquetOutputCommitter being used to write the .checkpoint.parquet file instead of the EmrOptimizedSparkSqlParquetOutputCommitter that our configuration specifies.
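
To illustrate the difference, here is a minimal sketch (not the actual Checkpoints code), assuming a SparkSession named spark built with the overrides shown above:

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()  // built with the spark.hadoop.fs.s3a.* overrides above

// A brand-new Job only loads the default Hadoop resources (core-site.xml, etc.);
// the spark.hadoop.* overrides set on the SparkSession are not applied to it,
// so S3A can fall back to environment or instance-profile credentials.
val jobConf = Job.getInstance().getConfiguration
println(jobConf.get("fs.s3a.access.key"))      // typically null here

// A configuration derived from the SparkContext does carry the overrides.
val sessionConf = spark.sparkContext.hadoopConfiguration
println(sessionConf.get("fs.s3a.access.key"))  // the overridden access key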

What I cannot explain is why this error is only occasionally happening. Most of the time the .checkpoint.parquet file is written with the permissions as configured.

I hope this helps.

Thanks

tdas commented 5 years ago

This is interesting. Not using the SparkSession configuration is probably a bug that should be fixed, but I don't know why it fails only intermittently. There may be funny race conditions during the creation of the new configuration object; it may or may not randomly pick up the correct credentials from somewhere... Hadoop confs are always a mystery to debug.

The fix should be easy. Instead of using the new job's configuration, you can create the conf object using spark.sessionState.newHadoopConf (see the other places where this is used). It would be awesome if you could actually try out this idea, since your setup reproduces the issue reliably. A rough sketch follows.
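
A hypothetical sketch of the idea (not the actual Delta source; placed in the org.apache.spark.sql.delta package so the package-private sessionState is accessible):

package org.apache.spark.sql.delta

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

object CheckpointConfSketch {
  // Instead of Job.getInstance().getConfiguration (which ignores the session's
  // spark.hadoop.* settings and hence the overridden S3 credentials), build the
  // Hadoop configuration from the session state so those settings are applied.
  def checkpointHadoopConf(spark: SparkSession): Configuration =
    spark.sessionState.newHadoopConf()
}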

ghost commented 5 years ago

Thank you for the quick response. I will try to get time approved for next sprint to fork Delta Lake with your suggested change and report back.

pranavanand commented 4 years ago

Hi @trega123, were you able to find time to make the suggested change?

steveloughran commented 4 years ago

You could always change the fs.s3a.aws.credentials.provider list to remove the provider that picks up IAM session credentials from the per-VM EC2 instance-metadata endpoint.
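
For example (a sketch only; the property name and provider class come from Hadoop's S3A documentation, and the exact default provider list varies by Hadoop version), pinning S3A to the explicitly supplied session credentials in the SparkSession builder might look like:

builder.config(
  // Restrict S3A to the temporary (access key + secret + session token) credentials
  // so it never falls back to the EC2 instance-metadata IAM provider.
  "spark.hadoop.fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")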

travisclagrone commented 3 years ago

This issue manifests in Delta Lake for Azure Data Lake Storage as well.

dennyglee commented 3 years ago

Closing this due to inactivity. Please reopen if this issue is still relevant and/or reproducible on the latest version of Delta for any of the storage formats. Thanks!