confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
1.01k stars 954 forks source link

Sink Connector auto create tables with proper data type #1300

Open BhuviTheDataGuy opened 1 year ago

BhuviTheDataGuy commented 1 year ago

I have the Debezium source connector for PostgreSQL with the value converter set to Avro, and it uses the schema registry.

Source DDL:

                                                                 Table "public.tbl1"
 Column |            Type             | Collation | Nullable |             Default              | Storage  | Compression | Stats target | Description
--------+-----------------------------+-----------+----------+----------------------------------+----------+-------------+--------------+-------------
 id     | integer                     |           | not null | nextval('tbl1_id_seq'::regclass) | plain    |             |              |
 name   | character varying(100)      |           |          |                                  | extended |             |              |
 col4   | numeric                     |           |          |                                  | main     |             |              |
 col5   | bigint                      |           |          |                                  | plain    |             |              |
 col6   | timestamp without time zone |           |          |                                  | plain    |             |              |
 col7   | timestamp with time zone    |           |          |                                  | plain    |             |              |
 col8   | boolean                     |           |          |                                  | plain    |             |              |
Indexes:
    "tbl1_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"
Access method: heap

In the schema registry:

{
  "type": "record",
  "name": "Value",
  "namespace": "test.public.tbl1",
  "fields": [
    {
      "name": "id",
      "type": {
        "type": "int",
        "connect.parameters": {
          "__debezium.source.column.type": "SERIAL",
          "__debezium.source.column.length": "10",
          "__debezium.source.column.scale": "0"
        },
        "connect.default": 0
      },
      "default": 0
    },
    {
      "name": "name",
      "type": [
        "null",
        {
          "type": "string",
          "connect.parameters": {
            "__debezium.source.column.type": "VARCHAR",
            "__debezium.source.column.length": "100",
            "__debezium.source.column.scale": "0"
          }
        }
      ],
      "default": null
    },
    {
      "name": "col4",
      "type": [
        "null",
        {
          "type": "double",
          "connect.parameters": {
            "__debezium.source.column.type": "NUMERIC",
            "__debezium.source.column.length": "0"
          }
        }
      ],
      "default": null
    },
    {
      "name": "col5",
      "type": [
        "null",
        {
          "type": "long",
          "connect.parameters": {
            "__debezium.source.column.type": "INT8",
            "__debezium.source.column.length": "19",
            "__debezium.source.column.scale": "0"
          }
        }
      ],
      "default": null
    },
    {
      "name": "col6",
      "type": [
        "null",
        {
          "type": "long",
          "connect.version": 1,
          "connect.parameters": {
            "__debezium.source.column.type": "TIMESTAMP",
            "__debezium.source.column.length": "29",
            "__debezium.source.column.scale": "6"
          },
          "connect.name": "io.debezium.time.MicroTimestamp"
        }
      ],
      "default": null
    },
    {
      "name": "col7",
      "type": [
        "null",
        {
          "type": "string",
          "connect.version": 1,
          "connect.parameters": {
            "__debezium.source.column.type": "TIMESTAMPTZ",
            "__debezium.source.column.length": "35",
            "__debezium.source.column.scale": "6"
          },
          "connect.name": "io.debezium.time.ZonedTimestamp"
        }
      ],
      "default": null
    },
    {
      "name": "col8",
      "type": [
        "null",
        {
          "type": "boolean",
          "connect.parameters": {
            "__debezium.source.column.type": "BOOL",
            "__debezium.source.column.length": "1",
            "__debezium.source.column.scale": "0"
          }
        }
      ],
      "default": null
    }
  ],
  "connect.name": "test.public.tbl1.Value"
}

But in the target PostgreSQL database the data types are completely mismatched for the ID and timestamp columns. Sometimes the decimal columns as well (that's due to this).

Target:

                                             Table "public.tbl1"
 Column |       Type       | Collation | Nullable | Default | Storage  | Compression | Stats target | Description
--------+------------------+-----------+----------+---------+----------+-------------+--------------+-------------
 id     | text             |           | not null |         | extended |             |              |
 name   | text             |           |          |         | extended |             |              |
 col4   | double precision |           |          |         | plain    |             |              |
 col5   | bigint           |           |          |         | plain    |             |              |
 col6   | bigint           |           |          |         | plain    |             |              |
 col7   | text             |           |          |         | extended |             |              |
 col8   | boolean          |           |          |         | plain    |             |              |
Indexes:
    "tbl1_pkey" PRIMARY KEY, btree (id)
Access method: heap

I'm trying to understand why, even with the schema registry, it's not creating the target tables with the proper data types.

Sink config:

{
        "name": "t1-sink",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max": "1",
                "topics": "test.public.tbl1",
                "connection.url": "jdbc:postgresql://172.31.85.***:5432/test",
                "connection.user": "postgres",
                "connection.password": "***",
                "dialect.name": "PostgreSqlDatabaseDialect",
                "auto.create": "true",
                "insert.mode": "upsert",
                "delete.enabled": "true",
                "pk.fields": "id",
                "pk.mode": "record_key",
                "table.name.format": "tbl1",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "key.converter.schemas.enable": "false",
                "internal.key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
                "internal.key.converter.schemas.enable": "true",
                "internal.value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
                "internal.value.converter.schemas.enable": "true",
                "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
                "value.converter.schemas.enable": "true",
                "value.converter.region": "us-east-1",
                "key.converter.region": "us-east-1",
                "key.converter.schemaAutoRegistrationEnabled": "true",
                "value.converter.schemaAutoRegistrationEnabled": "true",
                "key.converter.avroRecordType": "GENERIC_RECORD",
                "value.converter.avroRecordType": "GENERIC_RECORD",
                "key.converter.registry.name": "bhuvi-debezium",
                "value.converter.registry.name": "bhuvi-debezium",
                "value.converter.column.propagate.source.type": ".*",
                "value.converter.datatype.propagate.source.type": ".*"

        }
}