salesforce / mirus

Mirus is a cross data-center data replication tool for Apache Kafka
BSD 3-Clause "New" or "Revised" License
203 stars 43 forks source link

close KafkaConsumer when task is stopping #82

Closed YongGang closed 3 years ago

YongGang commented 3 years ago

Currently MirusSourceTask closes KafkaConsumer in poll method, but in some cases when task is stopping, poll may be unable to run in time to close resources properly. Now change to close KafkaConsumer and JmxReporter in stop method.

pdavidson100 commented 3 years ago

@YongGang Why remove the Semaphore? That seemed to give us thread safety, which is recommended in the Kafka Connect documentation. Even if stop and poll are always called in the same thread in the current version of Kafka Connect the documentation suggests this may change in future. Should we replace it with some other thread-safe locking mechanism?

YongGang commented 3 years ago

There are two multithreading scenarios we need to handle:

  1. multi threads call stop method: this is protected by synchronized (seems this is also recommended in the doc)
  2. multi threads call poll and stop: although Semaphore make these two methods thread-safe but don't think it's necessary as we already have shutDown check that basically prevent poll run when task is stopping. In our code the only place poll can hang is consumer.poll(), but we have consumer.wakeup() in stop handle this case.
pdavidson100 commented 3 years ago

Thanks @YongGang, seems reasonable. I hand't noticed the addition of synchronized.