birdiecare / connect-smts

Kafka Connect SMTs
MIT License

DebeziumJsonDeserializer seems to not work #42

Open smyrgeorge opened 1 year ago

smyrgeorge commented 1 year ago

Hello!

I'm playing around with Debezium + PostgreSQL and I'm trying to use the DebeziumJsonDeserializer.

I managed to load the plugin correctly; I can see it in the logs when Kafka Connect starts up.

I also managed to register the connector, using the following config:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "@@@@@",
    "database.password": "@@@@@",
    "database.dbname": "@@@@@",
    "topic.prefix": "dbserver1",
    "schema.include.list": "inventory",
    "transforms": "DebeziumJsonDeserializer",
    "transforms.DebeziumJsonDeserializer.type": "com.birdie.kafka.connect.smt.DebeziumJsonDeserializer",
    "transforms.DebeziumJsonDeserializer.optional-struct-fields": true,
    "transforms.DebeziumJsonDeserializer.union-previous-messages-schema": true,
    "transforms.DebeziumJsonDeserializer.probabilistic-fast-path": true
  }
}
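A config in this shape is usually registered by POSTing it to the Kafka Connect REST API. A minimal sketch, assuming a Connect worker reachable at localhost:8083 (the default REST port); the helper names `build_register_request` and `register_connector` are hypothetical and not part of connect-smts:

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API is reachable at this address.
CONNECT_URL = "http://localhost:8083"


def build_register_request(name, config, base_url=CONNECT_URL):
    """Build (but do not send) a POST /connectors request for the given config."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(name, config, base_url=CONNECT_URL):
    """Send the request and return the HTTP status code reported by the worker."""
    request = build_register_request(name, config, base_url)
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `register_connector("inventory-connector", config)` with the `config` object above would submit it to the worker; the call needs a running worker, so it is left out of the sketch.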

From the logs:

2023-08-19 08:06:46,151 INFO   ||  EnrichedConnectorConfig values:
    config.action.reload = restart
    connector.class = io.debezium.connector.postgresql.PostgresConnector
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    exactly.once.support = requested
    header.converter = null
    key.converter = null
    name = inventory-connector
    offsets.storage.topic = null
    predicates = []
    tasks.max = 1
    topic.creation.groups = []
    transaction.boundary = poll
    transaction.boundary.interval.ms = null
    transforms = [DebeziumJsonDeserializer]
    transforms.DebeziumJsonDeserializer.convert-numbers-to-double = false
    transforms.DebeziumJsonDeserializer.ignored-fields =
    transforms.DebeziumJsonDeserializer.negate = false
    transforms.DebeziumJsonDeserializer.optional-struct-fields = true
    transforms.DebeziumJsonDeserializer.predicate = null
    transforms.DebeziumJsonDeserializer.probabilistic-fast-path = true
    transforms.DebeziumJsonDeserializer.sanitize.field.names = false
    transforms.DebeziumJsonDeserializer.type = class com.birdie.kafka.connect.smt.DebeziumJsonDeserializer
    transforms.DebeziumJsonDeserializer.union-previous-messages-schema = true
    transforms.DebeziumJsonDeserializer.union-previous-messages-schema.log-union-errors = false
    value.converter = null

One of the rows in my table has the following values:

1002,George,Bailey,gbailey@foobar.com,"{""a"": 5, ""b"": ""test""}"

The message produced for this row looks like this:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "default": 0,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "name": "io.debezium.data.Json",
                        "version": 1,
                        "field": "test"
                    }
                ],
                "optional": true,
                "name": "dbserver1.inventory.customers.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "default": 0,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "name": "io.debezium.data.Json",
                        "version": 1,
                        "field": "test"
                    }
                ],
                "optional": true,
                "name": "dbserver1.inventory.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "dbserver1.inventory.customers.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "id": 1002,
            "first_name": "George",
            "last_name": "Bailey",
            "email": "gbailey@foobar.com",
            "test": "{\"a\": 5, \"b\": \"test\"}"
        },
        "source": {
            "version": "2.4.0.Alpha2",
            "connector": "postgresql",
            "name": "dbserver1",
            "ts_ms": 1692432406406,
            "snapshot": "first",
            "db": "postgres",
            "sequence": "[null,\"35544840\"]",
            "schema": "inventory",
            "table": "customers",
            "txId": 926,
            "lsn": 35544840,
            "xmin": null
        },
        "op": "r",
        "ts_ms": 1692432406496,
        "transaction": null
    }
}

As we can see, the test field is still encoded as a string:

"{\"a\": 5, \"b\": \"test\"}"
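For comparison, a minimal sketch of the decoding one would expect, done on the consumer side rather than inside the SMT. It assumes the envelope shape shown above (a `schema` plus `payload` pair) and treats every field whose schema name is `io.debezium.data.Json` as a JSON string. The function name `decode_json_fields` is hypothetical, and this is not how connect-smts itself is implemented:

```python
import json

# Schema name Debezium attaches to JSON/JSONB columns in the message above.
JSON_TYPE = "io.debezium.data.Json"


def decode_json_fields(message):
    """Return payload['after'] with Debezium JSON strings parsed into objects."""
    envelope_fields = message["schema"]["fields"]
    after_schema = next(f for f in envelope_fields if f.get("field") == "after")
    json_columns = {
        f["field"] for f in after_schema["fields"] if f.get("name") == JSON_TYPE
    }
    after = message["payload"]["after"]
    if after is None:  # e.g. delete events carry no "after" state
        return None
    return {
        key: json.loads(value) if key in json_columns and value is not None else value
        for key, value in after.items()
    }


# Trimmed-down version of the message above, keeping only two columns.
message = {
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "field": "after",
                "fields": [
                    {"type": "int32", "field": "id"},
                    {"type": "string", "name": JSON_TYPE, "field": "test"},
                ],
            }
        ],
    },
    "payload": {"after": {"id": 1002, "test": "{\"a\": 5, \"b\": \"test\"}"}},
}

decoded = decode_json_fields(message)
print(decoded["test"])  # {'a': 5, 'b': 'test'}
```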

Am I missing something? I'm using the latest images (2.4) provided by the Debezium team: https://quay.io/organization/debezium

Thanks in advance!

RaghadAlkhudhair commented 6 months ago

How did you manage to use the transformer without getting the following error?

java.lang.ClassCastException: class org.apache.kafka.connect.sink.SinkRecord cannot be cast to class org.apache.kafka.connect.source.SourceRecord (org.apache.kafka.connect.sink.SinkRecord and org.apache.kafka.connect.source.SourceRecord are in unnamed module of loader 'app')
  at com.birdie.kafka.connect.smt.DebeziumJsonDeserializer.apply(DebeziumJsonDeserializer.java:26)
  at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
  at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
  at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
  ... 15 more

I am currently getting this error.

smyrgeorge commented 6 months ago

I don't remember, actually. I did manage to create my own converters, though.

If you want, take a look here:

https://github.com/smyrgeorge/debezium-test/blob/main/kafka-connect-json-to-proto/src/main/kotlin/io/smyrgeorge/connect/converter/JsonNodeConverter.kt