Open kkondamadugula opened 1 year ago
Such concurrent exceptions occur when there is an actual logical conflict between two concurrent operations, such that if both writes were merged, the result would not be serializable. For example, suppose an append that adds a row with x = 1 runs at the same time as a merge that deletes all rows with x = 1. If the append finishes first while the merge is still running, and the merge then completes without deleting the new x = 1 row (because that row did not exist when the merge started), you are left with a table history that shows append --> merge, yet the table data still contains x = 1. That outcome could not have happened if the append and merge had run serially, so serializability is broken.
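As a rough sketch (assuming an active SparkSession `spark` and a hypothetical table path), the two racing operations could be as simple as:

```python
from delta.tables import DeltaTable
from pyspark.sql import Row

# Writer 1: appends a new row with x = 1.
spark.createDataFrame([Row(x=1)]).write.format("delta") \
    .mode("append").save("s3://bucket/path/to/table")

# Writer 2, running concurrently: deletes every row with x = 1.
# If its snapshot was taken before the append committed, it cannot
# see the new row, so that row survives the delete.
DeltaTable.forPath(spark, "s3://bucket/path/to/table").delete("x = 1")
```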
This is independent of the DynamoDB setup. The DynamoDB LogStore only enables you to perform transactionally guaranteed writes to S3, without which you can simply lose data. So setting it up does not, by itself, avoid such logical conflicts.
There are different ways to avoid conflicts. For example, partition the table and make sure that concurrent operations do not read from or write to the same partition, typically by putting an explicit partition predicate into each operation's condition, as in the sketch below. It all depends on which other concurrent operations you are running alongside the merge.
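A minimal sketch, assuming a hypothetical table partitioned by a `date` column and an active SparkSession `spark` (`new_rows` is a placeholder DataFrame): literal partition predicates let Delta's conflict detection prove that two jobs touch disjoint sets of files.

```python
from delta.tables import DeltaTable

table = DeltaTable.forPath(spark, "s3://bucket/path/to/table")

# Job A only touches the partition date = '2024-01-01' ...
table.delete("date = '2024-01-01' AND x = 1")

# ... while Job B writes only into a different partition. Because both
# operations are scoped by literal partition values, they do not conflict.
new_rows.where("date = '2024-01-02'").write.format("delta") \
    .mode("append").save("s3://bucket/path/to/table")
```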
My use case is to process a dataset spanning hundreds of partitions concurrently. The data is partitioned, and the partitions are disjoint. I was facing ConcurrentAppendException because S3 does not support the "put-if-absent" consistency guarantee. Starting with Delta Lake 1.2, with the help of the S3DynamoDBLogStore API, all writers across multiple clusters and/or Spark drivers can concurrently write to Delta Lake on S3 while ensuring that only one writer succeeds with each transaction. My Delta Lake version is 2.1. I created a DynamoDB table with auto-scaling enabled for read/write capacity and passed the configuration to the Delta job. Please find the configuration below (some Spark-related config omitted).
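The multi-cluster LogStore settings follow the pattern documented for `delta-storage-s3-dynamodb`; the table name, region, and throughput values below are placeholders, not my exact values:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Route s3/s3a paths through the DynamoDB-backed LogStore
    # (requires the delta-storage-s3-dynamodb artifact on the classpath).
    .config("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore")
    .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore")
    # DynamoDB table that serializes commits across all writers.
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1")
    # Initial provisioned throughput, used if the LogStore creates the table itself.
    .config("spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu", "5")
    .config("spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu", "5")
    .getOrCreate()
)
```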
Please find the actual logic below:
delta_table.alias("old").merge(
- `delta_table` is the destination table in Delta Lake.
- `input_df` is a combined data frame of all the inserts and updates.
- `deletes_df` is the data frame that has just the deletes.
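For reference, a filled-out version of that merge might look like the sketch below. The join key `id`, partition column `part`, payload column `x`, and the `op` tag are all hypothetical; the literal partition predicate in the merge condition is what lets concurrent jobs on disjoint partitions avoid conflicting.

```python
from pyspark.sql import functions as F

# Tag each change with an operation so one merge can apply upserts and
# deletes together (assumes deletes_df carries at least the key columns).
changes_df = (
    input_df.withColumn("op", F.lit("upsert"))
    .unionByName(deletes_df.withColumn("op", F.lit("delete")),
                 allowMissingColumns=True)
)

(
    delta_table.alias("old")
    .merge(
        changes_df.alias("new"),
        # The literal partition list (placeholder) lets Delta's conflict
        # detection prove this job only touches the partitions it names.
        "old.part IN ('p-001') AND old.part = new.part AND old.id = new.id",
    )
    .whenMatchedDelete(condition="new.op = 'delete'")
    .whenMatchedUpdateAll(condition="new.op = 'upsert'")
    .whenNotMatchedInsert(
        condition="new.op = 'upsert'",
        values={"id": "new.id", "part": "new.part", "x": "new.x"},
    )
    .execute()
)
```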
I am still running into delta.exceptions.ConcurrentAppendException irrespective of these settings. Am I doing something wrong?