reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Consumer does not rejoin group after heartbeat timeout #336

Open 62mkv opened 1 year ago

62mkv commented 1 year ago

Actual Behavior

We have a reactive Spring Boot application that uses reactor-kafka for its Kafka consumers and producers.

We use one KafkaReceiver per subscribed topic; each receiver is kept in a Spring bean field.

I observe that sometimes, some or all of the underlying Consumers just stop, with an error message like the following:

"[Consumer clientId=consumer-my-service-2-11, groupId=my-service-2] Member consumer-my-service-2-11-2ebeee54-566c-4ae8-ac43-1d5710fee1fa sending LeaveGroup request to coordinator 192.168.0.224:14493 (id: 2147483619 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."

(this is the last message in the log so far; the application has been running happily for a day already, after all 11 consumers got stuck in this limbo; the topic is being consumed by other pods)

Regardless of what the error says, shouldn't the consumer still be restarted by the library/Kafka internals? Or is it the application author's responsibility to somehow track this state and react accordingly (for example, by implementing a liveness health check around it)?
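For context, the two settings the Kafka error message mentions can be tuned through the consumer properties that are later passed to ReceiverOptions. A minimal sketch, assuming a plain properties map; the class name is hypothetical and the values are purely illustrative, not recommendations:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerTuning {

    // Sketch of the two kafka-clients properties named in the error message.
    static Map<String, Object> getConsumerProperties() {
        Map<String, Object> props = new HashMap<>();
        // Allow more time between poll() calls before the broker evicts the member...
        props.put("max.poll.interval.ms", 600_000);   // broker default is 300000 (5 minutes)
        // ...or hand out smaller batches so each processing loop finishes sooner.
        props.put("max.poll.records", 100);           // default is 500
        return props;
    }
}
```

Note that neither setting makes a consumer rejoin after it has already been evicted; they only make eviction less likely.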


artembilan commented 1 year ago

Looks like this is your SO question, too: https://stackoverflow.com/questions/76015312/how-to-properly-deal-with-zombie-kafka-consumers-in-reactive-spring-boot-appli

62mkv commented 1 year ago

This was originally asked on SO here: https://stackoverflow.com/questions/76015312/how-to-properly-deal-with-zombie-kafka-consumers-in-reactive-spring-boot-appli but as I read the code afterwards, it feels more and more like a bug :(

artembilan commented 1 year ago

I see. Any thoughts about a possible fix? Or please share with us which part of the project code you think is producing such a bug.

62mkv commented 1 year ago

Well, I didn't design the library, so it's hard to tell whether it's a bug in reactor-kafka, a bug in kafka-clients, or completely valid behaviour. I've skimmed through the ConsumerCoordinator and it looks like it's bound to rejoin if it's ever polled again. But whether it is polled or not is, I guess, reactor-kafka's responsibility. So I am posting it here, hoping the community will set me straight if I am wrong šŸ™šŸ™


garyrussell commented 1 year ago

You need to add retry (and possibly repeat) to the pipeline: https://projectreactor.io/docs/kafka/release/reference/#_error_handling_2
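A minimal sketch of that suggestion, following the pattern in the linked error-handling section: wrap the receive Flux in `Flux.defer` so each resubscription creates a fresh receiver, then add `repeat`/`retryWhen`. Here `receiverOptions` and `handle` are assumptions standing in for the application's own options and processing step:

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.retry.Retry;

// Sketch: resubscribe the receive Flux when it errors or completes,
// so a consumer that left the group gets a fresh subscription (and a
// fresh poll loop) instead of staying in limbo.
Flux<ReceiverRecord<String, String>> inbound =
        Flux.defer(() -> KafkaReceiver.<String, String>create(receiverOptions).receive())
            .doOnNext(record -> {
                handle(record);                      // hypothetical processing step
                record.receiverOffset().acknowledge();
            })
            .repeat()                                // resubscribe after normal completion
            .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
                            .maxBackoff(Duration.ofMinutes(1)));

inbound.subscribe();
```

The backoff keeps a persistently failing consumer from hot-looping against the broker.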

62mkv commented 1 year ago

@garyrussell I see. Thanks so much for your help!

I will give it a try and close the ticket when I can confirm it's no longer manifesting.

May I also ask about a different thing: how useful is it to have a partition revoke handler commit offsets?

```java
var receiverOptions = ReceiverOptions.create(getConsumerProperties())
        .commitInterval(DEFAULT_COMMIT_INTERVAL)
        .addAssignListener(this::handlePartitionsAssignment)
        .addRevokeListener(this::handlePartitionsRevoking)
        .subscription(activeDestinations);
// ...

@SneakyThrows(IllegalAccessException.class)
private void handlePartitionsRevoking(Collection<ReceiverPartition> revokedPartitions) {
    // reflective access to the underlying KafkaConsumer
    var consumer = (Consumer<Object, Object>) FieldUtils.readField(kafkaReceiver.consumerHandler(), "consumer", true);

    try {
        consumer.commitSync(latestAcks, Duration.ofMillis(1000));
    } catch (Exception e) {
        log.warn("Ignored error on partition revoke", e);
    }
}
```

The documentation seems to say that the downstream consumer should not be concerned with this:

All acknowledged offsets are committed when partitions are revoked during rebalance and when the receive Flux is terminated

So is this handler code, quoted above, legacy that could/should be removed? Or might it still be needed? We occasionally see a WakeupException inside this handler, so I thought I'd ask some experts.

garyrussell commented 1 year ago

I don't know what latestAcks is there but, indeed, the CommitEvent is run when partitions are revoked, so any acknowledged records will be committed. See ConsumerEventLoop.onPartitionsRevoked:

https://github.com/reactor/reactor-kafka/blob/d1aa41b99d2993d1980d0a76561efb4a0f60d139/src/main/java/reactor/kafka/receiver/internals/ConsumerEventLoop.java#L161-L166
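Given that, the reflection-based revoke handler could presumably be dropped and the options simplified to rely on the built-in commit-on-revoke. A sketch reusing the hypothetical names from the snippet above:

```java
var receiverOptions = ReceiverOptions.create(getConsumerProperties())
        .commitInterval(DEFAULT_COMMIT_INTERVAL)
        .addAssignListener(this::handlePartitionsAssignment)
        // no addRevokeListener needed: acknowledged offsets are committed by
        // ConsumerEventLoop.onPartitionsRevoked during a rebalance
        .subscription(activeDestinations);
```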