Closed zmeir closed 2 years ago
Retrying any operation on conflict can very expensive as it will require reprocessing the entire data once again as the underlying table snapshot has changed. So fundamentally, instead of retrying and suffering from the resultant unpredictability, it's much better to design your workloads to avoid conflicts altogether.
@tdas What is the recommended way to "design the workload" to avoid conflicts altogether? How is it possible with multiple streaming queries updating the same table to completely not have this problem? I think this improvement makes sense.
@ZoharMeir What has been your mechanism for effectively retrying the query? We are specifically targeting partitions during upserts and the latency seems to be unbelievably slow, so not sure if we should pivot back to the vanilla merge queries that cause the append conflict exceptions.
@AlexDHoffer Our current solution is not very efficient - we basically wrap the query execution in a try/except and retry the query if the exception is the expected Concurrent*Exception
. Something like this:
def run_query_with_retry(query, attempts=3):
try:
return query()
except Exception as e:
if attempts > 1 and ('ConcurrentAppendException' in str(e) or 'ConcurrentDeleteReadException' in str(e)):
return run_query_with_retry(query, attempts-1)
else:
raise
run_query_with_retry(
lambda: target_dt.alias('t').merge(source_df.alias('s'), 's.key=t.key').whenNotMatchedInsertAll().execute()
)
Regarding the append conflicts with merge queries - perhaps this would be helpful: https://github.com/delta-io/delta/issues/326
I've been lead to believe by Databricks support that there is more of a built-in way to trigger these retries, specifically with OPTIMIZE. We frequently run into issues with Delta tables that are Streaming Sinks and a separate job that OPTIMIZES said tables. The OPTIMIZE job fails with ConcurrentDeleteReadException
. The confusing part is that the only jobs writing to the table are the Stream and the OPTIMIZE. The Stream is only appending and it does not have auto-optimize or auto-compaction turned on, so what else would be deleting data?
Anyway... I got this from support
Enable retry handling logic in optimize job: You can enable retry logic on the explicit optimize command. Concurrent exceptions are always re-triable and there is no harm. So if you retry the optimize operation 3 times, it will succeed and the job will not fail.
I believe the failure you are describing stems from a conflict between the stream and optimize jobs. I submitted https://github.com/delta-io/delta/pull/626 some months ago which should resolve this conflict, but sadly it was never merged :(
We’re currently reviewing the issue and will get back to you shortly.
Thank you @ZoharMeir, we can use your example to solve a similar problem in our customer project. Regards, Sergio
When running multiple concurrent merge/update operations on the same output Delta table, some of the operations might fail due to a concurrent write conflict. This is a result of Delta Lake using optimistic concurrency control. To prevent the entire application from failing, you can catch the exception thrown by the conflict, inspect it, and then retry the operation (if it makes sense to do so). I believe this is not uncommon in streaming scenarios where a Delta table sink is being updated by multiple streaming sources.
My suggestion is to provide the retry mechanism in Delta Lake, which could be controlled by a configuration value
spark.databricks.delta.retryWriteConflict.enabled = true # would be false by default spark.databricks.delta.retryWriteConflict.limit = 3 # optionally limit the maximum amout of retries # can probably add more options like retry delay, exponential backoff, etc...
hi, i've tried setting these as config of spark session and then tried updating/inserting an entry while simultaneously compacting a particular partition (which isn't the partition that I was updating/inserting). It still leads to "ConcurrentAppendException", even though i've set the limits way above than 3. Is there any alternate solution/ is it something wrong that i am doing here?
Hi @sreeaishwar, these configurations were only a suggestion for a new feature - they do not currently exist in any Delta release.
You can try this approach as a workaround if it makes sense for you: https://github.com/delta-io/delta/issues/445#issuecomment-775085613
(keep in mind this code snipped was written for Delta 0.6.x, so you may need to adjust it to work with the latest version)
Hi @sreeaishwar, these configurations were only a suggestion for a new feature - they do not currently exist in any Delta release.
You can try this approach as a workaround if it makes sense for you: #445 (comment) (keep in mind this code snipped was written for Delta 0.6.x, so you may need to adjust it to work with the latest version)
hi, @zmeir tried the code snippet u mentioned here,it still thows a "ConcurrentAppendException". i hope this makes sense to u. also i tried increasing the limit which also didn't help. delta is same.
Hi @sreeaishwar, these configurations were only a suggestion for a new feature - they do not currently exist in any Delta release. You can try this approach as a workaround if it makes sense for you: #445 (comment) (keep in mind this code snipped was written for Delta 0.6.x, so you may need to adjust it to work with the latest version)
hi, @zmeir tried the code snippet u mentioned here,it still thows a "ConcurrentAppendException". i hope this makes sense to u. also i tried increasing the limit which also didn't help. delta is same.
the issue is solved.
So what was the fix? Use retry as mentioned here? https://github.com/delta-io/delta/issues/445#issuecomment-775085613
@aish-war Can you please share the latest code that you used that fixed the logic?
@kumarigal Have you found the solution for the problem?
When running multiple concurrent merge/update operations on the same output Delta table, some of the operations might fail due to a concurrent write conflict. This is a result of Delta Lake using optimistic concurrency control. To prevent the entire application from failing, you can catch the exception thrown by the conflict, inspect it, and then retry the operation (if it makes sense to do so). I believe this is not uncommon in streaming scenarios where a Delta table sink is being updated by multiple streaming sources.
My suggestion is to provide the retry mechanism in Delta Lake, which could be controlled by a configuration value