confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

Source connector delivery guarantees #461

Open mikoiam opened 6 years ago

mikoiam commented 6 years ago

I'm wondering what delivery guarantees the JDBC source connector offers. We're considering a scenario where a list of rows (representing business events) is appended to a DB table with incrementing ids, and the JDBC source connector then picks them up in incremental mode and writes them to a Kafka topic.

Can I assume that:

  1. Rows end up in a given Kafka topic-partition in the same order as they appear in the table? What I'm talking about here are issues related to max.in.flight.requests.per.connection > 1. Do I need to set it to 1 manually, or is that already handled?
  2. Rows end up in Kafka exactly once? I know that Kafka has a concept of cross-topic transactions (writing rows to one topic + writing the last consumed row id to another topic). Does the JDBC source connector already use this feature (or can I configure it somehow), or should I expect at-least-once delivery of rows?
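For concreteness, a minimal source connector config for the scenario described above might look like the sketch below. This is an illustration under assumptions, not taken from this thread: the connection details, table name (business_events), and id column (event_id) are hypothetical placeholders; the relevant parts are mode and incrementing.column.name.

  name=business-events-source
  connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
  # hypothetical connection details
  connection.url=jdbc:postgresql://db-host:5432/appdb
  connection.user=connect
  connection.password=secret
  # hypothetical table holding the appended business events
  table.whitelist=business_events
  # incremental mode keyed on a strictly increasing id column
  mode=incrementing
  incrementing.column.name=event_id
  # rows are produced to topic jdbc-business_events
  topic.prefix=jdbc-
  poll.interval.ms=5000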
rhauch commented 6 years ago

To answer your first question: The connector writes the rows into Kafka in the order it reads them from the query result set. For incrementing mode the query includes ORDER BY <incColumn> ASC, so they should be in event order if your event IDs are incrementing. (For timestamp mode it includes ORDER BY COALESCE(<timestampColumns>) ASC; for timestamp+incrementing it includes ORDER BY COALESCE(<timestampColumns>) ASC, <incColumn> ASC.)
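In config terms, the ordering columns come from the mode-related connector properties. The sketch below maps the modes above to the properties that drive them; the column names are placeholders, not anything from this thread.

  # mode=incrementing           -> ORDER BY <incrementing.column.name> ASC
  # mode=timestamp              -> ORDER BY COALESCE(<timestamp.column.name>) ASC
  # mode=timestamp+incrementing -> ORDER BY COALESCE(<timestamp.column.name>) ASC,
  #                                         <incrementing.column.name> ASC
  mode=timestamp+incrementing
  timestamp.column.name=updated_at
  incrementing.column.name=event_id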

To try and minimize reordering, Connect by default sets max.in.flight.requests.per.connection=1, though you can override this in your worker config with producer.max.in.flight.requests.per.connection. However, Connect doesn't currently guarantee exactly once (see below), so even with this you might get duplicates of a series of events -- though that series should still be in the same / correct order.
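For reference, the worker-level override mentioned above is a producer.-prefixed entry in the worker properties; the sketch below simply makes the default explicit.

  # Connect worker config: producer.* settings are passed through to the
  # producers used by source connector tasks
  producer.max.in.flight.requests.per.connection=1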

For your second question, no, Connect does not currently support or use the exactly-once semantics (EOS) feature in Kafka. One big reason is that source offsets are periodically committed asynchronously, out of band with the records. See KAFKA-6080 for details.
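The commit interval in question is a worker-level setting; the snippet below (the default value is quoted from memory, so treat it as an assumption) just illustrates that source offsets are flushed on a timer rather than atomically with the produced records.

  # Connect worker config: how often source offsets are flushed,
  # independently of when the corresponding records are acknowledged
  offset.flush.interval.ms=60000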

sanjeev0915 commented 5 years ago

Hi guys, wanted to check if this thread is currently being looked at by someone. Does the issue of guaranteed delivery and exactly once (EOS) with Kafka Connect (JDBC connector) still exist, or is it resolved? Looking at the Confluent documentation at https://docs.confluent.io/current/connect/design.html# it says: "As connectors run, Kafka Connect tracks offsets for each one so that connectors can resume from their previous position in the event of failures or graceful restarts for maintenance." Does this mean Kafka Connect supports guaranteed delivery and exactly once?

Really grateful if someone can answer this, as we have to make a conscious decision on whether we should go with the built-in Kafka connector from Confluent or design a bespoke one.

Kind regards, Sanjeev

sanjeev0915 commented 5 years ago

Hi Randall - do you have any idea or update on ticket KAFKA-6080? I cannot see any activity on it since June 2018.

nik09541 commented 5 years ago

Hi @rhauch

I have tried to configure the Kafka JDBC connector to stream from an existing table. Unfortunately the existing table has an id field that is subject to purging by another process (i.e. the rows are not strictly incrementing). As a workaround we created a script that moves the records into a custom table with a custom id and a timestamp field, but this is not working. On the first insert into the custom table (with data from the existing table) no event is seen on the Kafka topic; only after updating the timestamp column is the event noticed on the connector. I did try the bulk option, but then the processing goes into an endless loop while trying to read from the Avro consumer (and the bulk table option is not going to take off either, as the audit tables are going to be huge).

insert into reference (select * from customer where customer_ref_id in (5004145,5004146,5004161,5004162,5004178));

The insert above registers no event on the Kafka connector, as the id in the customer table is not strictly incrementing.

insert into reference (customer_ref_id, updated_date) values (id_seq.nextval, current_timestamp);

This insert works, as it inserts data into the table with a sequence; the event is noticed on the Kafka connector.

I have also tried query mode, but it doesn't work either.

Please advise whether we can get the existing data onto a Kafka topic for a table whose existing id is non-incrementing and subject to purging.
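For what it's worth, a timestamp+incrementing config against the custom reference table described above might look like the sketch below. The connection details and topic prefix are hypothetical; the column names come from the inserts above, and whether this fits depends on how the purging interacts with the sequence.

  name=reference-source
  connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
  # hypothetical connection details
  connection.url=jdbc:oracle:thin:@db-host:1521/ORCL
  connection.user=connect
  connection.password=secret
  table.whitelist=reference
  # track both the sequence-backed id and the timestamp column,
  # so updates to updated_date are captured as well as new rows
  mode=timestamp+incrementing
  incrementing.column.name=customer_ref_id
  timestamp.column.name=updated_date
  topic.prefix=jdbc-
  poll.interval.ms=5000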