apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

[Bug] inconsistent data when mysql source table has default value #1615

Closed gleiyu closed 8 months ago

gleiyu commented 2 years ago

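Describe the bug: The MySQL CDC source emits data that is inconsistent with the source table when a column has a default value. A row inserted with an explicit NULL in such a column is emitted with a non-null value.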

Environment :

To reproduce the behavior:

  1. Create the source table
    CREATE TABLE `test` (
    `id` int NOT NULL,
    `name` varchar(100) DEFAULT NULL,
    `create_time` datetime DEFAULT CURRENT_TIMESTAMP,
    `update_time` datetime DEFAULT CURRENT_TIMESTAMP,
    `age` int DEFAULT '100',
    PRIMARY KEY (`id`)
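    ) ENGINE=InnoDB;
  2. Insert a row with explicit NULL values for create_time and age (update_time is omitted)
    INSERT INTO test (id, name, create_time, age) VALUES (17, 'cc', NULL, NULL);
  3. Read the table with the MySQL CDC source (DataStream API)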

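    // Imports assume Flink CDC 2.x, where the connector lives under com.ververica.cdc
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("localhost")
            .port(3306)
            .databaseList("test")
            .tableList("test.test")
            .username("cdc")
            .password("password")
            .serverId("5401-5404")
            // start from the latest binlog position, skipping the initial snapshot
            .startupOptions(StartupOptions.latest())
            // emit each change event as a Debezium-style JSON string
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(3000); // checkpoint every 3 seconds
    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            // set 4 parallel source tasks
            .setParallelism(4)
            .print()
            .setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");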
  4. Output printed to the console
    
    {"before":null,"after":{"id":17,"name":"cc","create_time":0,"update_time":1665759539000,"age":100},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1665730739000,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":31291639,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1665730739231,"transaction":null}
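
The record above can be compared column by column against the values written in step 2. The following is only a minimal sketch of that comparison: `DefaultValueCheck` and `mismatchedColumns` are hypothetical names, not part of Flink CDC or Debezium, and the values are copied by hand from the INSERT statement and the printed event rather than parsed from the JSON string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (not part of Flink CDC): compares the values written to
// MySQL with the "after" image of the change event, column by column.
class DefaultValueCheck {

    // Returns the names of the columns whose emitted value differs from the inserted value.
    static List<String> mismatchedColumns(Map<String, Object> inserted, Map<String, Object> emitted) {
        List<String> mismatched = new ArrayList<>();
        for (Map.Entry<String, Object> column : inserted.entrySet()) {
            // Objects.equals treats null/null as a match, so an explicit NULL
            // only passes when the event also carries null for that column.
            if (!Objects.equals(column.getValue(), emitted.get(column.getKey()))) {
                mismatched.add(column.getKey());
            }
        }
        return mismatched;
    }

    public static void main(String[] args) {
        // Values from the INSERT in step 2; update_time is omitted there, so it is not checked.
        Map<String, Object> inserted = new LinkedHashMap<>();
        inserted.put("id", 17);
        inserted.put("name", "cc");
        inserted.put("create_time", null);
        inserted.put("age", null);

        // Values copied from the "after" field of the printed event.
        Map<String, Object> emitted = new LinkedHashMap<>();
        emitted.put("id", 17);
        emitted.put("name", "cc");
        emitted.put("create_time", 0);
        emitted.put("update_time", 1665759539000L);
        emitted.put("age", 100);

        System.out.println(mismatchedColumns(inserted, emitted)); // prints [create_time, age]
    }
}
```

Only the columns named in the INSERT are checked, because update_time was omitted and is therefore expected to receive its default.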

**create_time and age are NULL in the source row, but the emitted record contains 0 and 100.**

lqbFFF commented 1 year ago

I ran into this problem too when using Flink SQL.
I suspect that both Flink SQL and the DataStream API are affected.

PatrickRen commented 8 months ago

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!