confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
1.01k stars 953 forks source link

db.timezone is not applied on the endDate #1397

Open JorgeLeonelMatos opened 4 months ago

JorgeLeonelMatos commented 4 months ago

Hi,

My JDBC Source Connector config is:

{
  "name": "vcs.transactions.ke.source_connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "quote.sql.identifiers": "never",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id_transaction",
    "tasks.max": "1",
    "internal.key.converter.schemas.enable": "false",
    "mode": "timestamp+incrementing",
    "topic.prefix": "vcs.transactions_approved.ke.topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "validate.non.null": "false",
    "query": "SELECT * FROM transaction",
    "connection.attempts": "10",
    "batch.max.rows": "100",
    "timestamp.delay.interval.ms": "0",
    "db.timezone": "Africa/Nairobi",
    "connection.backoff.ms": "100",
    "poll.interval.ms": "100",
    "value.converter.schemas.enable": "false",
    "connection.url": "jdbc:mysql://<host>:3306/<db_name>?user=<user>&password=<password>",
    "numeric.mapping": "best_fit",
    "numeric.precision.mapping": "true"
  }
}

Using:

    "db.timezone": "Africa/Nairobi",

The timezone Africa/Nairobi is UTC+3.

In theory, the connector should fetch data using the timezone but that's not what I can observe.

Example:

1 -- now() update transaction t set updated_at = now() where t.id_transaction = 1; 2 -- now() + INTERVAL 1 HOUR update transaction t set updated_at = now() + INTERVAL 1 HOUR where t.id_transaction = 2; 3 -- now() + INTERVAL 2 HOUR update transaction t set updated_at = now() + INTERVAL 2 HOUR where t.id_transaction = 3; 4 -- now() + INTERVAL 3 HOUR update transaction t set updated_at = now() + INTERVAL 3 HOUR where t.id_transaction = 4; 5 -- now() + INTERVAL 4 HOUR update transaction t set updated_at = now() + INTERVAL 4 HOUR where t.id_transaction = 5;

When creating the connector I would expect to fetch the row (4) that match the timezone, but what I get is (1). And the payload instead of having the field updated_at in unix_timestamp I get the value with a time diff 3 hours ago. The same behavior is reflected on the topic connect_offset where timestamp is also 3 hours ago.

Assuming now() is 1707477449 - 2024-02-09 11:17:29

Message on topic connect_offset:

{
    "timestamp_nanos": 0,
    "incrementing": 1,
    "timestamp": 1707466649
}

where timestamp is 1707466649 which is 2024-02-09 08:17:29 => 3 hours ago from the database record. I was expecting 3 hours later!!!.

It seems the timezone is being applied in reverse mode.

Note: the database is MySQL.

Does anyone have the same setup? Is it bad config? Should the timezone be defined on the connection url instead? I posted the question also on https://stackoverflow.com/questions/77967883/jdbc-source-kafka-connect-db-timezone-is-not-applied-on-the-enddate.