Hard to say what happens there, but there is definitely no new fetch triggered: fetches only happen on the interval, and only once the previous fetch has finished.
My working theory is basically that the re-subscription to the newly updated group structure resets some state; then, when the original handler finishes, it marks something as done (or blows that state away), preventing the re-triggering from happening as expected.
When we set up wtfnode to dump timers and callbacks every N seconds, we can see the idle timeout is present and ticking before the issue, but after the problem occurs only the timeout for the heartbeat in the group consumer (which is there originally too) survives.
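For reference, a minimal sketch of the kind of periodic dump we set up (the interval value is arbitrary; `wtf.dump()` just prints the currently active handles):

```js
// Periodically dump live handles so we can watch whether the base_consumer
// fetch timer disappears after a rebalance. The interval here is arbitrary.
var wtf = require('wtfnode');

setInterval(function () {
    console.log('--- wtfnode handle dump ---');
    wtf.dump(); // prints currently active timers, intervals and sockets
}, 10000).unref(); // unref() so this diagnostic timer doesn't itself keep the process alive
```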
Is there any circumstance, from your own reading of the code, that could make that timer for base_consumer fetching go away? I've tried playing with NSL_LEVEL to increase log verbosity but nothing leaps out.
Are you sure that all your handlers really resolve and don't hang?
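One way to rule that out is to wrap each handler in a watchdog that warns when a handler has not settled within some limit. A rough sketch (the `withTimeout` name and the limit are purely illustrative; the `(messageSet, topic, partition)` signature is the usual no-kafka handler shape):

```js
// Hypothetical watchdog wrapper: warns if a message handler takes longer than
// limitMs to resolve or reject, making hung handlers easy to spot.
function withTimeout(handler, limitMs) {
    return function (messageSet, topic, partition) {
        var timer = setTimeout(function () {
            console.warn('handler still pending after ' + limitMs + 'ms for ' +
                topic + ':' + partition);
        }, limitMs);
        return Promise.resolve(handler(messageSet, topic, partition)).then(
            function (result) { clearTimeout(timer); return result; },
            function (err) { clearTimeout(timer); throw err; }
        );
    };
}

// Usage: pass withTimeout(dataHandler, 30000) as the strategy handler instead of dataHandler.
```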
Actually, the fetching behaviour was changed in https://github.com/oleksiyk/kafka/pull/85. Previously the fetch request was fired every options.idleTimeout, but only for those partitions that had finished their prior processing. Now the fetch is paused until all messages are processed, which means fast-processing partitions have to wait for slow-processing partitions. I think I will change this behaviour back to the original. That change will probably make you see the fetch timeout ticking again, but your issue is somewhere else.
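Roughly, the difference between the two behaviours looks like this (illustrative pseudocode with placeholder names, not the library's actual implementation):

```js
// Illustrative only: 'partitions' is an array of objects whose handlerDone /
// handlerPromise fields stand in for per-partition processing state.
function fetchPartition(p) {
    console.log('fetch partition', p.id); // stand-in for issuing a real fetch request
}

// Old behaviour (pre #85): re-fetch every idleTimeout, but only for partitions
// whose previous handler already resolved, so slow partitions don't block fast ones.
function scheduleFetchPerPartition(partitions, idleTimeout) {
    setInterval(function () {
        partitions.filter(function (p) { return p.handlerDone; }).forEach(fetchPartition);
    }, idleTimeout);
}

// Current behaviour: wait until every in-flight handler has resolved, then wait
// idleTimeout and fetch everything again, so one slow partition delays the rest.
function scheduleFetchWhenAllDone(partitions, idleTimeout) {
    return Promise.all(partitions.map(function (p) { return p.handlerPromise; }))
        .then(function () {
            setTimeout(function () { partitions.forEach(fetchPartition); }, idleTimeout);
        });
}
```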
@oleksiyk, I'd appreciate your feedback on whether you think this is a plausible explanation and something that might actually be happening, or if I'm barking up the wrong tree.
Observed Symptoms:
When consuming messages with no-kafka and doing heavy per-message processing (async callbacks / promise chaining, e.g. data operations per message), it is possible for a group leadership election or cluster change event to occur mid-processing; a minimal sketch of this kind of handler setup is included at the end of this issue.
During this event, it appears that various state is modified and a new fetch is triggered for our subscribed partitions as a result of the rebalance.
Any partition that was in-flight during the event (i.e. whose fetching was paused) appears to hang immediately after the fetch.
My belief is that the original promise is being satisfied while a new fetch has already been queued for the rebalanced/re-formed group, and this leads to a double-tap on some finalisation code (likely in or around base_consumer::fetch).
I'm working with a colleague to reproduce the problem reliably, but we are already able to produce a completely stalled no-kafka-backed worker process by doing a docker pause while handlers are executing, as long as the pause duration is longer than the _consumer_offsets topic TTL.
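For reference, a minimal sketch of the kind of consumer/handler setup involved, based on no-kafka's documented GroupConsumer API; the broker address, group id, topic and the simulated slow work are placeholders:

```js
var Kafka = require('no-kafka');

var consumer = new Kafka.GroupConsumer({
    connectionString: 'kafka://127.0.0.1:9092', // placeholder broker
    groupId: 'example-group',                   // placeholder group id
    idleTimeout: 1000                           // the fetch interval discussed above
});

// Placeholder for heavy per-message async work (data operations, promise chaining).
function doSlowDatabaseWork(message) {
    return new Promise(function (resolve) { setTimeout(resolve, 5000); });
}

// Handler doing heavy async work per message. Pausing the container while these
// promises are pending, for longer than the _consumer_offsets retention, is
// enough to reproduce the stall described above.
function dataHandler(messageSet, topic, partition) {
    return Promise.all(messageSet.map(function (m) {
        return doSlowDatabaseWork(m).then(function () {
            return consumer.commitOffset({ topic: topic, partition: partition, offset: m.offset });
        });
    }));
}

consumer.init([{ subscriptions: ['example-topic'], handler: dataHandler }]);
```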