confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

sqlite timestamp+incrementer issue #1335

Open slecrenski opened 1 year ago

slecrenski commented 1 year ago

I have a SQLite database file containing a column, date_modified, which is stored as a double. The double is a Unix timestamp. I am attempting to use the following configuration:

mode=timestamp+incrementing
incrementing.column.name=unique_id
timestamp.column.name=date_modified
db.timezone=UTC
query=select unique_id, date_modified from (select rowid as unique_id, datetime(date_modified, 'unixepoch') as date_modified from tablename) t

The double converts perfectly for all records when I run the select statement above in the sqlite command-line interface, and it returns all the correct values. But when I run it with kafka-connect-jdbc I get the following DEBUG output (I am not sure whether it counts as an error). I find it quite surprising that the connector does not crash immediately, since this is an obvious usability problem for such a configuration. Effectively, the connector keeps running but does nothing, since it seems unable to identify records to publish.

[2023-05-02 19:31:01,096] DEBUG [test-connector|task-0] executing query SELECT strftime('%Y-%m-%d %H:%M:%S.%f','now') to get current time from database (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:556)
[2023-05-02 19:31:01,107] DEBUG [test-connector|task-0] Executing prepared statement with start time value = 2023-05-02 19:31:01.001 end time = 2023-05-02 19:31:01.001 and incrementing value = -1 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria:153)
[2023-05-02 19:31:01,117] DEBUG [test-connector|task-0] JDBC type 'NULL' not currently supported for column 'date_modified' (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:1037)
[2023-05-02 19:31:01,117] DEBUG [test-connector|task-0] Resetting querier TimestampIncrementingTableQuerier{table=null, query='select unique_id, date_modified from (select rowid as unique_id, datetime(date_modified, 'unixepoch') as date_modified from tablename) t', topicPrefix='test-table-prefix', incrementingColumn='unique_id', timestampColumns=[date_modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask:493)
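
For reference, the SQL conversion can be checked on its own, outside the connector. The following is only a minimal sketch, assuming Python's built-in sqlite3 module; the in-memory database, the DOUBLE column declaration, and the two sample values are hypothetical stand-ins for the real file and data. It runs the same subquery and adds typeof() to show which storage class SQLite reports for the converted column.

```python
import sqlite3

# Hypothetical stand-in for the real database file and table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tablename (date_modified DOUBLE)")
conn.executemany(
    "INSERT INTO tablename (date_modified) VALUES (?)",
    [(0.0,), (1683055861.0,)],  # Unix timestamps stored as doubles
)

# Same shape as the connector query, plus typeof() on the converted column.
query = (
    "select unique_id, date_modified, typeof(date_modified) from ("
    "select rowid as unique_id, "
    "datetime(date_modified, 'unixepoch') as date_modified "
    "from tablename) t order by unique_id"
)
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
```

In this sketch the converted date_modified comes back as text rather than as a typed timestamp. Whether that is what leads to the "JDBC type 'NULL' not currently supported" line above is an assumption on my part, not something the log confirms.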

I am using the latest version of kafka-connect-jdbc (10.7.1), which includes sqlite-jdbc-3.25.2.jar. I have also tried the latest sqlite-jdbc and receive a different error, which mentions double (0).

Is this a supported use case for this tool? I strongly suspect that SQLite was never tested with the timestamp or timestamp+incrementing modes.

Anyway, I would appreciate any help you can provide, or confirmation from someone that SQLite is not currently supported/tested for the timestamp or timestamp+incrementing modes.

slecrenski commented 1 year ago

🦗🦗🦗

slecrenski commented 1 year ago

Is this repository still maintained?