confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Add support for string incrementing columns #97

Open wkiser opened 8 years ago

wkiser commented 8 years ago

Would it be possible to add support for using a string column as the incrementing column? Looking at the code a little, it seems like it might be as simple as making TimestampIncrementingTableQuerier generic.
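Roughly what I have in mind, just as a sketch (the class and method names here are made up, not the connector's actual internals): the querier would compare offsets through Comparable instead of assuming a numeric column.

```java
import java.util.Objects;

// Sketch only: a generic incrementing offset that works for any ordered type,
// so String columns would be handled the same way as Long ones.
public class GenericIncrementingOffset<T extends Comparable<T>> {
    private final T lastValue; // null means no offset stored yet

    public GenericIncrementingOffset(T lastValue) {
        this.lastValue = lastValue;
    }

    // True if the row's incrementing value is strictly greater than the stored offset.
    public boolean isNewerThanOffset(T rowValue) {
        Objects.requireNonNull(rowValue, "incrementing column must be NOT NULL");
        return lastValue == null || rowValue.compareTo(lastValue) > 0;
    }
}
```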

ewencp commented 8 years ago

@wkiser That might be possible with a simple change, as you mentioned, as long as we have all the type info we need from the table metadata before creating the querier.

Out of curiosity, why strings? This seems like a pretty unusual use case unless you have some other type encoded as a string in a way that guarantees the lexicographic order of the strings matches the order of the underlying type.

wkiser commented 8 years ago

Hey @ewencp, you pretty much described the structure of the data that I am dealing with.

The tables I'm querying have a fixed-width string primary key, a combination of numbers and letters that sorts lexicographically. I have a temporary workaround where I create a new, encoded 'id' column at query time, but it is really inefficient. Normally I would just add an auto-increment column, but this table is very large.
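To illustrate why the fixed width matters (a small standalone example, not from the connector):

```java
// With equal-length keys, String.compareTo matches the intended key order;
// with variable-length keys it can invert it.
public class LexOrderDemo {
    public static void main(String[] args) {
        System.out.println("A09".compareTo("A10") < 0); // true:  fixed width sorts correctly
        System.out.println("A9".compareTo("A10") < 0);  // false: "A9" sorts after "A10"
    }
}
```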

dla-c-box commented 1 year ago

There is a similar issue here. For PostgreSQL, this post describes in detail some possible race conditions that make the current JDBC Source connector parameters insufficient for reliable incremental reads: in short, a row written by a long-running transaction can become visible with an id lower than ids the connector has already processed, so it gets silently skipped.

The author proposes a query like the following for his messaging architecture, fetching the next batch of messages without having to lock the whole table:

```sql
SELECT position, message_id, message_type, data
FROM outbox
WHERE ((transaction_id = last_processed_transaction_id AND position > last_processed_position)
       OR (transaction_id > last_processed_transaction_id))
  AND transaction_id < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY transaction_id ASC, position ASC
LIMIT 100;
```
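Conceptually, the offset that would need to be persisted between polls is a pair rather than a single number. A minimal sketch (OutboxOffset is a made-up name, and I'm assuming the xid8 and position values fit in a long):

```java
// Sketch of a composite offset ordered the same way as the query's
// ORDER BY transaction_id ASC, position ASC.
public record OutboxOffset(long transactionId, long position)
        implements Comparable<OutboxOffset> {

    @Override
    public int compareTo(OutboxOffset other) {
        int byTxn = Long.compare(transactionId, other.transactionId);
        return byTxn != 0 ? byTxn : Long.compare(position, other.position);
    }
}
```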

We can't specify the 'last_processed_transaction_id' in the Source connector, so a workaround would be to have a source table defined with the following fields:

```sql
id             BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
transaction_id XID8 NOT NULL DEFAULT pg_snapshot_xmin(pg_current_snapshot()),
message_id     NUMERIC(39,19) NOT NULL GENERATED ALWAYS AS (
    (TEXT(transaction_id) || '.' || LPAD(TEXT(id), 19, '0'))::NUMERIC(39,19)
) STORED
```

and then we could have the source connector configured with:

```json
"incrementing.column.name": "message_id",
"query": "SELECT id, message_id, format_version, message, message_created_at, message_source FROM (SELECT * FROM public.outbox_products WHERE outbox_products.transaction_id < pg_snapshot_xmin(pg_current_snapshot())) tbl"
```

Unfortunately, because message_id is NUMERIC(39,19), it can't currently be used as "incrementing.column.name". We could also encode message_id as an incrementing TEXT/CHAR/VARCHAR (this issue's proposal), which also can't currently be used as "incrementing.column.name".

So right now I see no way to configure the JDBC Source connector to be reliable with respect to these PostgreSQL limitations, unless decimals or strings are supported as "incrementing.column.name" (or a new parameter is added to save and reuse the last_processed_transaction_id between calls, as proposed on that website).
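For what it's worth, if string incrementing columns were supported, the key could be kept fixed-width so that lexicographic order matches (transaction_id, id) order. A rough sketch (OutboxKeyEncoder is a made-up name, again assuming xid8 values fit in a long):

```java
// Zero-pad both parts to a fixed width so the concatenated key
// sorts lexicographically in (transaction_id, id) order.
public class OutboxKeyEncoder {
    public static String encode(long transactionId, long id) {
        return String.format("%020d.%019d", transactionId, id);
    }

    public static void main(String[] args) {
        String a = encode(41, 7);
        String b = encode(42, 1);
        System.out.println(a.compareTo(b) < 0); // true: transaction order wins
    }
}
```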