StarRocks / starrocks-connector-for-kafka

Apache License 2.0
7 stars 12 forks source link

Debezium instructions do not work. Should use Debezium's recommended connector defaults #17

Open alberttwong opened 7 months ago

alberttwong commented 7 months ago

From https://docs.starrocks.io/docs/loading/Kafka-connector-starrocks/. Gist with the commands to set up a testing environment: https://gist.github.com/alberttwong/a6d180c4eafecf9bdcf764196ca3d961

No row inserted or updated

If you create a connector with

{
    "name": "tpcds-customer-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "tpcds",
        "topic.prefix": "tpcds",
        "table.include.list": "public.customer",
        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "drop"
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": true,
                "field": "col_001"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_002"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_003"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_004"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_005"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_006"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_007"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_008"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_009"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_010"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_011"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_012"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_013"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_014"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_015"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_016"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_017"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_018"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "_sling_loaded_at"
            },
            {
                "type": "int32",
                "optional": false,
                "field": "__op"
            }
        ],
        "optional": false,
        "name": "tpcds.public.customer.Value"
    },
    "payload": {
        "col_001": 6,
        "col_002": "AAAAAAAAGAAAAAAA",
        "col_003": 213219,
        "col_004": 6374,
        "col_005": 27082,
        "col_006": 2451883,
        "col_007": 2451853,
        "col_008": "Ms.",
        "col_009": "Brunilda aaa bbb ccc",
        "col_010": "Sharp",
        "col_011": "Y",
        "col_012": 4,
        "col_013": 12,
        "col_014": 1925,
        "col_015": "SURINAME",
        "col_016": null,
        "col_017": "Brunilda.Sharp@T3pylZEUQjm.org",
        "col_018": 2452430,
        "_sling_loaded_at": 1713464143,
        "__op": 0
    }
}
alberttwong commented 7 months ago

No insert into database

{
    "name": "tpcds-customer-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "tpcds",
        "topic.prefix": "tpcds",
        "table.include.list": "public.customer",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "drop"
    }
}

gives you

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": true,
                "field": "col_001"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_002"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_003"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_004"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_005"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_006"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_007"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_008"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_009"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_010"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_011"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_012"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_013"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_014"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_015"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_016"
            },
            {
                "type": "string",
                "optional": true,
                "field": "col_017"
            },
            {
                "type": "int32",
                "optional": true,
                "field": "col_018"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "_sling_loaded_at"
            },
            {
                "type": "int32",
                "optional": false,
                "field": "__op"
            }
        ],
        "optional": false,
        "name": "tpcds.public.customer.Value"
    },
    "payload": {
        "col_001": 6,
        "col_002": "AAAAAAAAGAAAAAAA",
        "col_003": 213219,
        "col_004": 6374,
        "col_005": 27082,
        "col_006": 2451883,
        "col_007": 2451853,
        "col_008": "Ms.",
        "col_009": "Brunilda aaa",
        "col_010": "Sharp",
        "col_011": "Y",
        "col_012": 4,
        "col_013": 12,
        "col_014": 1925,
        "col_015": "SURINAME",
        "col_016": null,
        "col_017": "Brunilda.Sharp@T3pylZEUQjm.org",
        "col_018": 2452430,
        "_sling_loaded_at": 1713464143,
        "__op": 0
    }
}
alberttwong commented 7 months ago

There is an insert, but you don't have the before and after.

{
    "name": "tpcds-customer-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "tpcds",
        "topic.prefix": "tpcds",
        "table.include.list": "public.customer",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "addfield,unwrap",
        "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "drop"
    }
}
{
    "col_001": 6,
    "col_002": "AAAAAAAAGAAAAAAA",
    "col_003": 213219,
    "col_004": 6374,
    "col_005": 27082,
    "col_006": 2451883,
    "col_007": 2451853,
    "col_008": "Ms.",
    "col_009": "Brunilda aaa bbb",
    "col_010": "Sharp",
    "col_011": "Y",
    "col_012": 4,
    "col_013": 12,
    "col_014": 1925,
    "col_015": "SURINAME",
    "col_016": null,
    "col_017": "Brunilda.Sharp@T3pylZEUQjm.org",
    "col_018": 2452430,
    "_sling_loaded_at": 1713464143,
    "__op": 0
}
mysql> select * from customer where col_001 = 6;
+-----------------------------+---------+------------------+---------+---------+---------+---------+---------+---------+------------------+---------+---------+---------+---------+---------+----------+---------+--------------------------------+---------+------------------+
| _sling_row_id               | col_001 | col_002          | col_003 | col_004 | col_005 | col_006 | col_007 | col_008 | col_009          | col_010 | col_011 | col_012 | col_013 | col_014 | col_015  | col_016 | col_017                        | col_018 | _sling_loaded_at |
+-----------------------------+---------+------------------+---------+---------+---------+---------+---------+---------+------------------+---------+---------+---------+---------+---------+----------+---------+--------------------------------+---------+------------------+
| 2fHjhd0MkGgtR7jpw4tfuB5Jw2e |       6 | AAAAAAAAGAAAAAAA |  213219 |    6374 |   27082 | 2451883 | 2451853 | Ms.     | Brunilda         | Sharp   | Y       |       4 |      12 |    1925 | SURINAME |         | Brunilda.Sharp@T3pylZEUQjm.org | 2452430 |       1713464143 |
| NULL                        |       6 | AAAAAAAAGAAAAAAA |  213219 |    6374 |   27082 | 2451883 | 2451853 | Ms.     | Brunilda aaa bbb | Sharp   | Y       |       4 |      12 |    1925 | SURINAME | NULL    | Brunilda.Sharp@T3pylZEUQjm.org | 2452430 |       1713464143 |
+-----------------------------+---------+------------------+---------+---------+---------+---------+---------+---------+------------------+---------+---------+---------+---------+---------+----------+---------+--------------------------------+---------+------------------+
2 rows in set (0.01 sec)
alberttwong commented 7 months ago

