Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Handling Updates on Partition Columns in Iceberg with Flink CDC #11573

Open a8356555 opened 4 days ago

a8356555 commented 4 days ago

Apache Iceberg version

1.5.2

Query engine

Athena

Please describe the bug 🐞

Hi,

I'm using MySQL Flink CDC with Iceberg 1.5.2 and Flink 1.16. I have a table partitioned by the status column, but this column is subject to updates. When an update occurs, I encounter duplicate records in the Iceberg table, which is not the desired behavior.

Is there a way to properly handle updates on a partition column in Iceberg to avoid duplicates?

Here is the SQL for my Flink CDC job:

CREATE CATALOG glue_catalog WITH (
    'type'='iceberg', 
    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'warehouse'='s3://my-bucket'
);

CREATE TABLE mysql_cdc_source
(
    id  INT,
    status INT,
    value  INT,
    ...,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'XXX',
    'password' = 'XXX',
    'database-name' = 'XXX',
    'table-name' = 'XXX',
    'debezium.snapshot.mode' = 'when_needed',
    'server-id' = '1'
);

CREATE TABLE IF NOT EXISTS glue_catalog.my_db.my_table(
    `id` INT NOT NULL,
    `status` INT,
    `value`  INT,
    ...,
    PRIMARY KEY (id, status) NOT ENFORCED
) PARTITIONED BY (
    status
) WITH (
    'format-version'='2',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.upsert.enabled'='true',
    'write.delete.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read',
    'write.update.mode'='merge-on-read'
);

INSERT INTO glue_catalog.my_db.my_table
SELECT
    *
FROM mysql_cdc_source;

Data before updating MySQL: (screenshot)

Data after updating MySQL (duplicated row showed up): (screenshot, 2024-11-18 12:19 PM)

I get the same results using Spark 3.4: (screenshot)

Querying glue_catalog.my_db.my_table.files using Spark 3.4: (screenshot)

Thanks!

pvary commented 4 days ago

What are the records generated by the MySQL CDC connector?

You are using upsert mode in FlinkSink.

In upsert mode, when an update happens, Flink expects the primary key to stay unchanged: it removes the old values for the given primary key and inserts a new record.

When a record is updated in a way that changes the primary key, it is not really an update in upsert mode; it should be a delete and an insert instead. It is the responsibility of the input stream to generate the correct records.

You can set write.upsert.enabled to false if the MySQL connector is able to generate a retract stream.
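
For reference, a minimal sketch of that option, assuming the catalog and table names from the DDL above (the mysql-cdc source already emits a changelog/retract stream, so only the sink's write option changes):

-- Sketch only: disable upsert mode on the existing sink table so the
-- -U/+U changelog records coming from mysql-cdc are written as a delete
-- followed by an insert, instead of being treated as keyed upserts.
ALTER TABLE glue_catalog.my_db.my_table SET (
    'write.upsert.enabled' = 'false'
);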

a8356555 commented 1 day ago

You can set write.upsert.enabled to false if the MySQL connector is able to generate a retract stream.

But my use case requires upsert, so in this scenario, using status as the partition key is not suitable, right?

pvary commented 17 hours ago

But my use case requires upsert, so in this scenario, using status as the partition key is not suitable, right?

The problem is not with the partitioning. The problem is that you added status to your PRIMARY KEY.
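
For illustration, a minimal sketch of a sink DDL along those lines, with only id as the primary key and status left purely as a partition column. Since the Flink sink's upsert mode requires partition fields to be part of the equality fields, this sketch also turns upsert off and relies on the mysql-cdc changelog instead (a combination of the suggestions in this thread, assuming the same table and column names as above):

-- Sketch only: id is the sole primary key; status is just a partition column.
CREATE TABLE IF NOT EXISTS glue_catalog.my_db.my_table (
    `id`     INT NOT NULL,
    `status` INT,
    `value`  INT,
    PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (
    status
) WITH (
    'format-version' = '2',
    'write.metadata.delete-after-commit.enabled' = 'true',
    'write.upsert.enabled' = 'false',
    'write.delete.mode' = 'merge-on-read',
    'write.merge.mode' = 'merge-on-read',
    'write.update.mode' = 'merge-on-read'
);

With this setup, the -U record emitted for an UPDATE still carries the old status value, so the matching delete should land in the old partition while the +U insert goes to the new one, which avoids the duplicate rows.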