apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

[Bug] Iceberg adds a new row when deleting a row in MySQL #2703

Closed MartinHou closed 1 year ago

MartinHou commented 1 year ago

### Search before asking

### Flink version

1.16.2

### Flink CDC version

flink-sql-connector-mysql-cdc-2.2.1

### Database and its version

MySQL 5.7

### Minimal reproduce step

1. First, create a new table in Iceberg:

   ```sql
   CREATE TABLE results (
       "id" varchar,
       "input_md5" varchar,
       "output_md5" varchar,
       "log" varchar,
       "metric" varchar,
       "create_time" TIMESTAMP(6) WITH TIME ZONE,
       "update_time" TIMESTAMP(6) WITH TIME ZONE,
       "workflow_id_id" varchar,
       "error_details" varchar,
       "error_stage" varchar,
       "error_type" varchar
   )
   WITH (format = 'PARQUET', partitioning = ARRAY['hour(create_time)'])
   ```
2. Then, run this PyFlink job:

   ```python
   from pyflink.datastream import StreamExecutionEnvironment
   from pyflink.datastream.checkpointing_mode import CheckpointingMode
   from pyflink.table import StreamTableEnvironment


   def run():
       env = StreamExecutionEnvironment.get_execution_environment()
       # Parallelism 1, exactly-once checkpoints every 2000 ms.
       env.set_parallelism(1).enable_checkpointing(2000, CheckpointingMode.EXACTLY_ONCE)

       table_env = StreamTableEnvironment.create(env)

       # MySQL CDC source table (connection values redacted).
       table_env.execute_sql("""
           CREATE TABLE src_result (
               `id` STRING,
               input_md5 STRING,
               output_md5 STRING,
               log STRING,
               metric STRING,
               create_time TIMESTAMP(6),
               update_time TIMESTAMP(6),
               workflow_id_id STRING,
               error_details STRING,
               error_stage STRING,
               error_type STRING,
               PRIMARY KEY (`id`) NOT ENFORCED
           ) WITH (
               'connector' = 'mysql-cdc',
               'hostname' = 'x',
               'port' = '3306',
               'username' = 'x',
               'password' = 'x',
               'database-name' = 'x',
               'table-name' = 'result'
           );
       """)

       # Iceberg catalog backed by a Hive metastore.
       table_env.execute_sql("CREATE CATALOG iceberg WITH ("
                             "'type'='iceberg', "
                             "'catalog-type'='hive', "
                             "'uri'='thrift://x',"
                             "'warehouse'='tos://x',"
                             "'format-version'='2')")

       # Stream the CDC changelog into the Iceberg table.
       table_env.execute_sql("""INSERT INTO iceberg.results select * from src_result;""").print()


   if __name__ == "__main__":
       run()
   ```

### What did you expect to see?

When I update or delete rows in MySQL, the corresponding rows in Iceberg should be updated or deleted as well.
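In changelog terms, this expectation means the sink applies each change row by its primary key `id` (as declared in the `src_result` DDL). The pure-Python model below is a minimal sketch of that behavior; `+I`, `-U`, `+U` and `-D` are Flink's row-kind codes, while `apply_changelog` and the sample rows are hypothetical and are not Flink or Iceberg code.

```python
from typing import Dict, List, Tuple

# A change row: (row kind, row as a dict). Row kinds use Flink's short codes:
# +I insert, -U update-before, +U update-after, -D delete.
Change = Tuple[str, dict]


def apply_changelog(changes: List[Change], key: str = "id") -> Dict[str, dict]:
    """Hypothetical keyed sink: materialize a changelog into one row per key."""
    table: Dict[str, dict] = {}
    for kind, row in changes:
        if kind in ("+I", "+U"):
            table[row[key]] = row        # insert, or overwrite by primary key
        elif kind in ("-U", "-D"):
            table.pop(row[key], None)    # retract the old version of the row
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return table


# Changelog a CDC source emits for: insert id=1, update it, then delete it.
insert = [("+I", {"id": "1", "log": "old"})]
update = [("-U", {"id": "1", "log": "old"}), ("+U", {"id": "1", "log": "new"})]
delete = [("-D", {"id": "1", "log": "new"})]

print(apply_changelog(insert + update))           # {'1': {'id': '1', 'log': 'new'}}
print(apply_changelog(insert + update + delete))  # {}
```

Under this model an update leaves exactly one row with the new values, and a delete leaves none.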

### What did you see instead?

After updating a row in MySQL, there are 2 original rows and 1 updated row in Iceberg.
After deleting a row in MySQL, there are 2 original rows remaining. 

It seems that deleting a row in MySQL adds a row to Iceberg instead of removing one.
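The reported counts are consistent with a sink that writes every change row as a new row and ignores its row kind. The sketch below models that hypothesis in plain Python; `append_only_sink` is a hypothetical name, and this illustrates the symptom rather than Iceberg's actual writer.

```python
from typing import List, Tuple

Change = Tuple[str, dict]


def append_only_sink(changes: List[Change]) -> List[dict]:
    """Hypothetical sink that ignores the row kind and appends every row."""
    rows: List[dict] = []
    for _kind, row in changes:
        rows.append(row)  # -U and -D rows are written as if they were inserts
    return rows


original = {"id": "1", "log": "old"}
updated = {"id": "1", "log": "new"}

# Update: the source emits -U with the old values, then +U with the new ones.
after_update = append_only_sink([("+I", original), ("-U", original), ("+U", updated)])
# Delete: the source emits -D with the old values.
after_delete = append_only_sink([("+I", original), ("-D", original)])

print(after_update.count(original), after_update.count(updated))  # 2 1
print(after_delete.count(original))                                # 2
```

This yields 2 original rows plus 1 updated row after an update, and 2 original rows after a delete, which matches what is observed above.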

### Anything else?

The parallelism is 1.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
loserwang1024 commented 1 year ago

It may not be a problem with MySQL CDC. You can use print to log the MySQL CDC source's data. If it includes [+I], [-U] and [+U], the CDC source is correct, and something may be wrong in the Iceberg sink.
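One way to act on this suggestion is to capture the source's printed changelog (for example from a `'connector' = 'print'` sink table) and check which row kinds appear. The sketch below assumes each printed line contains a row in Flink's `Row.toString()` form, such as `+I[1, old]`, possibly preceded by a subtask prefix like `1> `; the helpers `row_kinds_seen` and `source_looks_correct` and the sample lines are hypothetical.

```python
import re
from typing import Iterable, Set

# Assumed format: a row-kind code (+I, -U, +U, -D) immediately followed by "[".
ROW_KIND = re.compile(r"(\+I|-U|\+U|-D)\[")


def row_kinds_seen(lines: Iterable[str]) -> Set[str]:
    """Hypothetical helper: collect the row kinds found in printed changelog lines."""
    kinds: Set[str] = set()
    for line in lines:
        match = ROW_KIND.search(line)
        if match:
            kinds.add(match.group(1))
    return kinds


def source_looks_correct(kinds: Set[str]) -> bool:
    """The suggested check: inserts plus update-before/update-after rows were emitted."""
    return {"+I", "-U", "+U"} <= kinds


sample = [
    "1> +I[1, old]",
    "1> -U[1, old]",
    "1> +U[1, new]",
    "1> -D[1, new]",
]
kinds = row_kinds_seen(sample)
print(sorted(kinds))                # ['+I', '+U', '-D', '-U']
print(source_looks_correct(kinds))  # True
```

If the check passes, the source side is emitting retractions and the remaining suspect is how the sink handles them.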

MartinHou commented 1 year ago


Thanks for the reply! Yes, the CDC source is printing the correct events, so maybe it's not a CDC problem.