StarRocks / starrocks-connector-for-kafka


[Critical bug] AddOpFieldForDebeziumRecord fails to add __op correctly #25

Open bulolo opened 5 months ago

bulolo commented 5 months ago

Full reproduction steps: https://junyao.tech/posts/e4464a42.html

With neither of the configurations below does AddOpFieldForDebeziumRecord add op:1 or op:0.

MySQL:

        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite"

MongoDB:

        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite"
bulolo commented 5 months ago

In practice, __op never gets added.
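
A minimal standalone probe, offered only as a sketch (not code from the connector), that builds a trimmed Debezium-style envelope and applies AddOpFieldForDebeziumRecord to it directly, outside of Kafka Connect. The envelope only carries the fields the transform is expected to read (before/after/op), the topic name is arbitrary, and it is an assumption (flagged in a comment) that the SMT accepts an empty configuration:

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord;

public class AddOpProbe {
    public static void main(String[] args) {
        // Minimal row and envelope schemas; a real Debezium event also carries
        // source, ts_ms, etc., which are omitted here.
        Schema rowSchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Schema envelopeSchema = SchemaBuilder.struct()
                .field("before", rowSchema)
                .field("after", rowSchema)
                .field("op", Schema.STRING_SCHEMA)
                .build();

        Struct before = new Struct(rowSchema).put("id", 1004).put("name", "Anne");
        Struct after = new Struct(rowSchema).put("id", 1004).put("name", "Anne Marie");
        Struct envelope = new Struct(envelopeSchema)
                .put("before", before)
                .put("after", after)
                .put("op", "u"); // an update event

        SourceRecord record = new SourceRecord(
                Collections.emptyMap(), Collections.emptyMap(),
                "inventory.customers", envelopeSchema, envelope);

        AddOpFieldForDebeziumRecord<SourceRecord> addOp = new AddOpFieldForDebeziumRecord<>();
        addOp.configure(Collections.emptyMap()); // assumption: the SMT needs no config keys

        SourceRecord out = addOp.apply(record);
        System.out.println(out.value()); // expected: a flattened row that carries __op
        addOp.close();
    }
}

If the printed value still has no __op even with this Struct envelope, the problem is in the transform itself rather than in the connector configuration.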

Desperado2 commented 1 week ago

This is because the transform only implements handling for MySQL's binlog format: whether __op gets added depends on whether the binlog record contains an op field. In Debezium's MySQL event format there is such an op field marking inserts, updates, and deletes. The format described in the Debezium documentation:

{
  "schema": { ... },
  "payload": {
    "before": { 
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { 
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { 
      "version": "3.0.1.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "ts_ms": 1465581029100,
      "ts_us": 1465581029100000,
      "ts_ns": 1465581029100000000,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", 
    "ts_ms": 1465581029523, 
    "ts_us": 1465581029523758, 
    "ts_ns": 1465581029523758914 
  }
}

For details see: https://debezium.io/documentation/reference/stable/connectors/mysql.html

The relevant part of the implementation:

    private static final String OP = "op";
    private static final String OP_C = "c";
    private static final String OP_U = "u";
    private static final String OP_D = "d";

    public R apply(R record) {
        // `value` is the Debezium payload as a Struct; how it is obtained from
        // record.value(), and the AFTER/BEFORE constants, are elided in this excerpt.
        try {
            String op;
            try {
                op = (String) value.get(OP);
            } catch (Exception e) {
                // No "op" field in the payload: the record is passed through unchanged.
                return record;
            }
            if (op.equals(OP_C) || op.equals(OP_U)) {
                // Insert or update: keep the "after" row image.
                Struct newValue = updateValue(value, AFTER);
                return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp());
            } else if (op.equals(OP_D)) {
                // Delete: keep the "before" row image.
                Struct newValue = updateValue(value, BEFORE);
                return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp());
            }
        } catch (Exception e) {
            return record;
        }
        return record;
    }

So the presence of the op field is the condition that decides whether __op gets added.
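
The updateValue helper referenced in the excerpt is not shown there. A minimal sketch of what it plausibly does, assuming it copies the selected row image ("after" for inserts/updates, "before" for deletes) and appends the __op marker; this is an illustration, not the connector's actual source:

    // Uses Field, Schema, SchemaBuilder, Struct from org.apache.kafka.connect.data.
    private static final String BEFORE = "before";
    private static final String AFTER = "after";

    private Struct updateValue(Struct value, String imageField) {
        // Take the row image selected by apply(): "after" for c/u, "before" for d.
        Struct row = value.getStruct(imageField);

        // Rebuild the row schema with an extra __op column appended.
        SchemaBuilder builder = SchemaBuilder.struct();
        for (Field f : row.schema().fields()) {
            builder.field(f.name(), f.schema());
        }
        builder.field("__op", Schema.INT32_SCHEMA);
        Schema newSchema = builder.build();

        // Copy the row values and set __op: 0 marks an upsert, 1 marks a delete.
        Struct newValue = new Struct(newSchema);
        for (Field f : row.schema().fields()) {
            newValue.put(f.name(), row.get(f));
        }
        newValue.put("__op", BEFORE.equals(imageField) ? 1 : 0);
        return newValue;
    }

In StarRocks primary key tables, __op = 0 marks an upsert and __op = 1 marks a delete, which is why apply() keeps the after image for c/u events and the before image for d events.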

Workaround: you can work around this by adding columns and jsonpaths to the Connect configuration. With delete.handling.mode set to rewrite, ExtractNewRecordState adds a __deleted field to every record, and the columns expression below maps that field to __op. See: https://docs.starrocks.io/zh/docs/loading/Load_to_Primary_Key_tables/#upsert-%E5%92%8C-delete

{
    "name": "topic-connect",
    "config": {
        "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "sink.properties.columns": "_id,name,__deleted,__op = __deleted",
        "sink.properties.jsonpaths": "[\"$._id\", \"$.name\", \"$.__deleted\"]",
        "transforms": "decrypt",
        "transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation",
        "transforms.decrypt.secret.key": "6253c3d7db9723f1d43b5e447c7ddf0da9e68dc121829daa0a06438b5deba549"
    }
}
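
The two property strings do not have to be written by hand for wide tables; they can be generated from the column list. A small sketch (field names hypothetical), assuming the columns list mirrors the JSON field names plus the __op = __deleted mapping used above:

import java.util.List;
import java.util.stream.Collectors;

public class ColumnsJsonpathsBuilder {
    public static void main(String[] args) {
        // Hypothetical field list; in practice this would come from the table schema.
        List<String> fields = List.of("_id", "name", "__deleted");

        // e.g. "_id,name,__deleted,__op = __deleted"
        String columns = String.join(",", fields) + ",__op = __deleted";
        // e.g. ["$._id", "$.name", "$.__deleted"]
        String jsonpaths = fields.stream()
                .map(f -> "\"$." + f + "\"")
                .collect(Collectors.joining(", ", "[", "]"));

        System.out.println("sink.properties.columns=" + columns);
        System.out.println("sink.properties.jsonpaths=" + jsonpaths);
    }
}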
bulolo commented 1 week ago

@Desperado2 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html

Please take a look at this SMT: it unwraps the standard Debezium envelope. StarRocksSinkConnector should support this SMT-based mode.

Also, doesn't the workaround above mean that with columns and jsonpaths, a table with 100 columns would require listing all 100 of them?

bulolo commented 1 week ago

Also, you reproduced this yourselves back then: it does not work with a standard MySQL binlog either (for delete operations).