hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector
Apache License 2.0

Exactly-once semantics and DLQ configuration #73

Closed abhisheksingh87 closed 5 years ago

abhisheksingh87 commented 5 years ago

Hi,

We are trying to use Kafka Connect with the MongoDB sink connector in order to sink generated event messages into MongoDB. Below are a few of our concerns; we need your suggestions on them.

  1. Do Kafka Connect and the connector support idempotence and exactly-once semantics? I tried to pass producer and consumer config through the sink properties, but Connect starts with its default producer and consumer config. I used consumer.enable.auto.commit=false and so on.
  2. When the MongoDB server is down, Connect retries and then throws the error 'Task is being killed and will not recover until manually restarted'. Is there a way to control this with some configuration? If the MongoDB server is up and running again, shouldn't it reconnect automatically? Or write to a DLQ in case the failure is due to a Mongo or connector error? I tried errors.deadletterqueue.topic.name=my-connector-errors (see the config sketch below).
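
For reference, here is roughly what I tried as a sketch; the errors.* keys are the standard Kafka Connect error-handling options (KIP-298) and only exist in Kafka versions that ship that feature:

```properties
# Kafka Connect framework error-handling options (sink connector config).
# Requires a Connect version that includes the dead letter queue feature (KIP-298).
errors.tolerance=all
errors.deadletterqueue.topic.name=my-connector-errors
# replication factor for the DLQ topic (default 3); lower only for small dev clusters
errors.deadletterqueue.topic.replication.factor=1
# optionally attach failure context as record headers on the DLQ topic
errors.deadletterqueue.context.headers.enable=true
```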

Thanks

hpgrahsl commented 5 years ago

Hi @abhisheksingh87 - thanks for reaching out.

1) exactly-once semantics: in general, sink connectors cannot provide exactly-once behaviour for arbitrary data and/or configurations. what is possible with e.g. my sink connector is to write against the sink (MongoDB collection) with upsert semantics which, when combined with a unique attribute found in the Kafka records, gives you idempotent write behaviour. so if you make sure that your data in Kafka exhibits a unique attribute in the first place, you can achieve exactly-once semantics against the sink given a proper configuration (see the DocumentIdAdder strategy options in the README).
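
to make that concrete, here's a rough config sketch (the strategy shown is just one of the DocumentIdAdder options from the README - please double-check the exact class/package name for the connector version you run):

```properties
# sink connector config sketch: ProvidedInValueStrategy expects the kafka record
# value to already contain an _id field, which then drives key-based upserts of
# the same mongodb document on redelivery (idempotent writes).
# verify the exact strategy class name against the README of your connector version.
mongodb.document.id.strategy=at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy
```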

2) when it comes to retries, there is currently a very simple logic based on the following 2 config options: mongodb.max.num.retries (default 3) & mongodb.retries.defer.timeout (default 5000ms). this means that if MongoDB is down for more than roughly 15 secs (the time of the 3 retries), the retries are exhausted, the sink connector task is killed and it waits for manual intervention. the connector would of course continue its work in case MongoDB comes back up during the retries. you can raise both config settings as you see fit. what's currently missing is a better strategy, e.g. exponential backoff. there is an open feature request #61 - feel free to help enhance this :)
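
just to illustrate, raising both settings could look like this (the values are arbitrary examples, not recommendations):

```properties
# retry settings mentioned above: roughly 10 x 30s = 5 min of tolerated MongoDB
# downtime before the task gives up and needs a manual restart. example values only.
mongodb.max.num.retries=10
mongodb.retries.defer.timeout=30000
```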

currently I cannot comment on the general DLQ feature of the Connect framework itself since I haven't used it so far. be aware though that it may not be available to you in case you are running a Kafka version where this didn't exist yet.

please let me know if this helps or you need anything else. thx.

abhisheksingh87 commented 5 years ago

Hi @hpgrahsl, thanks for your reply. We have a few more questions regarding the connector, as listed below.

  1. Does the connector guarantee at-least-once semantics?
  2. Is it possible to filter events based on a given parameter? We have use cases where we need to filter the write operations to the MongoDB collection based on an attribute in the event message.
  3. How is offset management handled by the connector?

hpgrahsl commented 5 years ago

1) yes, of course. you get at-least-once semantics basically out of the box, without taking any special care with the sink connector configuration. be advised though that in my experience this is very rarely what you want. as I said, you can make sure to have idempotent writes by doing key-based upserts simply by configuring things accordingly, and thereby get exactly-once semantics.

2) neither the Kafka Connect framework itself nor the sink connector is supposed to do filtering on records. while you could achieve this by implementing a custom write model for the sink connector, I would discourage you from doing that. most likely the better way to go here is to have a stream processor (Kafka Streams or KSQL) which takes care of the filtering and then let the sink connector process the already filtered topic.

3) there is no explicit offset management done by the sink connector. it relies on what is configured at the framework level, i.e. Connect commits the offsets at (configurable) regular intervals - see the sketch below.
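
for completeness, the framework-level knob I mean is the worker's offset flush interval (the value below is the default as far as I remember, shown only to illustrate where this is tuned):

```properties
# connect worker config: interval at which the framework commits offsets for its tasks.
# 60000 ms is the default; shown here just to illustrate where this is configured.
offset.flush.interval.ms=60000
```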

hope this clarifies your questions! if you plan to keep using my sink connector I'd be happy to learn about your concrete use case. ideally you'd be willing to share it "publicly" as a user voice/testimonial in the README. let me know :)

hpgrahsl commented 5 years ago

@abhisheksingh87 getting back to you about this. if everything is clarified I'd be happy if you close the issue. THX!