delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as language APIs
https://delta.io
Apache License 2.0

Duplicate Record on Upsert Issue #527

Closed SonuSingh1411 closed 1 year ago

SonuSingh1411 commented 3 years ago

Just to give some background: we use AWS EMR to run Spark, and we use PySpark code to run the Delta logic and save the files on S3. We have been using Delta Lake to ingest data from Oracle into AWS S3 in an incremental fashion. But recently we discovered that Delta is creating some duplicate IDs after the merge. We also checked Oracle and confirmed that the IDs are auto-increment primary keys. Could you please help us debug the issue? Below is a code snippet from our merge logic:

delta_table.alias("current").merge(
    new_df.alias("new"), "current.ID = new.ID"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

I can share the entire logic we have if it's needed, but any guidance would be really helpful.

tdas commented 3 years ago

Are you sure there are no duplicate IDs in the source of the merge, that is, in the new_df dataset? If there are duplicate IDs in the source dataset, and those IDs are not in the target table (that is, not matched), then all the duplicate IDs will be inserted.
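
A quick way to rule that out is to check the source for duplicate keys right before the merge. The sketch below is illustrative only and assumes the new_df DataFrame and "ID" key column from the snippet above:

from pyspark.sql import functions as F

# Count keys that appear more than once in the merge source (new_df and "ID" from the snippet above)
dup_keys = new_df.groupBy("ID").count().where(F.col("count") > 1)
if dup_keys.limit(1).count() > 0:
    dup_keys.show(truncate=False)              # inspect the offending IDs
    # new_df = new_df.dropDuplicates(["ID"])   # or defensively deduplicate before merging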

SonuSingh1411 commented 3 years ago

Yes, the ID column is a sequence-based primary key in Oracle, so it cannot have duplicates. I also validated that by looking up the duplicated IDs back in the source, and they are unique there.

tdas commented 3 years ago

Are you possibly running into consistency issues with S3 by running merge concurrently from multiple Spark clusters? Concurrent writes from multiple clusters/drivers are not yet supported on S3 - https://docs.delta.io/latest/delta-storage.html#amazon-s3

SonuSingh1411 commented 3 years ago

We have looked at this. As we are ingesting from Oracle, we run a single Spark application from a single driver, and the data gets inserted or updated from a single cluster. We don't have multiple clusters executing on the same datasets.

SonuSingh1411 commented 3 years ago

Any idea on the issue?

arnehuang commented 3 years ago

I have the same issue and verified there are no duplicates in the source. However, on a long-running job over the past 2 months we now have 77 instances of duplicate merge-key rows. PySpark, AWS EMR, S3, Spark 2.4.5, Delta 0.6.1, single Spark application. We cannot reproduce it reliably.

The job is running on YARN, which occasionally restarts it from a checkpoint. I am also running multiple merges to different tables in the same application; could either of these be an issue?

SonuSingh1411 commented 3 years ago

Yeah... it's the same issue with us. We cannot reproduce it; it happens randomly. I assume this is a bug in Delta, but we have not received any guidance on it.

SonuSingh1411 commented 3 years ago

@arnehuang I am not sure if merging into different tables would cause an issue. Also, we run one job per application and still get this bug, so I assume this is a bug in open-source Delta.

scanry commented 3 years ago

I have the same problem. I suspect it was due to a task failure (screenshot omitted).

zsxwing commented 3 years ago

@scanry which column has the unique id? srcid? Are you using Delta Lake to read the table?

zsxwing commented 3 years ago

@SonuSingh1411 Could you show us the code for new_df? Does the data source of new_df guarantee no duplicates? Is it Spark's JDBC data source?

scanry commented 3 years ago

> @scanry which column has the unique id? srcid? Are you using Delta Lake to read the table?

  1. The sys_ods_row_id is the unique id; it is a physical unique id.
  2. We read from Delta Lake.
  3. I used dataset.dropDuplicates("sys_ods_row_id").

So I guess Delta Lake didn't take care of this when a Spark task failed with an exception.

SonuSingh1411 commented 3 years ago

@zsxwing Yes, we use Oracle JDBC to connect, and Oracle does not have duplicate records; the primary key is unique and auto-increment, so new_df should have the correct data. Below is how we calculate new_df:

import pyspark.sql.functions as fx

new_df = (
    self.get_all_data()
    .where(fx.col("UPDATED_ON") >= fx.lit(max_updated_on))
    .withColumn("UPDATED_ON_MONTH", fx.trunc(fx.col("UPDATED_ON"), "mon"))
)

khannavivek commented 3 years ago

Hi, I am also facing the same issue.

Did anyone find any solution to it?

Currently, after writing the data, I delete the records with a duplicate primary key, keeping the record with the maximum modified date, to avoid any data conflicts.
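
For anyone using the same workaround, a minimal sketch of that cleanup is below. The column names id and modified_date and the S3 paths are hypothetical placeholders, not details from this thread:

from pyspark.sql import Window, functions as F

# Keep only the latest row per primary key, ranked by modified date (placeholder names)
w = Window.partitionBy("id").orderBy(F.col("modified_date").desc())
deduped = (
    spark.read.format("delta").load("s3://bucket/path/to/table")
    .withColumn("rn", F.row_number().over(w))
    .where("rn = 1")
    .drop("rn")
)
# Write the cleaned copy to a separate location to avoid overwriting the table being read
deduped.write.format("delta").mode("overwrite").save("s3://bucket/path/to/table_deduped")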

rtjarvis commented 3 years ago

We're having this issue too. It looks like it happens when checkpoints are created. Duplicates are created AND records are lost.

scanry commented 3 years ago

Hi, does anyone care about this?

SonuSingh1411 commented 3 years ago

I don't think so. I have been trying to get a response on this, and no one from the Delta team has been responding or trying to help.

Thanks

rahulsmahadev commented 3 years ago

@SonuSingh1411 @scanry is it possible for you to help us reproduce this issue?

scanry commented 3 years ago

> @SonuSingh1411 @scanry is it possible to help us reproduce this issue.

The following example occurs in a production environment (screenshot omitted):

    • logic unique column for the application layer = company_id
    • physical unique column for the data-platform ODS layer = sys_ods_row_id
    • physical unique column for the current layer = sys_row_id
    • logic unique column for the current layer = sys_id = isUpdate ? old_sys_id : sys_row_id
    • id of the batch job instance = sys_job_id

Processing logic:

merge into dwd_pixxx_company t1
using (
  select <output columns> from (
    select row_number() over (partition by company_id order by sys_row_id desc) as rnk,
           <other columns>
    from ods_pixxx_company
    where sys_table_version = ${ods_pixxx_company.tableVersion}
      and sys_job_id = ${ods_pixxx_company.jobId}
  ) t
  where rnk = 1
) t2
on (t1.company_id = t2.company_id)
when matched then .....
when not matched then .....

SonuSingh1411 commented 3 years ago

Sure, I can try to help in reproducing the issue. What help do you need from me?

Thanks

rahulsmahadev commented 3 years ago

Thank you @SonuSingh1411. What would be great is a self-contained example or a set of steps to follow to reproduce this issue, because we have not seen this before.

SonuSingh1411 commented 3 years ago

I don't think I am able to reproduce the issue, and that's the main problem. This issue happens sporadically and we don't know the reason why. Let me know how I can help.

Thanks

rahulsmahadev commented 3 years ago

If anyone else in this thread is able to help us get a reproducer, that would be great.

tdas commented 3 years ago

Since this is so hard to reproduce, let me try to outline a debugging strategy that you can use on your side.

  1. As soon as you detect duplicate keys, try to find the names of the files that contain those keys. This can be done with the Spark SQL function input_file_name(). Run something like select *, input_file_name() from table where key = duplicate_key. This should produce the file names.

  2. If both the duplicate keys are in the same file, then that is a BIG issue - a single merge command definitely produced the duplicates. In that case, please provide your full merge command, with all the matched / not-matched clauses. If possible, please provide screenshots of the SQL plans of the jobs generated by MERGE from the Spark UI > SQL tab.

  3. If the duplicate keys are in different files, then it is possible (not guaranteed) that they were generated by different merge commands. That should not happen unless there was some kind of race condition in committing to the table, which can happen with S3 if the stars aligned in unfortunate ways. To confirm, search for those two file names in the transaction log of the Delta table, that is, search through the JSON files in the tablePath/_delta_log/ directory. If the two file names are in the same JSON file (named like 000..000123.json, where 123 is the table version), then both files with duplicates were created by a single MERGE, like case 2 above; follow the instructions in 2. If they are in different JSON files, then they were generated by two different MERGE commands. Look at the version and timestamp of those two JSON files and see whether they are very close together. If they are, then it could have been a race condition. Please post the contents of those two JSON files.

Let's hope this gives some pointers. A rough PySpark sketch of steps 1 and 3 follows below.
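
A minimal sketch of steps 1 and 3, assuming placeholder values for the table path, key column, duplicate key, and file-name fragments:

from pyspark.sql import functions as F

table_path = "s3://bucket/path/to/delta-table"     # placeholder
duplicate_key = 12345                              # placeholder value of the duplicated key

# Step 1: which data files contain the duplicated key?
dup_files = (
    spark.read.format("delta").load(table_path)
    .where(F.col("ID") == duplicate_key)           # placeholder key column
    .select(F.input_file_name().alias("data_file"))
    .distinct()
)
dup_files.show(truncate=False)

# Step 3: which commit JSON in _delta_log added each of those data files?
log = (
    spark.read.json(table_path + "/_delta_log/*.json")
    .withColumn("commit_file", F.input_file_name())
)
(log
    .where(F.col("add.path").rlike("part-00002-aaaa|part-00005-bbbb"))   # placeholder fragments from step 1
    .select("commit_file", "add.path")
    .show(truncate=False))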

ericjm24 commented 3 years ago

Hi @tdas, I'm working with @SonuSingh1411 on the same project. We identified some duplicate data in the latest ingestion, and following your suggestion I tracked down the individual partition files of all the duplicates.

For reference, we are using PySpark 2.4.5, Delta 0.7.0, AWS EMR, ojdbc8 to ingest from Oracle, writing to S3.

First thing to note is again the rarity and seeming randomness of the issue. We ingest dozens of tables and only two had noticeable defects; of those, the first changed ~150 records and had only 2 duplicates, while the second changed ~19,000 records with a total of 5 duplicates.

Duplicates always appeared across two different partition files, and different duplicated records could appear in different pairs of files from each other. As an example, for our table with two duplicates:

Row(ID=Decimal('818321257.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00003-ff3571cd-8466-4881-9456-fa49c9a0f989.c000.snappy.parquet')
Row(ID=Decimal('818321257.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00002-d3989b64-983c-40c3-8ea5-20c6525b422e.c000.snappy.parquet')
Row(ID=Decimal('818565974.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00006-4f94cdaa-1321-4f66-b1f3-6522496d1653.c000.snappy.parquet')
Row(ID=Decimal('818565974.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00005-a19a0712-14a7-430a-bc0a-da19f8889091.c000.snappy.parquet')

The first two records have the same ID (which again is an auto-incrementing primary key in the source and cannot be duplicated) but appear in different partition files. The second set of duplicates is similar, and the partition files for the first set of records are also independent of those for the second set. The UPDATED_ON_MONTH partition value agrees for all records.

I then went into the delta logs and identified that all 4 of these files are included in the same delta log json file, indicating that they were all created within the same merge statement. Our merge statement is very simple, as Sonu indicated above:

delta_table.alias("current").merge(
    new_df.alias("new"), "current.ID = new.ID"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

tl;dr The duplication issue happens across different partition files within a single merge.

dennyglee commented 3 years ago

Thanks for the additional information; we will dive in deeper to see if this can point us in a direction to debug. Is there any chance that this could be repro'ed using a public dataset? Out of curiosity, if you were to re-run the same tests without partitioning, would you still get this error? Also, how wide are your tables? Thanks!

ericjm24 commented 3 years ago

I am sure it is reproducible on a public dataset, but I am not capable of doing that in my current position. Maybe one of the other people who have commented on this issue can help. @scanry above provided a screenshot of their data, so they might be able to assist further assuming they are experiencing the same bug we are.

I suspect this problem would go away if we kept it to a single file per data partition, but we have time limits on our jobs and cannot afford to attempt that. It would be too slow.

As for table size, most of our tables are 10-20 columns wide. We have a few that are around 100 columns, but I don't think we've ever seen this issue with those tables (they tend to be very slowly changing). Tables can also be 10-100 million rows long, but as noted above we are usually changing at most a few tens of thousands of records in any given night.

Nestor10 commented 3 years ago

Running into a similar issue: sporadic, rare duplicates. Some key differences: Databricks runtime 8.2 (Delta 0.8.0), and the source for the merge is another Delta table, not JDBC. I'm in the process of cranking out unit tests and will see if I can reproduce this.

er-rohit-bansal commented 3 years ago

We are also having the same problem of duplicates being generated. I can observe that both duplicates are in the same file.

Below is our merged reporting table, which has a primary key:

(screenshot omitted)

We are merging into this from a Delta staging table into which data is streamed from Azure Event Hubs. When we look at the data in the staging table using the same key, we don't find duplicates:

(screenshot omitted)

Here is the simple merge statement:

(screenshot omitted)

Nestor10 commented 3 years ago

Tried to reproduce this in a unit test. This is a hard one. I ran into https://github.com/delta-io/delta/issues/630, so I had to bump to Delta 1.0.0 (is this still super secret?). Anyway, even after 100 merges I could not get this bug to show. I tried partitioning the data in a couple of different ways. The Delta sources are on my local filesystem and not S3, so perhaps this is evidence that we have some weird S3 stuff happening.

Nestor10 commented 3 years ago

One last note: I'm a Databricks customer, and I can provide information via DM to start a conversation. I have a Databricks instance where this happens frequently, and there may be an opportunity to share more detailed information. In a parallel effort, I'll ping my Databricks support contacts.

chrishfish commented 3 years ago

@Nestor10 Definitely. If you have seen this on a Databricks cluster then it will be easier for us to debug; please do raise a support ticket ASAP.

Nestor10 commented 3 years ago

@chrishfish Not sure if a case number (# 00103912) would be helpful to you, but here it is. Mathan is working on it, if that name has meaning to you. If you have the capacity to connect... it could really grease the wheels.

chrishfish commented 3 years ago

@Nestor10 perfect thank you!

SonuSingh1411 commented 3 years ago

@Nestor10 Thanks a lot for filing the defect. Hopefully this helps get to the root of the problem. Please do keep us posted if you get any concrete update on the resolution of the issue.

tdas commented 2 years ago

We are looking into it. Will keep this thread posted if we manage to find the root cause.

bigdatamoore commented 2 years ago

We are experiencing a similar issue in Azure Databricks. We are seeing dupes in a table that we are loading with a merge. We have raised a ticket with Microsoft. +++ Update on this - I was able to determine that our issue was caused by a user error, not a system one. So false alarm.

tdas commented 2 years ago

We have not been able to figure out the root cause yet, since it is so hard and rare to reproduce. But one very likely cause of duplicates is non-determinism in the data generated by the source of the merge. Merge operates in two passes over the source: a first pass to identify which files have matching data, and a second pass to rewrite those files. If the data generated by the source plan differs between the two passes, that can produce incorrect results. This non-determinism can occur in various ways: (i) the underlying table data that is being read by the source may have changed between the two passes; (ii) the transformation that generates the source may itself be non-deterministic, e.g. some map operation on the source.
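
As a purely illustrative sketch of case (ii), with made-up table and column names: a source whose filter depends on something like current_timestamp() can be re-evaluated between the two passes (a commenter below confirms current_timestamp was the culprit in their case):

from pyspark.sql import functions as F

# Hypothetical source: the time-based cutoff below can be re-evaluated between
# merge's two passes, so rows near the cutoff may match in pass 1 but not in pass 2.
source = (
    spark.read.format("delta").load("s3://bucket/events")                        # placeholder path
    .where(F.col("updated_on") >= F.current_timestamp() - F.expr("INTERVAL 1 HOUR"))
)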

Here are the different follow-up steps to mitigate/debug the issue.

  1. If you want to eliminate the chance of non-determinism between the two passes over the source, you can either (a) cache the source before feeding it to merge, or (b) write the source to a temporary table on disk and then use that table to create a new source. If the source data can be large and caching it in Spark becomes a concern, then (b) is more robust. (A sketch of this is shown after this list.)

  2. If you want to debug the non-determinism (I urge you, please help us debug it), then instead of removing the non-determinism with a cache or a temp table, keep it as it is. Additionally, write the source out to a table to record the exact source data that was used for the merge. If you keep a record of the source for every merge, then when one of the merges produces duplicates you can easily recreate that situation using the saved source. For example, if the merge that ran on version X of the target caused duplicates, you can replay the merge for debugging as follows:

    • Create a new Delta table from version X of the target table.
    • Merge the corresponding saved source into this new table.
    • Compare the result of this merge to version X+1 of the target table. If they are the same (that is, duplicates were created), then we have a reproducible situation!!! Report it to us and we can debug it further. If they are different, that is, no duplicates, then it indicates that source non-determinism was the problem: the source in the original merge produced duplicates, but the "supposedly same" saved source data now does not.
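
A minimal sketch of option 1, plus recording the source for step 2, might look like the following; build_source(), the S3 paths, and the "ID" key column are placeholders rather than details confirmed in this thread:

from delta.tables import DeltaTable

# Option 1(a): materialize the source once so both merge passes see identical data
source_df = build_source()            # placeholder for however the source DataFrame is built
source_df.persist()
source_df.count()                     # force materialization before the merge

# Step 2: also save the exact source used for this merge, tagged with the target
# version it ran against, so a bad merge can be replayed later
target = DeltaTable.forPath(spark, "s3://bucket/target-table")
target_version = target.history(1).select("version").collect()[0][0]
(source_df.write.format("delta").mode("overwrite")
    .save(f"s3://bucket/merge-sources/target_version={target_version}"))

(target.alias("t").merge(source_df.alias("s"), "t.ID = s.ID")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

source_df.unpersist()
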
Nestor10 commented 2 years ago

@tdas Thanks for the attention to this issue! I was able to identify non-deterministic behavior in my data: current_timestamp. Yep, that'll do it, so I fall clearly into category (ii) that you outlined above.

I spent probably way too much time reproducing this behavior in a mock Spark run, but I had some very interesting results. At first, despite thousands of tries and making my data sources non-deterministic, very non-deterministic, I could not get duplicate events. I discovered one condition that precluded ever seeing these duplicate rows: that the merge source (not the Delta table we are merging into, but rather the data we wish to merge in) is not empty. Even when merge sources are non-deterministic and being mutated, as long as the merge operated over at least one row in the first pass, duplicate records were never created. It's as if the first pass caches the join results conditionally: cache if there is data to join with, but skip the caching if the source table is empty (which totally makes sense as an optimization). The second pass then seems to re-read the source table only when it was not cached. Not sure if this helps or is just a weird coincidence, but that was my finding.

Over the next couple of weeks my free time will be spent listening to Martin Odersky lectures and solving some overly clever Scala problems, but after that, if there is still a need, I believe I could create a standalone unit test for this.

tdas commented 2 years ago

That corner case is an artifact of how merge is internally implemented, which, obviously, is subject to change. Good job catching it, though. And thank you for confirming that it is indeed due to non-determinism. I hope this is the reason for others' problems as well.

We will document this in the Delta docs.

RohitJadhav95 commented 2 years ago

Hi, thanks for all the questions and information provided. I recently faced a similar issue wherein I found duplicates in my Delta table when merge-upserting the data.

I am upserting the data to a delta table in S3 using a Glue Job. There is only one Glue Job that is writing to the delta table and to ensure deduplication from the source, I explicitly dedup the data before merging it to the final table.

However, in my case I observed the issue only when my Delta table was partitioned; there were no duplicates with a non-partitioned table.

I am using an external table in Athena, created with the manifest file, to verify the duplicates and query the data.

This is the merge query that I am using -

df.alias("business").merge(
    inputDF_partitioned.alias("delta"), "business.id = delta.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Question: Is there something we need to be aware of when upserting data into partitioned Delta tables, or is it handled by Delta itself?

Thanks in advance.

jgoldman83 commented 2 years ago

Hey guys, I was able to reproduce it and found a solution. Just make sure that the parquet file you are reading for the upserts has the same schema as the source.
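
If you want to verify this before each run, a small (hypothetical) pre-merge schema check along these lines can help; the paths below are placeholders:

# Hypothetical paths, purely to illustrate the pre-merge schema check
upserts_df = spark.read.parquet("s3://bucket/incoming/upserts/")
target_df = spark.read.format("delta").load("s3://bucket/target-table")

if dict(upserts_df.dtypes) != dict(target_df.dtypes):
    missing = set(target_df.columns) - set(upserts_df.columns)
    extra = set(upserts_df.columns) - set(target_df.columns)
    raise ValueError(f"Schema mismatch before merge: missing={missing}, extra={extra}")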

sebastianGruenwald commented 2 years ago

[SOLVED -- NOT THE SAME ISSUE, SEE BELOW] Hi all, I just wanted to share some insights on this issue. I was experiencing the same problem with duplicates after merging into a Delta table.

Here is my setup:

Implementation:

But when I ran the code, I got duplicates after at least two batches of data were processed.

Solution: After trying several things, I removed the partition-column condition from the merge-into clause. The batches take longer to run now, but there are no more duplicates. It seems this issue had something to do with partition pruning.

Code: Sorry guys, this cannot be used directly to reproduce the issue, but I think any source dataset with duplicates and a partitioning that is normally consistent with the key should be enough to do so.

import pyspark.sql.functions as f
from delta import DeltaTable

def write_batch(data, batch_id):
    data_consolidated = data.groupby("ORDERID").agg(
        f.max("sequenceNumber").alias("max_sequence_number"),
        f.max("enqueuedTime").alias("max_enqueued_time"),
        *[
            f.last(x, ignorenulls=True).alias(x)
            for x in sorted(data.columns[:28])
            if x not in ["ORDERID", "sequenceNumber"]
        ]
    ).withColumn("BDATE", f.col("BTIMESTAMP").cast("date")).select("BDATE", *data.columns[:28], "max_sequence_number", "max_enqueued_time").drop("ingestTime", "ingestDate")
    data_consolidated.persist()
    spark.catalog.setCurrentDatabase("test")
    if "target_table" not in [x.name for x in spark.catalog.listTables()]:
        (data_consolidated.write.partitionBy("SOMECOL", "BDATE").mode("overwrite").saveAsTable("test.target_table"))
    else:
        target_table = DeltaTable.forName(spark, "test.target_table")
        ## For partition pruning: get all dates which are in the update batch
        bdatus = [x["BDATE"].isoformat()[:10] for x in data_consolidated.select("BDATE").drop_duplicates().collect()]
        if batch_id % 10 == 0:
            spark.sql("optimize test.target_table")
        if batch_id % 100 == 0:
            spark.sql("optimize test.target_table zorder by btimestamp")
        (target_table.alias("t").merge(data_consolidated.alias("s"),
              f"""t.ORDERID = s.ORDERID"""
              ## This condition caused the issue. If you leave it out, the duplicates are gone
              + f""" and t.BDATE in ('{"','".join(bdatus)}')""" 
               )
    .whenMatchedUpdateAll("t.max_sequence_number < s.max_sequence_number")
    .whenNotMatchedInsertAll()
    .execute())

stream = spark.readStream.option("maxFilesPerTrigger", "1").table("test.source")
stream.writeStream.outputMode("append").foreachBatch(write_batch).start()

# query to identify duplicates:
spark.read.table("test.target_table").agg(f.countDistinct("ORDERID"), f.count("ORDERID")).display()

bart-samwel commented 2 years ago

@sebastianGruenwald Potentially a stupid question: I'm not sure what the .cast("BDATE") does in the 9th line of write_batch. Should that be .cast("DATE")?

sebastianGruenwald commented 2 years ago

Yes, of course you are right. I changed it.

bart-samwel commented 2 years ago

@sebastianGruenwald Is BDATE ever NULL? Because the IN based pruning won't work in that case.

sebastianGruenwald commented 2 years ago

No, it can't be NULL. But I think I found the issue. In the original statement I had

.withColumn("BDATE", f.date_trunc("day", "BTIMESTAMP"))

instead of

.withColumn("BDATE", f.col("BTIMESTAMP").cast("date"))

I thought it wouldn't make a big difference, but testing it again, this actually solves the issue. So it was actually my fault: I forgot that date_trunc gives back a timestamp, and BDATE IN ('2022-05-01') wouldn't match BDATE='2022-05-01T00:00:00'.
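
For anyone hitting the same thing, a tiny sketch (with a made-up value) showing the type difference behind it:

from pyspark.sql import functions as f

df = (
    spark.createDataFrame([("2022-05-01 13:45:00",)], ["BTIMESTAMP"])
    .withColumn("BTIMESTAMP", f.col("BTIMESTAMP").cast("timestamp"))
)

df.select(
    f.date_trunc("day", "BTIMESTAMP").alias("truncated"),   # timestamp: 2022-05-01 00:00:00
    f.col("BTIMESTAMP").cast("date").alias("casted"),        # date: 2022-05-01
).dtypes
# [('truncated', 'timestamp'), ('casted', 'date')]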

bart-samwel commented 2 years ago

That makes sense! So this is not an instance of the issue described in this ticket.

sebastianGruenwald commented 2 years ago

Should I remove it?