Closed sangreal closed 3 months ago
Hi @sangreal - can you please elaborate a bit more - what gets interrupted by which wake-up?
Are you observing - that workerThreadPool.get().awaitTermination(toSeconds(timeout), SECONDS)
is interrupted by consumerManager.wakeup()
calls in BrokerPoller?
Hi @rkolesnev since we were experiencing rebalancing storm. I think this should be from assigning partition. But since we don't enable debug level log. I am not sure. https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java#L446
But it is workerThreadPool.get().awaitTermination(toSeconds(timeout), SECONDS)
be interrupted.
Ok, i see, thanks.
The interrupt / blocking thread handling always makes my head hurt a bit :)
So - i guess we can either add a check to notifySomethingToDo()
method - if in Closing state (or not polling mailbox right now) - don't interrupt control thread - actually it used to have that check - there is still the atomicBoolean - currentlyPollingWorkCompleteMailBox
but it is no longer being checked prior to interrupting the control thread.
Alternatively - wrap the awaitTermination()
call in the loop with try / catch for interrupted exception to prevent finishing the wait early.
@rkolesnev I think checking Closing state in notifySomethingToDo
is the neat way to handle this.
If you don't mind I will draft a pr for this one.
Closing as #789 is merged.
Version: 0.5.2.5 (+ stale container fix (#623))
Scenario: When using, some of the health check logic on our end will trigger rebalancing storm by re-initialization. The closing is constantly interrupted by wake-up interrupt by poller. And we met with below exception.
Solution:
workerThreadPool
. Should not just reset the flag. as https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java#L6573.I will draft a pr when I have time.