apache / paimon

Apache Paimon is a lake format that enables building a Realtime Lakehouse Architecture with Flink and Spark for both streaming and batch operations.
https://paimon.apache.org/
Apache License 2.0

[cdc] support debezium-json format with schema for kafka sync action #2701

Open zhangjun0x01 opened 5 months ago

zhangjun0x01 commented 5 months ago

Motivation

Support the debezium-json format with schema for the Kafka sync action.

We can get the schema from the JSON message itself; the format looks like this:

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {...},
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
  }
}
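
To make the request concrete, here is a minimal sketch (not Paimon's actual implementation; the class name and the type mapping are illustrative assumptions) of how the "schema" section of such a message could be used to derive column names and types, using plain Jackson:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.Map;

public class DebeziumSchemaSketch {

    // Walk schema.fields, find the "after" struct, and map each column's
    // Debezium type name to a SQL-ish type. Hypothetical helper, for
    // illustration only.
    static Map<String, String> extractColumnTypes(String message) throws Exception {
        JsonNode schema = new ObjectMapper().readTree(message).get("schema");
        Map<String, String> columns = new LinkedHashMap<>();
        for (JsonNode field : schema.get("fields")) {
            if ("after".equals(field.get("field").asText())) {
                for (JsonNode col : field.get("fields")) {
                    columns.put(col.get("field").asText(),
                            toSqlType(col.get("type").asText()));
                }
            }
        }
        return columns; // e.g. {id=INT, name=STRING, weight=DOUBLE}
    }

    // Partial mapping of Connect primitive types; real code would also
    // need to handle logical types (decimals, timestamps, and so on).
    static String toSqlType(String debeziumType) {
        switch (debeziumType) {
            case "int8":
            case "int16": return "SMALLINT";
            case "int32": return "INT";
            case "int64": return "BIGINT";
            case "float32": return "FLOAT";
            case "float64": return "DOUBLE";
            case "boolean": return "BOOLEAN";
            default: return "STRING";
        }
    }
}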

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

sunxiaojian commented 5 months ago

@zhangjun0x01 I remember this feature is already supported. Is there any problem now?

heine0310 commented 5 months ago

> @zhangjun0x01 I remember this feature is already supported. Is there any problem now?

The JSON produced by flink-cdc does not contain the field pkNames, but the debezium-json parser in the source code requires pkNames; otherwise, when synchronizing multiple tables, it throws an exception saying the primary key does not exist. When it parses debezium-json, the required fields are as follows:

private static final String FIELD_SCHEMA = "schema";
private static final String FIELD_PAYLOAD = "payload";
private static final String FIELD_BEFORE = "before";
private static final String FIELD_AFTER = "after";
private static final String FIELD_SOURCE = "source";
private static final String FIELD_PRIMARY = "pkNames";
private static final String FIELD_DB = "db";
private static final String FIELD_TYPE = "op";
private static final String OP_INSERT = "c";
private static final String OP_UPDATE = "u";
private static final String OP_DELETE = "d";
private static final String OP_READE = "r";
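
For illustration, a record shaped the way this parser expects would look roughly like the following; the placement of pkNames inside the payload is inferred from the constants above and from the commit description quoted later in this thread, not verified against the source:

{
  "schema": {...},
  "payload": {
    "before": null,
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {
      "db": "mydb",
      "table": "products"
    },
    "op": "c",
    "pkNames": ["id"]
  }
}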

sunxiaojian commented 5 months ago

> The JSON produced by flink-cdc does not contain the field pkNames, but the debezium-json parser requires it …

At present, the sync mode only supports specifying primary keys for a single table. Because Debezium stores the record's primary key in the Kafka message key, recognizing multiple tables requires special compatibility handling.

luckyLJY commented 5 months ago

Does this require special configuration? For the data I actually send to Kafka with the kafka-connector, the primary-key-missing problem still occurs.

luckyLJY commented 5 months ago

Do whole-database sync and single-table sync support table schema changes?

sunxiaojian commented 5 months ago

> The JSON produced by flink-cdc does not contain the field pkNames, but the debezium-json parser requires it …

hi @zhangjun0x01, in the case of synchronizing multiple tables: are multiple tables with different structures stored in one topic, or in multiple topics?

sunxiaojian commented 5 months ago

> Do whole-database sync and single-table sync support table schema changes?

@luckyLJY supported, reference : https://github.com/apache/incubator-paimon/blob/master/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java#L58
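
For readers who do not want to open the link: the linked function decides, field by field, whether an incoming schema change can be applied to the Paimon table (new columns, compatible type changes). Below is a hedged sketch of that kind of check, with hypothetical names that do not match the actual Paimon code:

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class SchemaEvolutionSketch {

    // Order encodes "wideness": a type change is only allowed left-to-right.
    private static final List<String> NUMERIC_WIDENING =
            Arrays.asList("TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE");

    // A brand-new column is always addable; an existing column may only
    // widen its numeric type, never narrow it.
    static boolean canApply(Map<String, String> tableSchema, String column, String newType) {
        String oldType = tableSchema.get(column);
        if (oldType == null) {
            return true;
        }
        int from = NUMERIC_WIDENING.indexOf(oldType);
        int to = NUMERIC_WIDENING.indexOf(newType);
        return from >= 0 && to >= from;
    }
}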

luckyLJY commented 5 months ago

It reports an error about a missing primary key, during whole-database CDC.

medivh511 commented 1 month ago

Whole-database sync still cannot find the primary key. The schema generated by Kafka Connect contains no primary-key annotation at all by default. 0.8 does not fix this; at the moment the only workaround is to create the primary-key table manually and then run schema detection.

medivh511 commented 1 month ago

> @zhangjun0x01 I remember this feature is already supported. Is there any problem now?

In your commit description: "When Debezium's data is written into Kafka, the primary key will be automatically stored in the key. When Paimon parses Kafka messages, the data in the key will be attached to the 'pkNames' field in the value. There are some demos in unit testing."

If the Debezium key is used as the primary key (assuming the value follows the pattern of your debezium-data-1.txt), what does the key look like? My Oracle CDC to Kafka Connect configuration is as follows:

{
  "name": "test03",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topic.prefix": "test",
    "database.hostname": "...",
    "database.port": "1521",
    "database.user": "...",
    "database.password": "...",
    "database.dbname": "L2DB",
    "table.include.list": "FLINKUSER.ACT_DL",
    "schema.include.list": "FLINKUSER",
    "schema.history.internal.kafka.topic": "schema-changes.l2db",
    "snapshot.mode": "initial",
    "log.mining.strategy": "online_catalog",
    "database.history.store.only.captured.tables.ddl": "true",
    "database.tablename.case.insensitive": "false",
    "log.mining.continuous.mine": "true",
    "decimal.handling.mode": "string",
    "schema.history.internal.kafka.bootstrap.servers": "172.15.89.142:9092,172.15.89.181:9092,172.15.89.182:9092",
    "value.converter.schemas.enable": "true"
  }
}

Given "key.converter": "org.apache.kafka.connect.storage.StringConverter", can this key format be parsed? Or is there a standard format it must follow?
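
Not an authoritative answer, but for comparison: with org.apache.kafka.connect.json.JsonConverter as the key converter (and key schemas enabled), the Debezium record key is structured JSON that names the primary-key columns. An illustrative key for the table above (the field name and type are assumptions):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "ID" }
    ],
    "name": "test.FLINKUSER.ACT_DL.Key"
  },
  "payload": {
    "ID": 111
  }
}

With StringConverter, the key arrives as an opaque string (roughly Struct{ID=111}), so a parser cannot reliably recover the primary-key column names from it.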

medivh511 commented 1 month ago

[Feature] Kafka debezium json supports automatic discovery of primary keys #2815 has not been merged into the master branch, so 0.8 definitely will not fix this.

medivh511 commented 1 week ago

Still not merged into the master branch or 0.8.1; the primary-key-not-found issue still exists.