confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
19 stars 955 forks source link

handle io.debezium.time.ZonedTimestamp schema name #1361

Open DmitriyBrashevets opened 1 year ago

DmitriyBrashevets commented 1 year ago

Problem

The record from io.debezium.connector.postgresql.PostgresConnector source connector which contains the timestamp with timezone type in Postgres has the type STRING and name io.debezium.time.ZonedTimestamp in schema. The io.debezium.time.ZonedTimestamp logical name is not anyhow supported by the JdbcSinkConnector

Record example:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          ...
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.time.ZonedTimestamp",
            "version": 1,
            "field": "from_date"
          },
          ...
        ],
        "optional": true,
        "name": "....Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          ...
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.time.ZonedTimestamp",
            "version": 1,
            "field": "from_date"
          },
          ...
        ],
        "optional": true,
        "name": "....Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      }
    ],
    "optional": false,
    "name": "..."
  },
  "payload": {
    "before": null,
    "after": {
      ...
      "from_date": "2023-09-03T21:00:00Z",
      ...
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "postgresql",
      ...
    },
    "op": "c",
    ...
  }
}

Solution

Does this solution apply anywhere else?
If yes, where?

Test Strategy

Testing done:

Release Plan

cla-assistant[bot] commented 1 year ago

CLA assistant check
All committers have signed the CLA.