apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Using monotonically_increasing_id to generate record key causing duplicates on upsert #7829

Open jtmzheng opened 1 year ago

jtmzheng commented 1 year ago

Describe the problem you faced

For context, we have tables that are snapshotted daily/weekly (e.g. an RDS export) that we then have Spark jobs convert into Hudi tables (i.e. we overwrite the full table). We attach a column using monotonically_increasing_id (https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html) as a surrogate key if there is no primary key available in our metadata.

We're seeing some really odd behavior where the same record is sometimes written twice with different record keys (which overwrites other records). A dummy example:

Example input:
Row 1: id = 1
Row 2: id = 2
Row 3: id = 3

Adding monotonically_increasing_id (the record key of the Hudi table)
Row 1: id = 1, monotonically_increasing_id = 1
Row 2: id = 2, monotonically_increasing_id = 2
Row 3: id = 3, monotonically_increasing_id = 8589934593

Hudi table becomes:
Row 1: id = 3, monotonically_increasing_id = 1
Row 2: id = 2, monotonically_increasing_id = 2
Row 3: id = 3, monotonically_increasing_id = 8589934593

The problem is non-deterministic (i.e. re-running on the same input can fix the issue); for example, on one job we saw:

# of rows: 154982072
# of duplicate rows with different record keys: 813263

When an upsert happens, is there retry logic that does a "partial retry"? The docs for monotonically_increasing_id mention it is generated from (partition_id, record_number), and we suspect that for whatever reason some rows are upserted multiple times in different stages (which changes the partition_id and record_number).
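For reference, a rough sketch (illustrative, not part of our pipeline) of how the example ids above decode under the bit layout the docs describe, with the partition id in the upper 31 bits and the per-partition record number in the lower 33 bits; any change in the partition layout on a retry therefore changes the generated key:

def decode(mid):
    # Split a monotonically_increasing_id into (partition_id, record_number),
    # per the layout documented in the Spark API docs.
    return mid >> 33, mid & ((1 << 33) - 1)

print(decode(1))           # (0, 1): partition 0, record 1
print(decode(8589934593))  # (1, 1): partition 1, record 1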

To Reproduce

Unknown; re-running over the same input leads to different results. I asked in the Hudi Slack and someone mentioned the key generator output needs to be unique: https://apache-hudi.slack.com/archives/C4D716NPQ/p1675336371420009?thread_ts=1675301744.998269&cid=C4D716NPQ

Expected behavior

This seems like an issue with our usage of Hudi:

  1. What are the requirements for the record key? Would using Spark's uuid be safe? We found https://issues.apache.org/jira/browse/SPARK-23599

A comment on that JIRA reports: "We have encountered this problem with Spark 3.1.2, resulting in duplicate values in a situation where a spark executor died. As suggested in the description, this error was hard to track down and difficult to replicate."

Is there a way to generate a surrogate key for a Hudi table?

  2. Would changing our operation to insert/bulk_insert fix the issue? Naively it seems like this would cause duplicates, i.e. the output becoming:
    Row 1: id = 1, monotonically_increasing_id = 1
    Row 1: id = 3, monotonically_increasing_id = 1
    Row 2: id = 2, monotonically_increasing_id = 2
    Row 3: id = 3, monotonically_increasing_id = 8589934593

Environment Description

We are running on EMR 6.9

danny0405 commented 1 year ago

For monotonically_increasing_id, you may want to use the INSERT operation instead?

jtmzheng commented 1 year ago

@danny0405 https://apache-hudi.slack.com/archives/C4D716NPQ/p1675378366882569?thread_ts=1675301744.998269&cid=C4D716NPQ

the issue is, our upsert partitioner/insert partitioner is based on the hash of the record key, and hence if keys are regenerated on a re-trigger due to Spark task retries, records could move from one insert partition to another, which could result in data loss or duplication.

It sounds like insert would have the same issue based on that description.

danny0405 commented 1 year ago

Somehow you need to make sure the auto-generated ids are globally unique; you can use the INSERT operation with insert deduplication disabled.
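For example, something along these lines (a sketch only; these are standard Hudi write configs, with deduplication left off):

insert_options = {
    # switch from upsert to plain insert
    "hoodie.datasource.write.operation": "insert",
    # keep insert-time deduplication disabled so records are not combined by key
    "hoodie.datasource.write.insert.drop.duplicates": "false",
    "hoodie.combine.before.insert": "false",
}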

nsivabalan commented 1 year ago

@alexeykudinkin: can you take this up?

alexeykudinkin commented 1 year ago

@jtmzheng that's an interesting one.

We're currently looking into adding support for auto-generated keys, and are actually planning to use something similar to monotonically_increasing_id in our approach.

Can you please share a bit more detail on how exactly you generate these keys? If you can provide a code snippet, that would be much appreciated.

jtmzheng commented 1 year ago

@alexeykudinkin sorry for the delay in response - we just have:

from pyspark.sql import functions as F

# e.g. a MySQL table snapshot
df = spark.read.format("parquet").load(INPUT_PATH)

# Attach a surrogate key column since no primary key is available in our metadata
df = df.withColumn(
    "_autogenerated_primary_key",
    F.monotonically_increasing_id(),
)
df.write.format("hudi").save(OUTPUT_PATH)

We switched to using uuid and haven't seen the issue since, though this is tricky to reproduce because the problem only happens on Spark retries during key generation.

Edit: https://godatadriven.com/blog/spark-surprises-for-the-uninitiated/ explains the problem
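The uuid-based variant looked roughly like this (a sketch; uuid() is Spark SQL's built-in function and, as noted further down the thread, it is also regenerated on retries):

from pyspark.sql import functions as F

df = df.withColumn("_autogenerated_primary_key", F.expr("uuid()"))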

alexeykudinkin commented 1 year ago

@jtmzheng thanks for providing additional context!

Can you please help me understand how you determined the duplicate rows here:

# of rows: 154982072
# of duplicate rows with different record keys: 813263

jtmzheng commented 1 year ago

We have a two stage pipeline:

  1. Snapshot of MySQL table (as parquet files)
  2. Convert to a Hudi table (i.e. read in the parquet, write out a Hudi table)

# of rows: 154982072

# of duplicate rows with different record keys: 813263

It was easy to confirm this by comparing against the parquet input.
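Roughly, the check looked like this (a sketch with illustrative column names, not our exact job): count source rows whose natural id shows up under more than one generated record key in the Hudi output.

from pyspark.sql import functions as F

src = spark.read.parquet(INPUT_PATH)                # stage 1 output
out = spark.read.format("hudi").load(OUTPUT_PATH)   # stage 2 output

dupes = (
    out.groupBy("id")  # natural id from the source table
    .agg(F.countDistinct("_autogenerated_primary_key").alias("n_keys"))
    .filter("n_keys > 1")
)
print("# of rows:", src.count())
print("# of duplicate rows with different record keys:", dupes.count())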

alexeykudinkin commented 1 year ago

@jtmzheng what operation are you using? Is it "upsert"?

jtmzheng commented 1 year ago

Yep, using upsert. (Note: the Slack message https://apache-hudi.slack.com/archives/C4D716NPQ/p1675378366882569?thread_ts=1675301744.998269&cid=C4D716NPQ implied this would also be a problem with insert.)

alexeykudinkin commented 1 year ago

@jtmzheng can you please paste your whole config you've been using to write to Hudi?

jtmzheng commented 1 year ago

Added some comments inline for clarification below

write_options = {
    "hoodie.table.name": name,
    "hoodie.datasource.write.table.name": name,
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": table_type.value,  # always COW for these tables
    "hoodie.datasource.write.partitionpath.field": "",
    "hoodie.datasource.write.recordkey.field": ",".join(primary_keys),  # `_autogenerated_primary_key` if none specified
    "hoodie.datasource.write.precombine.field": precombine_field,  # `_autogenerated_primary_key` if none specified
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.metadata.enable": "false",
    "hoodie.bootstrap.index.enable": "false",
    # Metrics configs
    "hoodie.metrics.on": "true",
    "hoodie.metrics.reporter.type": "GRAPHITE",
    "hoodie.metrics.graphite.host": graphite_host,
    "hoodie.metrics.graphite.port": str(graphite_port),
    "hoodie.metrics.graphite.metric.prefix": f"lake_{environment.value}",
}

if partition_cols:
    write_options["hoodie.datasource.write.partitionpath.field"] = ",".join(
        f"{c}:SIMPLE" for c in partition_cols
    )
    write_options["hoodie.index.type"] = "BLOOM"  # the default of SIMPLE is used on non-partitioned tables
nsivabalan commented 1 year ago

I might know why this could be happening; if you can clarify something, we can confirm.

For a given df, while generating the primary key using the monotonically increasing function, if we call the key generation twice it could return different keys, right? Spark only ensures the ids are unique within one evaluation; it does not guarantee they are the same across evaluations.

Because, down the line, our upsert partitioner is based on the hash of the record key. So if the Spark DAG is re-triggered for one of the Spark partitions, there is a chance that the re-attempt of primary key generation produces a new set of keys (whose hash values might differ from the first time), and you might see duplicates or data loss.
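To illustrate the point (a sketch only; Python's built-in hash stands in for Hudi's internal key hashing): the bucket a record lands in is a pure function of its key, so if a retry regenerates the key, the same record can be routed to a different bucket than on the first attempt.

def bucket(record_key, num_buckets=64):
    # hash-based bucket assignment; the real partitioner differs, but the
    # principle is the same
    return hash(record_key) % num_buckets

print(bucket("1"), bucket("8589934593"))  # different keys, likely different buckets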

jtmzheng commented 1 year ago

The docs for monotonically_increasing_id mention it is generated from (partition_id, record_number), so if a retry changes either, the keys would be different. I'm not sure what happens to the partition_id internally if a stage is partially retried, but that was my hypothesis.

jtmzheng commented 1 year ago

Small update, we've confirmed this is still an issue even after switching to Spark's uuid

ad1happy2go commented 1 year ago

@jtmzheng

This issue was partially resolved in Spark by this JIRA: https://issues.apache.org/jira/browse/SPARK-23599

But if you check the last comment on that JIRA, someone started to see a similar duplicate issue to yours with the UUID function:

"We have encountered this problem with Spark 3.1.2, resulting in duplicate values in a situation where a spark executor died. As suggested in the description, this error was hard to track down and difficult to replicate."

How frequent is it? As a workaround, can you use a combination of both monotonically_increasing_id and uuid to ensure the key is always unique? It may incur a small performance hit due to generating such a large id, but it should always be unique.
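Something like the following is what is being suggested (a sketch; the column name is reused from the earlier snippet):

from pyspark.sql import functions as F

df = df.withColumn(
    "_autogenerated_primary_key",
    F.concat_ws("-", F.monotonically_increasing_id().cast("string"), F.expr("uuid()")),
)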

jtmzheng commented 1 year ago

@ad1happy2go yeah, I saw that ticket a while back. I'm curious whether the surrogate key functionality Delta supports (https://www.databricks.com/blog/2022/08/08/identity-columns-to-generate-surrogate-keys-are-now-available-in-a-lakehouse-near-you.html) has the same issue.

nsivabalan commented 1 year ago

Hey @jtmzheng: yeah, this might end up with duplicates or data loss in failure scenarios. We have put up https://github.com/apache/hudi/pull/8107 for this ask. If you can try it and let us know whether it works, that would be nice. We did test with a failure scenario and validated that there is no data loss or duplicates with this patch.

mzheng-plaid commented 1 year ago

@nsivabalan hmm, from the description in #8107:

Partition id: the engine's task partition id or parallelizable unit for the engine of interest (the Spark partitionId in the case of the Spark engine).
Row id: a unique identifier of the row (record) within the provided task partition.

Combining them in a single string key as below:

"${committimestamp}${partitionid}${row_id}"

For row-id generation we're planning on using a generator very similar in spirit to the monotonically_increasing_id() expression from Spark to generate a unique identity value for every row within a batch (this could easily be implemented for any parallel execution framework like Flink, etc.)

How does this avoid the same problem in this ticket?