DataLinkDC / dinky

Dinky is a real-time data development platform based on Apache Flink, enabling agile data development, deployment and operation.
http://www.dinky.org.cn
Apache License 2.0

[Bug] [CDCSOURCE] CDCSOURCE whole-database sync from MySQL to Upsert-Kafka turns datetime values into 1970 dates #1979

Closed adu-shzz closed 1 year ago

adu-shzz commented 1 year ago


What happened

As the title says: after using CDCSOURCE to sync an entire MySQL database to Kafka, the value of every datetime column in Kafka has turned into a 1970-XX-XX date.

What you expected to happen

I expect date/time column values to be written correctly when they land in Upsert-Kafka.

How to reproduce

Flink version: v1.16.0. Dinky version: v0.7.3. Steps to reproduce:

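  1. Under [Registry - Flink Instance Management], create a Flink cluster instance named Flink-1160.
  2. Under [Data Development], create a FlinkSQL job named test0-cdcsource, set the execution mode to Standalone, and select the Flink cluster [Flink-1160]: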
    EXECUTE CDCSOURCE cdc_upsert_kafka WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${MySqlHost}',
    'port' = '33061',
    'username' = 'root',
    'password' = '123456',
    'checkpoint' = '3000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    'table-name' = 'test0\..*',
    'source.server-time-zone' = 'UTC',
    'sink.connector' = 'upsert-kafka',
    'sink.timezone' = 'UTC',
    'sink.topic' = 'dinky-test0-${tableName}',
    'sink.properties.bootstrap.servers' = '${kafkaHost}:9091,${kafkaHost}:9092,${kafkaHost}:9093',
    'sink.key.format' = 'json',
    'sink.value.format' = 'json'
    );

    The MySQL 5.7 source table schema is as follows:

    CREATE TABLE `test0` (
    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
    `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    `fld1` varchar(255) NOT NULL,
    `fld2` varchar(255) NOT NULL DEFAULT '',
    `fld3` varchar(255) NOT NULL DEFAULT '',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  3. After saving, the SQL job runs successfully. After a while, the Kafka data viewed in Offset Explorer 2.0 looks like this:
    {
    "id": 1,
    // Actual value in the MySQL table: 2023-05-18 01:55:08
    "create_time": "1970-01-20 11:52:54.908",
    // Actual value in the MySQL table: 2023-05-18 01:55:08
    "update_time": "1970-01-20 11:52:54.908",
    "fld1": "qwer",
    "fld2": "cew",
    "fld3": "w35"
    }
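The broken values are not random. If 2023-05-18 01:55:08 is taken as UTC (the job sets 'source.server-time-zone' = 'UTC'), it equals 1684374908 epoch seconds, and reading that same number as epoch milliseconds gives exactly 1970-01-20 11:52:54.908, the value shown in Kafka. This is consistent with a seconds-vs-milliseconds unit mismatch somewhere between the source and the JSON sink, though the thread does not confirm where it happens. The minimal check below only reproduces that arithmetic; the class and method names are illustrative, not part of Dinky or Flink.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EpochUnitCheck {
    static final DateTimeFormatter IN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    static final DateTimeFormatter OUT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Epoch seconds of a MySQL DATETIME string, assuming it is interpreted in UTC. */
    static long epochSecondsUtc(String mysqlDatetime) {
        return LocalDateTime.parse(mysqlDatetime, IN).toEpochSecond(ZoneOffset.UTC);
    }

    /** Renders an epoch-millisecond count as a UTC timestamp string. */
    static String formatEpochMillisUtc(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC).format(OUT);
    }

    public static void main(String[] args) {
        long seconds = epochSecondsUtc("2023-05-18 01:55:08");
        System.out.println("epoch seconds         : " + seconds);                             // 1684374908
        System.out.println("seconds * 1000 as ms  : " + formatEpochMillisUtc(seconds * 1000)); // 2023-05-18 01:55:08.000
        System.out.println("seconds misread as ms : " + formatEpochMillisUtc(seconds));        // 1970-01-20 11:52:54.908
    }
}
```

The last line matches the Kafka output byte for byte, which is why the unit mismatch is a plausible lead rather than a coincidence.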

Anything else

No response

Version

0.7.3


aiwenmo commented 1 year ago

Try using kafka directly instead of datastream-kafka.

adu-shzz commented 1 year ago

I'm not using datastream-kafka in the FlinkSQL job; I'm using upsert-kafka.

I tried it: with kafka, no data gets synced. With datastream-kafka, the job submits successfully and the data syncs. But I would still like to be able to use upsert-kafka.

sj-dev commented 1 year ago

The same thing happens to me, but syncing through plain FlinkSQL works correctly.

aiwenmo commented 1 year ago

com.dlink.cdc.AbstractSinkBuilder.getLogicalType

case TIMESTAMP: return new TimestampType(column.getLength());
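For context on this pointer: Flink's TimestampType constructor takes a fractional-second precision and accepts only values from 0 to 9, while MySQL DATETIME has a fractional-second precision of 0 to 6, defaulting to 0. This thread does not say whether column.getLength() actually carries that precision for a MySQL datetime column. The plain-Java sketch below shows one hypothetical way to sanity-check the value before building the type. It assumes the length arrives as a nullable Integer. TimestampPrecisionGuard and toTimestampPrecision are made-up names, not Dinky or Flink APIs, and this is not presented as the fix for #1979.

```java
public class TimestampPrecisionGuard {
    // Flink's TimestampType accepts a precision between 0 and 9 (inclusive).
    static final int FLINK_MIN_PRECISION = 0;
    static final int FLINK_MAX_PRECISION = 9;
    // MySQL DATETIME declared without an explicit fsp has a fractional-second precision of 0.
    static final int MYSQL_DEFAULT_FSP = 0;

    /**
     * Hypothetical helper: turns a column's declared fractional-second precision
     * into a value that is safe to pass to a TIMESTAMP(p) type. A missing value
     * falls back to MySQL's default; anything outside Flink's range is rejected
     * instead of being passed through silently.
     */
    static int toTimestampPrecision(Integer declaredFsp) {
        if (declaredFsp == null) {
            return MYSQL_DEFAULT_FSP;
        }
        if (declaredFsp < FLINK_MIN_PRECISION || declaredFsp > FLINK_MAX_PRECISION) {
            throw new IllegalArgumentException("Unexpected TIMESTAMP precision " + declaredFsp
                    + "; expected " + FLINK_MIN_PRECISION + ".." + FLINK_MAX_PRECISION);
        }
        return declaredFsp;
    }

    public static void main(String[] args) {
        System.out.println(toTimestampPrecision(null)); // 0
        System.out.println(toTimestampPrecision(3));    // 3
        try {
            // Hypothetical bad input, e.g. a display width mistaken for a precision.
            toTimestampPrecision(19);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch fails fast instead of clamping, so an unexpected length surfaces as an error rather than as silently wrong timestamps downstream.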

aiwenmo commented 1 year ago