microsoft / kafka-connect-cosmosdb

Kafka Connect connectors for Azure Cosmos DB
MIT License

fixInfiniteLoopIssueWhenSourceTaskGotCancelled #545

Closed xinlian12 closed 10 months ago

xinlian12 commented 10 months ago

Issue: After this change in the Kafka Connect framework, a source task's poll() and stop() are invoked on the same thread. Before the change, SourceTask::stop was invoked on a separate thread from the one that did the actual data processing for the task (polling the task for records, transforming and converting those records, then sending them to Kafka). After the change, SourceTask::stop is invoked on the same thread that handles data processing. As a result, a task that blocks indefinitely in SourceTask::poll, expecting to be unblocked when SourceTask::stop is invoked, can no longer shut down gracefully and may hang forever. The reverse is also true: if stop() blocks the thread forever, poll() can never continue.
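To illustrate the constraint, here is a minimal sketch of a task that stays safe when poll() and stop() share one thread: poll() waits for at most a fixed timeout and stop() only flips a flag. `BoundedPollTask`, `enqueue`, and the timeout parameter are hypothetical names for illustration, not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a source task; not the connector's real class.
public class BoundedPollTask {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final long pollTimeoutMs; // assumed upper bound on how long poll() may wait

    public BoundedPollTask(long pollTimeoutMs) {
        this.pollTimeoutMs = pollTimeoutMs;
    }

    // Called by whatever produces records (for example a change feed callback).
    public void enqueue(String record) {
        buffer.add(record);
    }

    // Waits at most pollTimeoutMs, so the calling thread is always released
    // and can go on to call stop().
    public List<String> poll() {
        List<String> batch = new ArrayList<>();
        if (!running.get()) {
            return batch; // already stopped: return immediately
        }
        try {
            String first = buffer.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                buffer.drainTo(batch); // take whatever else is already buffered
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return batch;
    }

    // Never waits on poll(); it only records that the task should stop.
    public void stop() {
        running.set(false);
    }

    public boolean isRunning() {
        return running.get();
    }
}
```

Because neither method waits on the other, the order in which the framework calls them on its single thread does not matter.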

This issue is also related to https://github.com/microsoft/kafka-connect-cosmosdb/discussions/543

Changes:

  1. Remove the empty check from the stop() method.
  2. Add .block() when calling changeFeedProcessor.stop(); otherwise the changeFeedProcessor is not stopped and all of its internal tasks keep running.
  3. Use dedicated Kafka-cfp-bounded-elastic schedulers.
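The second and third changes can be sketched with the standard library alone. This is an analogy, not the PR's code: a `Supplier<CompletableFuture<Void>>` stands in for the lazy reactive value returned by changeFeedProcessor.stop(), and a named fixed thread pool stands in for a dedicated scheduler. `FakeProcessor`, `newNamedPool`, `stopAndWait`, and the timeout are hypothetical; the PR itself only mentions `.block`, so the bounded wait is an added assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BlockingStopSketch {

    // Hypothetical processor whose stop() is lazy: calling it only describes the work.
    public static final class FakeProcessor {
        public final AtomicBoolean started = new AtomicBoolean(true);
        private final ExecutorService executor;

        public FakeProcessor(ExecutorService executor) {
            this.executor = executor;
        }

        // Nothing runs until the returned supplier is invoked.
        public Supplier<CompletableFuture<Void>> stop() {
            return () -> CompletableFuture.runAsync(() -> started.set(false), executor);
        }
    }

    // Dedicated, named daemon threads so the stop work does not share a common pool.
    public static ExecutorService newNamedPool(String prefix, int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    // Triggers the stop and waits for it to finish. The timeout is an assumption
    // added here so the caller cannot hang forever.
    public static boolean stopAndWait(FakeProcessor processor, long timeoutMs) {
        try {
            processor.stop().get().get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }
}
```

Calling `processor.stop()` and discarding the result leaves `started` set to true, which mirrors the bug described in the second change; `stopAndWait` returns only after the stop work has run on the dedicated pool or the timeout has elapsed.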