confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Query with WHERE clause cannot be used with incrementing/timestamp mode #1296

Open maximilianobl opened 1 year ago

maximilianobl commented 1 year ago

I am using Kafka Connect (Confluent 7.2.1) with the JDBC connector (Confluent JDBC 10.5). I'm using the source connector to read data from a view. The view returns an id column (incrementing) and a last_date column (timestamp). My source database is Informix. The destination is PostgreSQL, although that is beside the point.

At first I believed it worked well, since inserts and updates were propagated quickly, but once the view grew (23 million records) updates were no longer picked up instantly.

While testing, I noticed that Kafka Connect was actually re-reading the entire view, not just the rows after the last id and last_date. To check, I limited the view to 100 records, and changes were indeed detected quickly. When I reconfigure it with the 23 million records, it takes hours.

Looking a little further, I see that the current offset is stored like this:

rowtime: 2023/01/04 15:27:16.361 Z, key: ["source_novedades",{"query":"query"}], value: {"timestamp_nanos":0,"incrementing":5001,"timestamp":1672835217000}, partition: 2

I don't quite understand what timestamp_nanos means, but I do see a single date (timestamp), and it is the last one processed.
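To make the offset record easier to read, here is a minimal Python sketch that decodes the value shown above. It assumes that `timestamp` is epoch milliseconds and that `timestamp_nanos` holds the sub-second part in nanoseconds. That reading is an assumption based on the field names, not something confirmed in this thread. The helper `decode_offset` is hypothetical and not part of the connector.

```python
from datetime import datetime, timezone

# Offset value copied verbatim from the record above.
offset = {"timestamp_nanos": 0, "incrementing": 5001, "timestamp": 1672835217000}


def decode_offset(value):
    """Return (UTC datetime, nanoseconds within the second, last id).

    Assumption: "timestamp" is epoch milliseconds and "timestamp_nanos"
    is the fractional-second part expressed in nanoseconds.
    """
    seconds = value["timestamp"] // 1000
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    nanos = value.get("timestamp_nanos", 0)
    return moment, nanos, value["incrementing"]


moment, nanos, last_id = decode_offset(offset)
print(moment.isoformat(), nanos, last_id)
```

Under that assumption the offset decodes to 2023-01-04T12:26:57 UTC with id 5001, so the single stored timestamp would be the last_date of the most recently processed row.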

On the other hand, the query appears in the log like this:

SELECT * FROM my_vw WHERE last_date < ? AND (( last_date = ? AND id > ?) OR last_date > ?) ORDER BY last_date, id ASC

That doesn't seem to make sense to me, since it asks for last_date < ? and, at the same time, AND (... OR last_date > ?).

{
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:informix-sqli://ip:port/midb:informixserver=mibase",
        "connection.user":"informix", 
        "connection.password":"pass",
        "query": "SELECT * FROM my_vw",
        "topic.prefix": "novedades",
        "db.timezone": "America/Argentina/Buenos_Aires",
        "poll.interval.ms": "10000",
        "mode":"timestamp+incrementing",
        "schema.pattern": "informix",
        "timestamp.column.name": "last_date",
        "incrementing.column.name": "id",
        "validate.non.null": false,

        "numeric.mapping":"best_fit",
        "transforms": "copyFieldToKey,extractKeyFromStruct,removeKeyFromValue",
        "transforms.copyFieldToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.copyFieldToKey.fields": "id",
        "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractKeyFromStruct.field": "id",
        "transforms.removeKeyFromValue.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.removeKeyFromValue.blacklist": "id",
        "key.converter" : "org.apache.kafka.connect.converters.LongConverter",
        "batch.size": 200000,
        "linger.ms": 60000,
        "compression.type":"lz4",
        "acks": 1,
        "offset.flush.timeout.ms": 300000
}

OneCricketeer commented 1 year ago

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/overview.html#specifying-a-where-clause-with-query-modes

pantaoran commented 1 year ago

I have the same question as @maximilianobl, since I'm also seeing a generated WHERE clause that looks contradictory to me. Did you have any success in figuring out what's going on?

The link provided by @OneCricketeer only explains how to set your own WHERE clause, but doesn't explain why the default behaviour of timestamp+incrementing seems to produce a nonsensical query.
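One reading under which the generated clause is not contradictory: the four `?` placeholders need not all carry the same value. The sketch below assumes the first placeholder is bound to an upper bound (for example the current database time) and the remaining three to the stored offset (timestamp, id, timestamp). That binding order is an assumption for illustration, and the table contents, integer timestamps and the `poll` helper are hypothetical. It uses an in-memory SQLite table in place of the Informix view.

```python
import sqlite3

# Predicate as it appears in the connector log.
QUERY = """
SELECT id, last_date FROM my_vw
WHERE last_date < ?
  AND ((last_date = ? AND id > ?) OR last_date > ?)
ORDER BY last_date, id ASC
"""


def poll(conn, upper_bound, last_ts, last_id):
    # Assumed binding order: upper bound, stored timestamp, stored id,
    # stored timestamp again.
    params = (upper_bound, last_ts, last_id, last_ts)
    return conn.execute(QUERY, params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_vw (id INTEGER, last_date INTEGER)")
conn.executemany(
    "INSERT INTO my_vw VALUES (?, ?)",
    [
        (5000, 100),  # older than the stored offset: skipped
        (5001, 200),  # exactly the stored offset row: skipped
        (5002, 200),  # same timestamp, higher id: returned
        (4000, 300),  # modified after the offset: returned
        (5003, 900),  # not below the upper bound: left for a later poll
    ],
)

rows = poll(conn, upper_bound=900, last_ts=200, last_id=5001)
print(rows)  # [(5002, 200), (4000, 300)]
```

With those bindings the predicate selects a window: rows strictly after the stored (last_date, id) offset and strictly before the upper bound. Whether the database can evaluate that filter without scanning the whole view is a separate question that this sketch does not address.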