delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0
7.17k stars 1.62k forks source link

Allow concurrent writes to partitions that don't interact with each other #9

Closed calvinlfer closed 4 years ago

calvinlfer commented 5 years ago

I have a use case where I would like to update multiple partitions of data at the same time but the partitions are totally separate and do not interact with each other.

For example, I would like to run these queries concurrently (which would be very useful in the case of backfills):

spark.read.parquet("s3://jim/dataset/v1/valid-records")
  .filter(col("event_date") === lit("2019-01-01"))
  .write
  .partitionBy("event_date")
  .format("delta")
  .mode(SaveMode.Overwrite)
  .option("replaceWhere", "event_date == '2019-01-01'")
  .save("s3://calvin/delta-experiments")
spark.read.parquet("s3://jim/dataset/v1/valid-records")
  .filter(col("event_date") === lit("2019-01-02"))
  .write
  .partitionBy("event_date")
  .format("delta")
  .mode(SaveMode.Overwrite)
  .option("replaceWhere", "event_date == '2019-01-02'")
  .save("s3://calvin/delta-experiments")

So the data above being written as delta belongs to two separate partitions which do not interact with each other. According to the Delta documentation and what I experience is a com.databricks.sql.transaction.tahoe.ProtocolChangedException: The protocol version of the Delta table has been changed by a concurrent update. Please try the operation again.

Would you support this use-case where you can update partitions concurrently that do not interact with each other?

Parquet seems to allow this just fine (without any corruption if you turn on dynamic partitioning with spark.sql.sources.partitionOverwriteMode). This is a very valid use case if you adhere to Maxime Beauchemin's technique of immutable table partitions.

wernerdaehn commented 5 years ago

I would assume this limitation is because Delta supports ACID within a table. If you have two sessions writing into different partitions, this would need a different transaction handling compared to the situation where a single writer writes into all partitions. Might be harder to implement than it looks at first sight.

Having said that, I would love to have such option as well. There will be situations where you need mass data loads and would be okay with a relaxed transaction guarantee. And there will be situations where transaction guarantees are more important than mass data performance.

my2cents

hackmad commented 5 years ago

I can appreciate the challenges in designing something like this. However, it basically makes it so that existing processes that can use Parquet format to simultaneously load partitions cannot be converted over to using Delta. It essentially serializes all stages that could be run in parallel. It might be worth having an option to load all data and then update partition information in the metastore. Similar to how you would have to do in Athena. If a new pipeline is created, we would have to workaround this by first loading the secondary parition level data into their own S3 locations without partitioning and then later organize them. This would still have the additional overhead of addtional storage (which could be mitigated with retention policies in S3) but more importantly more than doubling the compute cost to process the data a second time.

tdas commented 5 years ago

First of all, thank you very much for trying out Delta Lake! The current version (0.1.0) has a very restrictive conflict detection check to be absolutely safe. In future releases, we will slowly relax the conflict criteria to allow more concurrency while ensuring the ACID guarantees. Hopefully, we will be able to make such workloads easier.

ligao101 commented 5 years ago

hello I am looking into how delta would work on s3 based data lake. what are the current limitations of running the current delta lake on s3? This issue appears to be one of the potential limitations we could see. Thanks!

tdas commented 5 years ago

Delta Lake currently only works with HDFS with full guarantees because HDFS provides the necessary file system operation guarantees that give Delta Lake its consistency guarantee. S3 file system does not provide those guarantees yet, primarily because S3 does not provide list-after-write guarantees. Details on the required guarantees - https://github.com/delta-io/delta#transaction-protocol

PS: This is a completely different issue that the original issue in this thread. Please make a different issue for this.

liwensun commented 4 years ago

Thanks for sharing your use case and the great discussions. I have created a concurrency support tracking issue which references to this issue so people can see the use cases and discussions here.

koertkuipers commented 4 years ago

@calvinlfer i agree that concurrent writes to totally separate partitions would be great.

however i was surprised to hear you say parquet supports this just fine. we have run into issues with this using dynamic partitionOverwriteMode and partitionBy, because both writers try to create temporary files in baseDir/_temporary, leading to weird errors when one finishes and deletes _temporary while the other job is still running. just FYI.

