apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

[Bug] Paimon CDC ingestion with mysql-cdc 2.4.1 cannot receive complete Debezium binlog #2363

Closed yuzelin closed 6 months ago

yuzelin commented 1 year ago

Flink version

1.15.2

Flink CDC version

2.4.1

Database and its version

MySQL: 5.7.31-log MySQL Community Server (GPL)

Minimal reproduce step

  1. Submit a Paimon database synchronization job with the following command:
./bin/flink run \
    ./lib/paimon-flink-action-0.5-SNAPSHOT.jar \
    mysql-sync-database \
    --warehouse oss://odps-prd/rtdp/paimon \
    --database flink \
    --ignore-incompatible true \
    --table-prefix ods_ \
    --mode combined \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf username=flink \
    --mysql-conf password=flink \
    --mysql-conf database-name=flink \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://localhost:9083 \
    --table-conf bucket=2 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=8 \
    --table-conf page-size=4k \
    --table-conf write-buffer-size=512mb \
    --table-conf num-sorted-run.compaction-trigger=5 \
    --table-conf num-sorted-run.stop-trigger=2147483647 \
    --table-conf sort-spill-threshold=10 \
    --table-conf write-buffer-spillable=true
  2. After the job enters the incremental phase, use MySQL Workbench to create a new table and insert some data:

    create table Student(Sno char(9) primary key,Sname char(20) not null,Ssex char(2),Sage smallint,Sdept char(20))
  3. An error occurs (thrown by Paimon):

    java.lang.IllegalArgumentException: Invalid historyRecord, because tableChanges should contain exactly 1 item.

The invalid historyRecord is:

{
  "source": {
    "file": "mysql-bin.000126",
    "pos": 103181,
    "server_id": 1
  },
  "position": {
    "ts_sec": 1690955907,
    "file": "mysql-bin.000126",
    "pos": 103368,
    "server_id": 1
  },
  "databaseName": "flink",
  "ddl": "create table Student(Sno char(9) primary key,Sname char(20) not null,Ssex char(2),Sage smallint,Sdept char(20))",
  "tableChanges": []
}
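
To make the failure condition concrete, here is a minimal Python sketch of the kind of check that rejects this record. It is not Paimon's actual code: the function names `table_change_count` and `validate_history_record` are hypothetical, and the only assumption taken from this report is that a history record is a JSON object whose `tableChanges` array must hold exactly one entry.

```python
import json


def table_change_count(history_record_json: str) -> int:
    """Return how many entries the record's tableChanges array holds."""
    record = json.loads(history_record_json)
    # A missing key is treated the same as an empty array (assumption).
    return len(record.get("tableChanges", []))


def validate_history_record(history_record_json: str) -> dict:
    """Return the single table change, or raise if there is not exactly one."""
    count = table_change_count(history_record_json)
    if count != 1:
        raise ValueError(
            "Invalid historyRecord, because tableChanges should contain "
            f"exactly 1 item (found {count})."
        )
    return json.loads(history_record_json)["tableChanges"][0]


# Trimmed copy of the record from this report: tableChanges is empty.
reported = '{"databaseName": "flink", "tableChanges": []}'

# Hypothetical well-formed record; the entry's fields are illustrative only.
expected_shape = '{"databaseName": "flink", "tableChanges": [{"type": "CREATE"}]}'
```

Calling `validate_history_record(reported)` raises `ValueError`, while `validate_history_record(expected_shape)` returns the single entry. Running such a check against the raw Debezium output can help tell whether the empty array originates upstream or is introduced by the connector.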

What did you expect to see?

The tableChanges array should contain an entry for the newly created table.

What did you see instead?

The tableChanges array is empty.

Anything else?

  1. We consumed the MySQL records directly with Debezium, and the result was correct. We tested the two Debezium versions that match those used by Flink CDC 2.3.0 and 2.4.1.
  2. The same job works normally with Flink CDC 2.3.0.

PatrickRen commented 6 months ago

Closing this issue as it has been migrated to Apache Jira.