This should be the correct method, but no data is inserted or updated.

{
    "name": "tpcds-customer-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgresql",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "tpcds",
        "topic.prefix": "tpcds",
        "table.include.list": "public.customer"
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_001"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_002"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_003"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_004"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_005"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_006"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_007"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_008"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_009"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_010"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_011"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_012"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_013"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_014"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_015"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_016"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_017"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_018"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "_sling_loaded_at"
                    }
                ],
                "optional": true,
                "name": "tpcds.public.customer.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_001"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_002"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_003"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_004"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_005"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_006"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_007"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_008"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_009"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_010"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_011"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_012"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_013"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_014"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_015"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_016"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "col_017"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "col_018"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "_sling_loaded_at"
                    }
                ],
                "optional": true,
                "name": "tpcds.public.customer.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "tpcds.public.customer.Envelope",
        "version": 1
    },
    "payload": {
        "before": {
            "col_001": 6,
            "col_002": "AAAAAAAAGAAAAAAA",
            "col_003": 213219,
            "col_004": 6374,
            "col_005": 27082,
            "col_006": 2451883,
            "col_007": 2451853,
            "col_008": "Ms.",
            "col_009": "Brunilda aaa bbb ccc ddd",
            "col_010": "Sharp",
            "col_011": "Y",
            "col_012": 4,
            "col_013": 12,
            "col_014": 1925,
            "col_015": "SURINAME",
            "col_016": null,
            "col_017": "Brunilda.Sharp@T3pylZEUQjm.org",
            "col_018": 2452430,
            "_sling_loaded_at": 1713464143
        },
        "after": {
            "col_001": 6,
            "col_002": "AAAAAAAAGAAAAAAA",
            "col_003": 213219,
            "col_004": 6374,
            "col_005": 27082,
            "col_006": 2451883,
            "col_007": 2451853,
            "col_008": "Ms.",
            "col_009": "Brunilda aaa bbb ccc ddd eee",
            "col_010": "Sharp",
            "col_011": "Y",
            "col_012": 4,
            "col_013": 12,
            "col_014": 1925,
            "col_015": "SURINAME",
            "col_016": null,
            "col_017": "Brunilda.Sharp@T3pylZEUQjm.org",
            "col_018": 2452430,
            "_sling_loaded_at": 1713464143
        },
        "source": {
            "version": "2.5.4.Final",
            "connector": "postgresql",
            "name": "tpcds",
            "ts_ms": 1713471958528,
            "snapshot": "false",
            "db": "tpcds",
            "sequence": "[\"108523976\",\"108524328\"]",
            "schema": "public",
            "table": "customer",
            "txId": 777,
            "lsn": 108524328,
            "xmin": null
        },
        "op": "u",
        "ts_ms": 1713471958951,
        "transaction": null
    }
}
alberttwong commented 7 months ago

Gist with the commands to set up a testing environment: https://gist.github.com/alberttwong/a6d180c4eafecf9bdcf764196ca3d961

yingtingdong commented 7 months ago

You have created a PostgreSQL source connector, which can only read data from PostgreSQL and cannot write data into StarRocks. You need to create a StarRocks sink connector to consume the data produced by the source connector from the Kafka topic, in order to finally write it into StarRocks. Please refer to the official Kafka Connect documentation for the configuration of the source connector.

alberttwong commented 7 months ago

@yingtingdong you'll see in my gist that I have 2 connectors. One is a source and the other is a sink. I'm showing only the source since the sink is unchanged between deployments.

alberttwong commented 7 months ago

https://github.com/StarRocks/starrocks-connector-for-kafka/issues/20