pingcap / tiflow

This repo maintains DM (a data migration platform) and TiCDC (change data capture for TiDB)
Apache License 2.0

'enum' column type value is represented as integer value in the Kafka sink using 'canal-json' protocol #8491

Open andriy-buryy opened 1 year ago

andriy-buryy commented 1 year ago

What did you do?

  1. Create TiCDC changefeed:

    curl -X POST http://127.0.0.1:8300/api/v1/changefeeds \
    -H 'Content-Type: application/json' \
    -d '{
    "changefeed_id": "test-changefeed",
    "sink_uri": "kafka://kafka-0:9092/test-topic?protocol=canal-json&kafka-version=3.2.0",
    "filter_rules": ["test.test_table"]
    }'
  2. Create the table test_table:

    $ mysql -u root -h 127.0.0.1 -P 4000 test
    CREATE TABLE test_table (
        id     int unsigned auto_increment primary key,
        name   varchar(64) not null,
        status enum ('none', 'active', 'reserved', 'sold') not null default 'none'
    );
  3. Insert a record:

    INSERT INTO test_table (name, status) VALUES ('Product 1', 'active');
  4. Check the data in TiDB:

    SELECT * FROM test_table;
    +----+-----------+--------+
    | id | name      | status |
    +----+-----------+--------+
    |  1 | Product 1 | active |
    +----+-----------+--------+
  5. Check the INSERT event message value in the Kafka topic:

    {
      "id": 0,
      "database": "test",
      "table": "test_table",
      "pkNames": [
        "id"
      ],
      "isDdl": false,
      "type": "INSERT",
      "es": 1678367309358,
      "ts": 1678367310259,
      "sql": "",
      "sqlType": {
        "id": 4,
        "name": 12,
        "status": 4
      },
      "mysqlType": {
        "id": "int unsigned",
        "name": "varchar",
        "status": "enum"
      },
      "old": null,
      "data": [
        {
          "id": "1",
          "name": "Product 1",
          "status": "2"
        }
      ]
    }

What did you expect to see?

The status enum field value in the Kafka message should be "active" (the same as in the database).

What did you see instead?

The status enum field value in the Kafka message is "2" (not the same as in the database).
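For anyone reproducing this on the consumer side, the following is a minimal Python sketch of how the enum columns and their raw values can be picked out of a canal-json message. The message text is an abridged copy of the one in this report; the helper names `enum_columns` and `raw_enum_values` are made up for illustration and are not part of TiCDC or Canal.

```python
import json

# Abridged copy of the Kafka message value from this report.
raw_value = """
{
  "database": "test",
  "table": "test_table",
  "type": "INSERT",
  "mysqlType": {"id": "int unsigned", "name": "varchar", "status": "enum"},
  "data": [{"id": "1", "name": "Product 1", "status": "2"}]
}
"""


def enum_columns(message: dict) -> list[str]:
    """Return the names of columns whose mysqlType starts with 'enum'."""
    return [
        column
        for column, mysql_type in message["mysqlType"].items()
        if mysql_type.lower().startswith("enum")
    ]


def raw_enum_values(message: dict) -> list[dict]:
    """Return, for each row in 'data', only the enum columns and their raw values."""
    columns = enum_columns(message)
    # 'data' can be null for some event types, so fall back to an empty list.
    return [{c: row[c] for c in columns} for row in message["data"] or []]


message = json.loads(raw_value)
print(enum_columns(message))     # ['status']
print(raw_enum_values(message))  # [{'status': '2'}]
```

The raw value is the string "2", not "active", which is the behaviour described in this issue.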

Versions of the cluster

Upstream TiDB cluster version (execute SELECT tidb_version(); in a MySQL client):

Release Version: v6.6.0
Edition: Community
Git Commit Hash: f4ca0821fb96a2bdd37d2fb97eb26c07fc58d4e4
Git Branch: heads/refs/tags/v6.6.0
UTC Build Time: 2023-02-17 14:49:02
GoVersion: go1.19.5
Race Enabled: false
TiKV Min Version: 6.2.0-alpha
Check Table Before Drop: false
Store: tikv

Upstream TiKV version (execute tikv-server --version):

TiKV 
Release Version:   6.6.0
Edition:           Community
Git Commit Hash:   58d231bccb4fb188bb697905ee4faf086c4ac931
Git Commit Branch: heads/refs/tags/v6.6.0
UTC Build Time:    2023-02-18 08:44:45
Rust Version:      rustc 1.67.0-nightly (96ddd32c4 2022-11-14)
Enable Features:   pprof-fp jemalloc mem-profiling portable sse test-engine-kv-rocksdb test-engine-raft-raft-engine cloud-aws cloud-gcp cloud-azure
Profile:           dist_release

TiCDC version (execute cdc version):

Release Version: v6.6.0
Git Commit Hash: feae75ed4c816da5df52e931cb936a97f619ae0e
Git Branch: heads/refs/tags/v6.6.0
UTC Build Time: 2023-02-15 15:38:00
Go Version: go version go1.19.5 linux/amd64
Failpoint Build: false

sdojjy commented 1 year ago

TiCDC parses enum values into their integer value: https://github.com/pingcap/tiflow/blob/2abe0212a1552c65acfaf677343f00a3155882f1/cdc/entry/mounter.go#L438-L441

3AceShowHand commented 1 year ago

For the enum type, the value is represented as an integer string, which matches the official implementation.
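To illustrate what the integer string means: MySQL-style enum indexes are 1-based in definition order, and index 0 is reserved for the empty-string error value. A minimal Python sketch of the mapping follows, assuming the consumer already knows the option list from the table definition in this issue. `enum_label` is a hypothetical helper, not a TiCDC API.

```python
def enum_label(index_text: str, options: list[str]) -> str:
    """Map a canal-json enum value (an integer string) to its label.

    Indexes are 1-based in definition order; index 0 is the
    empty-string error value in MySQL-compatible enums.
    """
    index = int(index_text)
    if index == 0:
        return ""
    if not 1 <= index <= len(options):
        raise ValueError(f"enum index {index} out of range 1..{len(options)}")
    return options[index - 1]


# Options copied from the CREATE TABLE statement in this issue.
STATUS_OPTIONS = ["none", "active", "reserved", "sold"]

print(enum_label("2", STATUS_OPTIONS))  # active
```

This shows that "2" in the Kafka message and 'active' in the database refer to the same value; the message alone just does not carry the option list needed to translate it.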

andriy-buryy commented 1 year ago

Hi,

I see, thank you for the explanation.

Would it be possible to provide the full enum field definition in the mysqlType field? I mean including the enum options, as in the table schema definition: enum('none', 'active', 'reserved', 'sold'). Or is there any other way to add the enum options to the Kafka message?

For now, if we need the enum value as a string, the only way to map an enum option number to its option value is to cache this mapping from the table schema definition and update the cache on every ALTER event. But this approach is limited: it can only be used if events are produced to a Kafka topic with 1 partition.
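The caching workaround described above could look roughly like the Python sketch below. Everything here is an assumption for illustration: `parse_enum_options` and `EnumCache` are hypothetical names, the column type string is assumed to be available to the consumer (for example, extracted from the table schema or from the SQL of a DDL event), and the parser handles only simple single-quoted options with `''` escapes, not backslash escapes.

```python
import re

# Matches "enum(...)" or "enum (...)" and captures the option list.
_ENUM_RE = re.compile(r"^\s*enum\s*\((.*)\)\s*$", re.IGNORECASE | re.DOTALL)
# Matches one single-quoted option; '' inside the quotes is an escaped quote.
_OPTION_RE = re.compile(r"'((?:[^']|'')*)'")


def parse_enum_options(column_type: str) -> list[str]:
    """Extract option labels from a type string such as enum('a', 'b')."""
    match = _ENUM_RE.match(column_type)
    if match is None:
        raise ValueError(f"not an enum type definition: {column_type!r}")
    return [opt.replace("''", "'") for opt in _OPTION_RE.findall(match.group(1))]


class EnumCache:
    """Per-column cache of enum options, refreshed when the schema changes."""

    def __init__(self) -> None:
        self._options: dict[tuple[str, str, str], list[str]] = {}

    def update(self, database: str, table: str, column: str, column_type: str) -> None:
        # Call this at startup and again for every ALTER event on the table.
        self._options[(database, table, column)] = parse_enum_options(column_type)

    def decode(self, database: str, table: str, column: str, index_text: str) -> str:
        options = self._options[(database, table, column)]
        index = int(index_text)
        if index == 0:
            return ""  # empty-string error value
        if not 1 <= index <= len(options):
            raise ValueError(f"enum index {index} out of range")
        return options[index - 1]


cache = EnumCache()
cache.update("test", "test_table", "status",
             "enum ('none', 'active', 'reserved', 'sold')")
print(cache.decode("test", "test_table", "status", "2"))  # active
```

The sketch also makes the limitation visible: `decode` is only correct if `update` has been called for every schema change that precedes the row event, which requires DDL and row events to arrive in order. That ordering is why the approach depends on a single-partition topic.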

takaidohigasi commented 3 weeks ago

We would also be glad to be able to configure whether enum is handled as an int or a string.

wk989898 commented 2 weeks ago

You can set content-compatible=true in the sink URI to enable this feature: https://docs.pingcap.com/tidb/stable/ticdc-canal-json#mysqltype-field
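As a rough sketch of applying this suggestion to the changefeed from the original report, the Python snippet below appends content-compatible=true to the existing sink URI. `with_query_param` is a hypothetical helper, and whether the parameter is available depends on the TiCDC version, so check the linked documentation before relying on it.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_query_param(sink_uri: str, key: str, value: str) -> str:
    """Return sink_uri with the query parameter key set to value."""
    parts = urlsplit(sink_uri)
    # Drop any existing occurrence of the key so it is not duplicated.
    params = [
        (k, v)
        for k, v in parse_qsl(parts.query, keep_blank_values=True)
        if k != key
    ]
    params.append((key, value))
    return urlunsplit(parts._replace(query=urlencode(params)))


# Sink URI copied from the changefeed request in this issue.
sink_uri = "kafka://kafka-0:9092/test-topic?protocol=canal-json&kafka-version=3.2.0"
print(with_query_param(sink_uri, "content-compatible", "true"))
# kafka://kafka-0:9092/test-topic?protocol=canal-json&kafka-version=3.2.0&content-compatible=true
```

The resulting string would replace the "sink_uri" value in the changefeed creation request.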