Which Delta Lake version are you using?
Could you @sinichkin please provide this feedback so we can better understand this issue?
@scottsand-db I see its written in the stackoverflow comments - scalaMajorVersion=2.12 scalaMinorVersion=11 SPARK_VERSION=3.1.1 DELTA_VERSION=1.0.0 KAFKA_VERSION=2.3.1
Thanks, @eran-levy - to help with the flow, I've responded to the original SO question and the results of the analysis can be pushed back here into GitHub. HTH!
Hi @scottsand-db @dennyglee, was the above issue solved? I am facing almost the same issue. I'll attach two screenshots that will help you understand it: as you can see, I am running two Spark sessions in parallel, trying to update/insert into the same Delta table but on different partitions (the table is partitioned on "site"), and it still throws "ConcurrentAppendException". I hope that conveys the problem. Could you kindly help me here? I have also attached a screenshot of the exception.
delta==0.4.2 delta-spark==1.0.0 deltalake==0.5.6 pyspark==3.1.1
Hi @aish-war thanks for your question, we will take a look at your error msg.
This is a complex issue! Regarding the exception you posted: which merge query hit that issue, the one on site "UNIV" or the one on site "BAPT"?
Also, it might be helpful for us to reproduce the issue on our side for deeper debugging. If you can reproduce this issue locally, could you package it into self-runnable code plus instructions that we can execute?
@tdas @scottsand-db hi guys, it took some time to create a demo Delta table, as we could not share the original one for security reasons. I've attached a document, Guide.odt, which contains the info on the issue we are facing. I've also attached the zipped Delta table, along with the two .ipynb files. Hope this is enough.
@tdas @scottsand-db Hi guys, any updates on the query raised?
Hey @AthulJustin. Still looking. Sorry we've been busy working on the release of Delta Lake 1.2. Should have an answer for you in the next few days.
Hi @AthulJustin. This will take some time to debug. So far I've reproduced on Delta Lake 1.0 and 1.1.
Thanks @scottsand-db and @tdas. This is the top-priority issue for us; it's been haunting us for about a month.
@tdas @scottsand-db @aish-war @AthulJustin
Hello everyone! I'd like to know whether there has been any progress on this issue. We are also dealing with the same issue. I haven't been able to come up with any solutions.
TIA
@sree-inapp and @AthulJustin - sorry, things have been very busy with the recent release of Delta Lake 1.2.0 and the upcoming Delta Lake 1.2.1. TD and I have time scheduled this week to investigate this issue. Thanks for your patience.
(sorry accidentally closed)
Hi @scottsand-db @tdas were you able to look into this issue? we are still stuck on this. Awaiting your response.
same here.
Hi @scottsand-db @tdas Awaiting your response.
Hi @scottsand-db @tdas Is there any solution to this problem? I am also facing the same issue.
Hi @scottsand-db @tdas is there any solution to the problem? we are also facing this issue
Hi @aish-war, in the example you provided, changing how you create the source table, e.g. from
df = dup_table.toDF().filter("acc_num=='6723952'")
to
df = dup_table.toDF().filter("acc_num=='6723952' AND site='BAPT'")
solves the problem.
Can you show me how you create the source tables in production?
@scottsand-db will definitely check this out. thanks for responding.
This works too.
df_orig = dup_table.toDF().filter("acc_num=='3198639'")
# collect() materializes the rows on the driver, so the rebuilt
# DataFrame no longer references the target table's files
rows_orig = df_orig.collect()
df = spark.createDataFrame(rows_orig)
# use df as the source DataFrame
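The workaround above works because collect() materializes the rows on the driver, cutting the source DataFrame's lineage back to the target Delta table. A minimal sketch of the same idea using Spark's DataFrame.localCheckpoint() instead (my suggestion, not from this thread; the table path and column names are placeholders), which truncates the logical plan without pulling every row onto the driver:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder path; substitute the real location of your Delta table.
dup_table = DeltaTable.forPath(spark, "/data/delta/dup_table")

df_orig = dup_table.toDF().filter("acc_num == '3198639'")

# localCheckpoint() eagerly materializes the DataFrame and truncates
# its logical plan, so a subsequent merge no longer re-reads the
# target table (similar in effect to collect() + createDataFrame).
df = df_orig.localCheckpoint()
# use df as the source DataFrame
```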
@AthulJustin @MintuJohnsontj @fathimajafir can you please all give me more details on
@aish-war To be clear, the issue here was that your source DataFrame was built off of / directly referenced your target table.
source = target.toDF().filter("acc_num=='3198639'")
In this case, you should save your source to a temporary table first, and then read from that table during your merge.
This was an issue because, during the merge, despite your merge condition being correctly partitioned, Spark was re-generating the source (and looking for that unique acc_num). For all Spark knows, the results of that source query might have changed, so it needs to re-scan the entire table to look for rows that match that acc_num. Since it had to scan the entire table, you were getting concurrent append exceptions.
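The recommendation above (persist the source to a temporary table first, then read it back before merging) can be sketched as follows; the paths, column names, and merge condition are illustrative, not taken from the thread:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder path for the target Delta table.
target = DeltaTable.forPath(spark, "/data/delta/dup_table")

# 1. Build the source from the target, then write it out so it no
#    longer references the target's files.
(target.toDF()
       .filter("acc_num == '3198639'")
       .write.format("delta").mode("overwrite")
       .save("/tmp/merge_source"))

# 2. Read the materialized copy back; this DataFrame carries no
#    lineage to the target, so the merge can prune to the partitions
#    named in the condition instead of re-scanning the whole table.
source = spark.read.format("delta").load("/tmp/merge_source")

(target.alias("t")
       .merge(source.alias("s"),
              "t.site = s.site AND t.acc_num = s.acc_num")
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute())
```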
@scottsand-db the issue was indeed related to the way we were creating the new/updated record. Thank you.
@aish-war great, glad I could help!
https://stackoverflow.com/questions/68758331/exception-org-apache-spark-sql-delta-concurrentappendexception-files-were-adde