delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with language APIs
https://delta.io
Apache License 2.0

[BUG] Concurrent write to _delta_log on AWS S3 Delta Table results in data loss #1830

Open ArtOfStew opened 1 year ago

ArtOfStew commented 1 year ago

Bug

Describe the problem

We have the following stack: Delta 2.2.0 running in AWS Glue 4.0 jobs with PySpark.

TL;DR: Multiple parallel glue jobs writing to the same table can concurrently write to the same _delta_log transaction log file. The later job "wins"; it performs an overwrite on the S3 object, rather than creating a new one.

Longer version:

We are using the above Glue jobs to load data from flat files (on S3) into Delta Tables. We often have multiple files to load into the same table as part of the same batch; in this case we spawn several Glue Jobs at once, aiming to run them in parallel. The files may or may not land in different partitions; the issue manifests either way.

We've found that data intermittently goes missing when we process multiple files. We have traced this to a concurrent write to the transaction log.

We have read up on articles on Delta Table concurrency; most of them are around read concurrency. This bug appears to be linked to S3 interactions in particular.

The result is that data partitions exist, but the transaction log has no record of them. This, in turn, means the generated symlink manifest is missing those partitions, so our downstream processes either don't see the data (Athena) or break (Redshift Spectrum). In effect, it breaks the table's ACIDity.

We have implemented a workaround; I will describe it below. However, it would be great if this could be resolved in the framework, if possible. If there is a better way of parallelizing loads, or a setting to ensure we don't overwrite log files, please let us know.

Steps to reproduce

I cannot supply the full stack of code we use, but I will describe our setup in more detail. I also have a number of screenshots demonstrating our analysis of the situation.

  1. Set up a glue job with PySpark. The script should load a file from S3 to a delta table. We do not read from the existing delta table before insertion. The Spark job simply transforms the file columns into a dataframe and writes to the table; we set up the tables beforehand via configuration.

Here is the spark session we set up:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip


def get_spark_session():
    # Note: S3SingleDriverLogStore only guarantees transactional writes from a
    # single driver; it cannot provide mutual exclusion across parallel Glue jobs.
    builder = SparkSession.builder.appName("MyApp") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.delta.logStore.class", "io.delta.storage.S3SingleDriverLogStore")

    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    return spark

Here is the eventual write statement. The partition columns are as shown, but the table has arbitrary other columns. The replace condition is actually generated dynamically, of course.

replace_cond = "edl_extract_dt='2023-01-01 01:01:01' and edl_file_name='test_col_basic_overwrite_source_20230101010101.csv'"
partition_columns = ['edl_extract_dt','edl_file_name']

# Overwrite only the partition that corresponds to this source file.
data_df.write.format('delta') \
      .mode('overwrite') \
      .partitionBy(partition_columns) \
      .option("replaceWhere", replace_cond) \
      .save(destination_s3_uri)
  2. Have a large number of files to write in the S3 source location. I've demonstrated this consistently with 20 files.
  3. Run the glue job once for each file, simultaneously. We use Step Functions to orchestrate this.
  4. Generate the symlink manifest for the Delta table after all of the above runs are complete (this just makes things easier; you could also just look at the transaction log).
  5. Examine the partitions written to the Delta location vs the partitions according to the _symlink_format_manifest folder. You should find a discrepancy. (A sketch of steps 4 and 5 follows this list.)
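A minimal sketch of steps 4 and 5, not the exact tooling we use. It assumes the spark session and destination_s3_uri from the snippets above, and that the table is partitioned only by the two columns shown:

from delta.tables import DeltaTable

# Step 4: regenerate the symlink manifest once all of the parallel jobs have finished.
DeltaTable.forPath(spark, destination_s3_uri).generate("symlink_format_manifest")

# Step 5: compare the partitions the transaction log knows about with the Parquet
# files that are physically present under the table location.
via_log = (spark.read.format("delta").load(destination_s3_uri)
           .select("edl_extract_dt", "edl_file_name").distinct())

via_files = (spark.read.option("basePath", destination_s3_uri)
             .parquet(destination_s3_uri + "/edl_extract_dt=*/edl_file_name=*")
             .select("edl_extract_dt", "edl_file_name").distinct())

# Any rows here correspond to partitions whose data files exist on S3 but whose
# commits were lost from the _delta_log.
via_files.exceptAll(via_log).show(truncate=False)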

Observed results

Here are the contents of the delta table location after the above steps. There are 20 partitions, from the 20 files. [Screenshot: table contents on S3]

Here are the _symlink_format_manifest contents: several partitions are missing. [Screenshot: symlink format manifest]

Here is a view of the version history of the contents of the _delta_log folder. You can see some files have multiple overwrites, such as version ...007.json. [Screenshot: delta log versions]

Earliest to latest for version ...007.json: [screenshots of the S3 object versions]

Expected results

Expectation: transaction log should be consistent with all data writes that occur.

Further details: Workaround

We have implemented a kind of semaphore: Before we write to a delta table, we use a DynamoDB Table to ensure only one write happens at a time. This slows things down a bit, of course, but it's not too significant in our case.
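To illustrate the idea (this is not our exact code, just a minimal sketch with a hypothetical lock table and key schema), a conditional PutItem into DynamoDB acts as a per-table mutex:

import time
import boto3
from botocore.exceptions import ClientError

# Hypothetical lock table: a single string partition key named "table_key".
ddb = boto3.client("dynamodb")
LOCK_TABLE = "delta_write_locks"

def acquire_lock(table_key: str, timeout_s: int = 600) -> None:
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        try:
            # The conditional put succeeds only if no other job holds the lock.
            ddb.put_item(
                TableName=LOCK_TABLE,
                Item={"table_key": {"S": table_key}},
                ConditionExpression="attribute_not_exists(table_key)",
            )
            return  # we hold the lock
        except ClientError as e:
            if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
                raise
            time.sleep(5)  # another job is writing; retry
    raise TimeoutError(f"could not acquire lock for {table_key}")

def release_lock(table_key: str) -> None:
    ddb.delete_item(TableName=LOCK_TABLE, Key={"table_key": {"S": table_key}})

Each Glue job calls acquire_lock(destination_s3_uri) before the Delta write and release_lock(destination_s3_uri) in a finally block, so only one job commits to the table at a time.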

Environment information

Delta Lake version: 2.2.0
Environment: AWS Glue 4.0 jobs with PySpark

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

nkarpov commented 1 year ago

Hi @ArtOfStew - sorry you ran into this and thanks for the detailed report.

This isn't a bug in Delta, but rather the lack of consistency guarantees provided by S3. We have a solution designed specifically to address your parallel writer case, documented here: https://docs.delta.io/latest/delta-storage.html#multi-cluster-setup.
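For reference, that setup amounts to swapping the LogStore implementation for the DynamoDB-backed one. Roughly the following, sketched from the linked docs; the table name and region are placeholders, and the scheme in the config key should match how the table path is addressed (s3, s3a, or s3n):

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore") \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log") \
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1")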

I don't see mention of it in your post; did you give it a try? Please let us know if you had any trouble.

ArtOfStew commented 1 year ago

@nkarpov Makes sense, thanks for the link... We didn't see it before implementing our workaround. Pity, but that solution looks like it's documented for version 2.4.0 onwards, and we're restricted to 2.2.0 at least for now, so I'm not too broken up about doing our own workaround. And I guess we were on the right track, since we're using DynamoDB to do essentially the same thing.

I'll close off the "bug"!

tdas commented 1 year ago

@ArtOfStew The DynamoDB solution has existed since Delta 1.2 - https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/

So you should be able to very easily use it in Delta 2.2.
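A sketch of wiring that up from PySpark on Delta 2.2.0, assuming the builder configured as in the snippet above and that your delta-spark pip release exposes the extra_packages argument; the S3DynamoDBLogStore ships in a separate delta-storage-s3-dynamodb artifact whose version should match delta-core:

from delta import configure_spark_with_delta_pip

# The DynamoDB LogStore lives in its own artifact, so it has to be added alongside
# delta-core; "builder" is the SparkSession builder configured as shown above.
spark = configure_spark_with_delta_pip(
    builder,
    extra_packages=["io.delta:delta-storage-s3-dynamodb:2.2.0"],
).getOrCreate()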

ArtOfStew commented 1 year ago

Okay great - thanks for the additional resource, @tdas. We'll give that configuration a shot on our end. 👍

watfordkcf commented 1 year ago

@tdas we had to use a custom authentication provider in order to support it on our setup: https://github.com/watfordkcf/aws-custom-credential-provider/tree/com.kcftech.sd/delta-spark-utils

tdas commented 1 year ago

Whoa! Can you explain why you had to build a custom authentication provider? Why didn't the usual default credential provider chain work?

cc @scottsand-db

watfordkcf commented 1 year ago

We needed to assume a specific role on EMR Serverless. The default AWS credential providers were inadequate for this (not for a lack of trying). It seems Zillow had the same problem many moons ago and it was never really resolved.

Update: I should have been more specific: a cross-account DynamoDB table. https://repost.aws/questions/QURYC5wUuVTIGLsJ-Z2-Ks7g/amazon-dynamodb-across-account-access-with-vpc-endpoints