Closed: gpfrancis closed this issue 3 years ago
Was able to work around this by stopping all the consumers and then restarting them simultaneously.
This might be fixed in newer versions of Kafka, but when we ran some experiments we found it can cause a cascade of failures. If the containers/VMs the consumers run in are provisioned with 'just enough' memory for normal operation, they can become overloaded when all of them are stopped and restarted at the same time. The first consumer to connect is assigned all of the partitions; when the second connects the group rebalances and each gets half, when the third joins the group rebalances again, and so on until all the consumers have joined and the partitions are shared evenly. If the first consumer doesn't have enough memory to cope with all the partitions, it fails, dumping the whole lot onto the second consumer, which then fails... until all of them are caught in a connect, fail, connect, fail loop.
Oops, I was going to leave a comment and clicked the wrong button.
Thanks for the warning. I don't think this will be a problem for us as the consumer VMs need quite a bit of memory to run Sherlock, which will not yet be used when starting up.
I think the original failure cascade is caused by the message processing taking (a lot) longer than the rebalancing process (minutes vs seconds). In theory the broker should wait for at least max.poll.interval.ms in order to allow all the consumers to complete, but I suspect it is not doing so. Perhaps this is fixed in a newer Kafka version.
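If the timeout really is the culprit, one mitigation is simply to raise it so the broker tolerates slow processing. A minimal sketch of what that might look like (the broker address, group id, and timeout values here are hypothetical, not taken from our deployment):

```python
# Hypothetical consumer configuration illustrating the relevant knobs.
# max.poll.interval.ms bounds how long message processing may take between
# polls before the group coordinator evicts the consumer and rebalances;
# raising it to minutes matches our processing time rather than the default.
consumer_config = {
    "bootstrap.servers": "localhost:9092",   # placeholder broker
    "group.id": "alert-consumers",           # placeholder group id
    "max.poll.interval.ms": 600000,          # 10 minutes of processing allowed
    "session.timeout.ms": 45000,             # heartbeat-based liveness check
}
```

The dictionary would be passed straight to the confluent-kafka `Consumer` constructor; the important point is that `max.poll.interval.ms` needs to exceed the worst-case per-batch processing time, or every slow batch triggers another rebalance.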
I have a pull request (#87) that might resolve the problem. The consumer doesn't actually have to give up when a commit fails - it can just try again (and not provoke another rebalance). This will cause some messages to get processed twice, but we can cope with that.
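The retry idea can be sketched independently of the real consumer. This is not the code from the PR, just an illustration of the approach; `CommitFailed` and `commit_with_retry` are hypothetical names standing in for the real exception and wrapper:

```python
import time

class CommitFailed(Exception):
    """Stand-in for the KafkaException raised when an offset commit fails."""

def commit_with_retry(commit, retries=5, delay=1.0):
    """Retry a failed offset commit instead of letting the consumer die.

    `commit` is a zero-argument callable that raises CommitFailed on error.
    Returns True once a commit succeeds, False after exhausting retries.
    A retried commit can mean some messages are processed twice, which is
    acceptable here since processing is tolerant of duplicates.
    """
    for attempt in range(retries):
        try:
            commit()
            return True
        except CommitFailed:
            # Back off briefly; the transient condition (e.g. a rebalance
            # in progress) often clears before the next attempt.
            time.sleep(delay)
    return False
```

Crucially, retrying in place means the consumer stays in the group rather than exiting and provoking yet another rebalance, which is what fed the cascade above.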
Looks like the following can happen:
2021-07-13 11:12:05,214:ERROR:wrapper.py:Kafka Exception:KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}