Open ussama-rafique opened 3 years ago
As I understand it, the timestamp.delay.interval.ms property is already present in the config, but it is not used in incrementing mode. It is used only in the timestamp and timestamp+incrementing modes. So maybe it's better not to add a new mode, but just to use this property in incrementing mode as well?
@ussama-rafique - have you found any solution to this issue? Did you try an IDENTITY column, if you are on Oracle 12c?
@jinays05 we have not found any clean solution for this. We are using Postgres, not Oracle, but we are using auto-increment columns, and the same problem will occur on Oracle too.
One workaround we have been using is views. The view definition adds a delay/lag, and the source connector reads from the view instead of the source table:
create view vw_customers as select * from customers where created_at < (current_timestamp - interval '1 minute');
Thanks for your response and insight on using a view as an alternative approach.
I struggled with this too, and I believe your proposed solution will not be sufficient, as there can always be rows with a much earlier timestamp that commit later. If you can guarantee that every INSERT or UPDATE transaction commits within 1 minute of writing, you might try the "1 minute" approach above, but that is a nasty restriction. We have several jobs that take many minutes. If you make the window bigger and bigger, you start getting redundant messages very often. Maybe that's okay? But it sure would be confusing and inefficient.
We had to go to strictly INCREMENTING connectors, as no solution with timestamps was robust enough. Even that has caveats, as you'll miss rows due to out-of-order commits. In the end, I think the JDBC source connector is not robust enough for much use at all. We need Debezium's connectors or the Confluent CDC connectors.
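To make the objection concrete, here is a minimal simulation sketch, not the connector's actual code. It assumes a poller that tracks an incrementing offset and only reads rows whose created_at is older than a fixed lag. The Row class, poll_with_lag, and all timings are hypothetical and chosen only to illustrate a job that holds its transaction open longer than the lag.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    id: int            # auto-increment value assigned at INSERT time
    created_at: float  # timestamp written by the INSERT (seconds)
    commit_at: float   # when the transaction commits and the row becomes visible


def poll_with_lag(rows, offset, now, lag):
    """Committed rows with id > offset and created_at older than `lag`, by id."""
    visible = [
        r for r in rows
        if r.commit_at <= now and r.id > offset and r.created_at < now - lag
    ]
    return sorted(visible, key=lambda r: r.id)


LAG = 60.0  # the "1 minute" window from the view workaround
rows = [
    Row(id=100, created_at=0.0, commit_at=300.0),  # long-running job (5 minutes)
    Row(id=101, created_at=10.0, commit_at=11.0),  # short transaction
]

offset, delivered = 99, []
for now in (100.0, 400.0):  # two poll iterations
    batch = poll_with_lag(rows, offset, now, LAG)
    delivered += [r.id for r in batch]
    if batch:
        offset = batch[-1].id  # offset moves to the highest id seen

missed = {r.id for r in rows} - set(delivered)
print(delivered, missed)
```

In this run the lag protects nothing: row 100 commits after the offset has already moved to 101, so it is never delivered. The lag only helps when every transaction commits inside the window.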
We came up with a different idea here. All parallel processes load a PRE STAGE table, where order does not matter. From the PRE STAGE table to the FINAL STAGE table, however, we do single-threaded periodic inserts and assign values from an ordered sequence while loading. This guarantees a strictly incremental column in the FINAL table, which in turn is used by the connector. This is sort of a workaround.
However, it would be interesting to know which rows have been read by the connector and published to Kafka, as connectors don't provide any feedback to the database layer on what has already been read.
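The staging idea can be sketched roughly as follows. This is an in-memory illustration only: the promote function, the lists standing in for the two tables, and the counter standing in for the database sequence are all assumptions, not the commenter's actual implementation.

```python
from itertools import count


def promote(pre_stage, final_stage, seq):
    """Move everything currently in pre_stage into final_stage in one pass.

    Meant to be called from a single thread, so sequence values are
    assigned and become visible in strictly increasing order.
    """
    batch = list(pre_stage)
    pre_stage.clear()
    for payload in batch:
        final_stage.append((next(seq), payload))


seq = count(start=1)       # stands in for an ordered database sequence
pre_stage = ["c", "a", "b"]  # parallel writers land here in arbitrary order
final_stage = []

promote(pre_stage, final_stage, seq)  # first periodic run
pre_stage.extend(["e", "d"])          # more parallel writes arrive
promote(pre_stage, final_stage, seq)  # second periodic run

ids = [i for i, _ in final_stage]
print(ids)
```

Because only one loader ever assigns sequence values, a reader polling the final table with an incrementing offset should not see a higher value before a lower one. The cost is the extra hop and the latency of the periodic run.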
We always lose some data when using "incrementing" mode on large tables!
PROBLEM DESCRIPTION
If there are a lot of parallel insert operations going on in the source table (which is usual), then insert transactions can and will commit out of order. That is, a "bigger" auto_increment ID (say 101) is committed earlier and a "smaller" ID (e.g. 100) is committed later. The time difference may be only a few seconds or milliseconds, but the commits are out of order nevertheless. Using "incrementing" mode to load data from such tables will always result in some data loss. This happens because when the source connector worker polls the table, it gets the row with the greater offset value while the row with the smaller offset is still uncommitted. By the next iteration the missing row has been committed, but the stored offset has already moved past its value, so that row is skipped. Using "timestamp+incrementing" mode is not practical because such tables are very large (around 5-8 million rows added daily) and the cost of any index other than the PK is too high.
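The race described above can be reproduced with a small simulation. This is a sketch under assumptions, not the connector's code: the Row class, poll_incrementing, and the commit times are hypothetical, and "visible" simply means the row's transaction has committed by the time of the poll.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    id: int           # auto-increment value assigned at INSERT time
    commit_at: float  # time (seconds) at which the transaction commits


def poll_incrementing(rows, offset, now):
    """Committed rows with id > offset, ordered by id, as of time `now`."""
    visible = [r for r in rows if r.commit_at <= now and r.id > offset]
    return sorted(visible, key=lambda r: r.id)


# ID 101 commits before ID 100, as in the description above.
rows = [Row(id=100, commit_at=2.0), Row(id=101, commit_at=1.0)]

offset, delivered = 99, []
for now in (1.5, 3.0):  # two poll iterations
    batch = poll_incrementing(rows, offset, now)
    delivered += [r.id for r in batch]
    if batch:
        offset = batch[-1].id  # stored offset moves to the highest id seen

print(delivered)
```

The first poll sees only ID 101 and stores it as the offset. The second poll asks for IDs greater than 101, so ID 100 is never returned even though it is committed by then.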
PROPOSED SOLUTION
I think we can resolve this issue by adding a new mode, "incrementing+delay_timestamp", where offsets are saved for the auto_increment column only and an additional timestamp column is used to introduce a lag in the polling query.
Properties required for this mode (with example values) are:
This should generate the following SQL:
SELECT * from table_name where ID > $incrementing_offset_val AND created_at < ${current_timestamp - 'interval 60 second'} ORDER BY ID
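As an illustration of how such a query could be assembled from configuration values, here is a hedged sketch. The build_poll_query function and its parameter names are hypothetical and do not come from the connector. The interval syntax assumes Postgres, and the %(offset)s placeholder assumes a driver that uses pyformat-style bound parameters.

```python
def build_poll_query(table, id_column, ts_column, delay_seconds):
    """Build a polling query that skips rows newer than `delay_seconds`.

    Identifiers are interpolated directly, so they must come from trusted
    configuration. The incrementing offset is left as a bound parameter.
    """
    if delay_seconds < 0:
        raise ValueError("delay_seconds must be non-negative")
    return (
        f"SELECT * FROM {table} "
        f"WHERE {id_column} > %(offset)s "
        f"AND {ts_column} < current_timestamp - interval '{int(delay_seconds)} seconds' "
        f"ORDER BY {id_column}"
    )


query = build_poll_query("customers", "id", "created_at", 60)
print(query)
```

Only the incrementing column is compared against the stored offset. The timestamp column acts purely as a filter, so no timestamp offset would need to be persisted.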