koertkuipers commented 4 years ago

i am not sure it is straightforward to safely allow concurrent writes that replace partitions. optimistic transaction seems to know what files were added or deleted, but thats not the same as knowing what the intent was of the transaction.

for example a transaction might have had the intent to replace everything where say a=1, so replaceWhere("a=1"), and let say there was nothing to delete and it only wrote out only to a=1/b=1/part.snappy if another transaction ran at same and also had a replaceWhere("a=1") and also deleted nothing but created a file a=1/b=2/part.snappy, then by just looking at the file actions they do not seem to be in conflict, but they are.

koertkuipers commented 4 years ago

note that for the example of dynamic partition overwrite (which is not in delta but we added on our own branch) it is easy to reason about, because the files deleted are always in exact same partitions as where files are added, so you only need to check for conflicts with respect to added files (e.g. verify the transactions did not write to exact same partitions).

calvinlfer commented 4 years ago

Hey @koertkuipers what version and flavor of Spark are you using? @hackmad and myself have seen this work at scale on the Databricks platform with Spark 2.4.1

koertkuipers commented 4 years ago

@calvinlfer we use spark 2.4.1 on hadoop 2.7 however i am a little uncertain if that's the version we observed the issue with or if it was an earlier version and we have avoided the situation ever since. i remember the errors being hdfs lease exceptions because one job would delete the _temporary directory while the other was still using it.

koertkuipers commented 4 years ago

@calvinlfer maybe things changed for the better. i now see when i run two jobs writing to different partitions using partitionBy and dynamic partitionOverwriteMode:

drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-be40030e-8eef-4680-85ac-b55e6519df60/partition=2
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:18 out/.spark-staging-d25f16d3-8f2d-4cf4-89bd-09256469b5e5/partition=1
Found 1 items
drwxr-xr-x   - koert koert          0 2019-08-29 18:17 out/_temporary/0

so it seems each job has its own .spark-staging directory, and _temporary isnt really used? not sure...

calvinlfer commented 4 years ago

Sorry I should have mentioned this more explicitly earlier but we used S3 instead of HDFS so I believe the underlying implementation is quite different and allows for concurrent writes to non conflicting partitions

hospadar commented 4 years ago

Wanted to add to this - this would be a blocker for us as well to switch from parquet to delta.

Right now we store our underlying data in s3 as parquet and do management of partitions fairly manually to keep tables in a happy state. We always write out new (or replacement) partitions to a new folder then just swap the location of the partition in the metastore to make it look like an atomic update to anyone querying the data downstream (also allows us to theoretically roll back an update, although that's impractical for us and requires log spelunking to find the old paths).

We often do big backfill/reprocessing jobs where we process tons of dates in parallel to keep the cluster over-committed. If we could only write one partition at once our throughput would slow down quite a bit on jobs like this.

I'd love to switch to delta, it would make it MUCH easier to revert data to earlier states (and a variety of other things would become more convenient for us), but this issue is probably a blocker.

Our logic goes something like:

///// First thread is doing something like:
String path = "s3://warehouse/" + UUID.randomUUID().toString()
dataframe.write.parquet(path)
spark.sql("ALTER TABLE target PARTITION (dt='2019-01-01') SET LOCATION '" + path + "'")

/////Another thread doing the same thing, but for a different date
String path2 = "s3://warehouse/" + UUID.randomUUID().toString()
dataframe.write.parquet(path2)
//register second datframe to a different partition
spark.sql("ALTER TABLE target PARTITION (dt='2019-01-02') SET LOCATION '" + path2 + "'")
koertkuipers commented 4 years ago

@calvinlfer i did some more checking and the issue of writers conflicting with each other when writing to same baseDir with dynamic partition overwrite does still exist in spark 2.4 and spark master for all file sources. i cannot say anything about writing to s3, that could be very different. for more info please see (and vote for): https://issues.apache.org/jira/browse/SPARK-28945

tdas commented 4 years ago

We have improved our concurrency control in this commit - f32830022e6a664f99f25f3dad8584f5cd9952bf

This allows operations on disjoint partitions to be concurrently written.