confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

JDBC Source connector adding extra " and \ characters to JSON columns #1411


vinay-bh007 commented 2 months ago

I am using the Kafka JDBC Source connector to fetch JSON data from a table (via a complex SQL query that builds the JSON with Postgres JSON functions) and the JsonConverter to publish it to a Kafka topic.

Since the result contains nested JSON, the data published to the Kafka topic has extra " and \ characters added by the converter. Essentially, the JSON column is treated as a plain String when it is converted to JSON. Is there any support for handling nested JSON in the connector?

Expected output:

{"key1":"value1", "NestedKey": [{"nKey1":"nValue1"},{"nKey2":"nValue2"}] }

Output produced by connector:

{"key1":"value1", "NestedKey": "[{\"nKey1\":\"nValue1\"},{\"nKey2\":\"nValue2\"}]" }

Below are the converter properties:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Below is the full connector configuration:

    "name" : "source-connector",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "topic.prefix" : "<Name>",
    "connection.url": "jdbc:postgresql://localhost:5432/<DbName>",
    "connection.user" : "<User>",
    "connection.password": "<PW>",
    "poll.interval.ms" : 3600000,
    "query": "<Custom Query having json datatype result>",
    "mode": "bulk",
    "transforms":"createKey, RenameField",
    "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields":"tpnb",
    "transforms.RenameField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames":"<Few renames>",
    "db.timezone": "UTC",
    "batch.max.rows": "1000",
    "tasks.max" : "1"