apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Duplicate rows in my table #10781

Closed chenbodeng719 closed 4 months ago

chenbodeng719 commented 4 months ago

Describe the problem you faced

I have duplicate rows in my table. Below is my Flink Hudi config. A Flink job consumes data from Kafka and upserts it into the hudi_sink table. I read the table with PySpark, but I get duplicate data.

# flink write hudi conf
        CREATE TABLE hudi_sink(
            new_uid STRING PRIMARY KEY NOT ENFORCED,
            uid STRING,
            oridata STRING,
            part INT,
            user_update_date STRING,
            update_time TIMESTAMP_LTZ(3) 
        ) PARTITIONED BY (
            `part`
        ) WITH (
            'table.type' = 'MERGE_ON_READ',
            'connector' = 'hudi',
            'path' = '%s',
            'write.operation' = 'upsert',
            'precombine.field' = 'update_time',
            'write.tasks' = '%s',
            'index.type' = 'BUCKET',
            'hoodie.bucket.index.hash.field' = 'new_uid',
            'hoodie.bucket.index.num.buckets' = '%s',
            'clean.retain_commits' = '0',
            'compaction.async.enabled' = 'false'
        )

# spark read

            readOptions = {
            }
            prof_df = sqlc.read \
                .format('org.apache.hudi') \
                .options(**readOptions) \
                .load(tpath)

ad1happy2go commented 4 months ago

@chenbodeng719 Can you please let us know which Hudi/Flink/Spark versions you are using? Are you getting duplicate rows only when reading with Spark, or do you see the same behaviour when you read the table back with Flink too?

chenbodeng719 commented 4 months ago

I didn't try with Flink. The problem happens when I read with Spark.

ad1happy2go commented 4 months ago

@chenbodeng719 Can you post a screenshot of the duplicate records? Do they belong to different file groups?
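
One way to answer the file-group question is to group rows by Hudi's `_hoodie_record_key` meta column and look at the `_hoodie_file_name` values for each repeated key. The sketch below is plain Python over a small sample of rows; the helper `find_duplicate_keys` and the sample data are hypothetical, and the file name is used only as a rough proxy for the file group.

```python
from collections import defaultdict

# Hudi meta columns that are present on rows read from a Hudi table.
KEY_COL = "_hoodie_record_key"
FILE_COL = "_hoodie_file_name"


def find_duplicate_keys(rows):
    """Return {record_key: sorted file names} for keys that appear more than once."""
    files_by_key = defaultdict(list)
    for row in rows:
        files_by_key[row[KEY_COL]].append(row[FILE_COL])
    return {k: sorted(v) for k, v in files_by_key.items() if len(v) > 1}


# Hypothetical sample rows. With PySpark they could come from something like
#   [r.asDict() for r in prof_df.select(KEY_COL, FILE_COL).collect()]
# which is only sensible for a small sample, since collect() pulls rows to the driver.
sample_rows = [
    {KEY_COL: "u1", FILE_COL: "file-a.parquet"},
    {KEY_COL: "u1", FILE_COL: "file-b.parquet"},
    {KEY_COL: "u2", FILE_COL: "file-a.parquet"},
]

duplicates = find_duplicate_keys(sample_rows)
for key, files in duplicates.items():
    spans_files = len(set(files)) > 1
    print(key, files, "different files" if spans_files else "same file")
```

If a repeated key shows up in more than one file, the copies were most likely written as separate records rather than merged.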

chenbodeng719 commented 4 months ago

Is it possible that I bulk-inserted a dataset containing some duplicate keys, and that any later upsert whose key matches one of the duplicated keys then updates the item twice? Like in the image below.

[image]

ad1happy2go commented 4 months ago

Did you only use 0.14.1, or was this table upgraded from a previous version? Can you also provide the values of the Hudi meta columns?

bulk_insert itself can ingest duplicates. Did you get duplicates right after the bulk_insert? If so, upsert is going to update both records. Did you confirm whether you had these duplicates after bulk_insert?

Running bulk_insert twice on the same data can also cause this issue.
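
The interaction described above can be illustrated with a toy in-memory model. This is not Hudi's implementation: `bulk_insert` and `upsert` here are hypothetical stand-ins that only mimic the behaviour described in this thread (bulk insert appends without checking keys, upsert touches every stored row with a matching key).

```python
def bulk_insert(table, rows):
    """Toy bulk_insert: append rows as-is, with no key lookup or deduplication."""
    table.extend(dict(r) for r in rows)


def upsert(table, row, key="new_uid"):
    """Toy upsert: update every stored row with a matching key, insert if none match.

    Returns the number of existing rows that were updated.
    """
    matches = [r for r in table if r[key] == row[key]]
    if not matches:
        table.append(dict(row))
    for r in matches:
        r.update(row)
    return len(matches)


table = []
bulk_insert(table, [
    {"new_uid": "k1", "oridata": "v1"},
    {"new_uid": "k1", "oridata": "v1"},  # duplicate key slips in unchecked
    {"new_uid": "k2", "oridata": "v1"},
])

updated = upsert(table, {"new_uid": "k1", "oridata": "v2"})
print(updated)  # number of rows touched by a single upsert
print([r["oridata"] for r in table if r["new_uid"] == "k1"])
```

In this model a single upsert for `k1` rewrites both copies, so the duplicates persist and simply carry the new value.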

chenbodeng719 commented 4 months ago

"If that's the case, upsert is going to update both records." I guess that is my case. First, the bulk insert brought some duplicate keys into the table. Then, when an upsert for a duplicated key arrives, it updates all the rows with that key. In my case, both rows for one duplicated key were changed. If there were five rows for one duplicated key, would it update all five?

ad1happy2go commented 4 months ago

Yes, that's correct. You should remove the dups after inserting with bulk_insert, or not use bulk_insert at all in this case.
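
As a rough sketch of what removing the dups could look like, the snippet below keeps one row per key, choosing the row with the greatest `update_time` (the table's precombine field). It is plain Python over hypothetical sample rows; `dedupe_latest` is an illustrative helper, not a Hudi API, and on a real table the same idea would be applied with Spark or Flink before the data is written.

```python
def dedupe_latest(rows, key="new_uid", order_by="update_time"):
    """Keep one row per key: the one with the greatest order_by value."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())


# Hypothetical sample data. ISO-8601 timestamp strings compare correctly as text.
rows = [
    {"new_uid": "k1", "oridata": "old", "update_time": "2024-02-28T10:00:00"},
    {"new_uid": "k1", "oridata": "new", "update_time": "2024-02-29T10:00:00"},
    {"new_uid": "k2", "oridata": "only", "update_time": "2024-02-28T09:00:00"},
]

deduped = dedupe_latest(rows)
for row in deduped:
    print(row["new_uid"], row["oridata"])
```

Deduplicating the input before the bulk insert avoids having to rewrite the table afterwards.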

In reply to chenbodeng719's comment of Thu, Feb 29, 2024 at 4:04 PM (https://github.com/apache/hudi/issues/10781#issuecomment-1970850132).