reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io

Consumers are getting dropped off on offset commit failures during prolonged kafka rebalancing #394

Open aswinavofficial opened 1 month ago

aswinavofficial commented 1 month ago

Expected Behavior

When rebalancing occurs during a commit and all offset commit retries are exhausted, the Reactor Kafka library should poll again and process uncommitted messages. The Kafka consumer should not be dropped and should continue processing the next batch of messages.

Actual Behavior

We have a distributed system with a Kafka topic containing 200 partitions and multiple consumers. Due to network issues, latency, or other reasons, rebalancing may be triggered. If an offset commit fails during rebalancing and the rebalancing continues beyond the retry period, Kafka consumers are removed from the consumer group.

I reviewed the Reactor Kafka library and found that the asyncCleanup in the withHandler method stops the Consumer Event Loop. In a non-reactive Kafka consumer implementation, there is usually an infinite loop for poll(), where exceptions are caught, and the consumer continues to process the next set of messages. However, in reactive Kafka, the consumer event loop itself gets closed.

I have used the repeat and retry workarounds and increased the offset commit retry attempts, but it is still not working.


DefaultKafkaReceiver.java
 private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerHandler<K, V>, Flux<T>> function) {
        return Flux.usingWhen(
            Mono.fromCallable(() -> {
                ConsumerHandler<K, V> consumerHandler = new ConsumerHandler<>(
                    receiverOptions,
                    consumerFactory.createConsumer(receiverOptions),
                    // Always use the currently set value
                    e -> isRetriableException.test(e),
                    ackMode
                );
                consumerHandlerRef.set(consumerHandler);
                return consumerHandler;
            }),
            handler -> Flux.using(
                () -> Schedulers.single(receiverOptions.schedulerSupplier().get()),
                scheduler -> function.apply(scheduler, handler),
                Scheduler::dispose
            ),
            handler -> handler.close().doFinally(__ -> consumerHandlerRef.compareAndSet(handler, null))
        );
    }

ConsumerHandler.java
public Mono<Void> close() {
        if (consumerListener != null) {
            consumerListener.consumerRemoved(consumerId, consumer);
        }
        return consumerEventLoop.stop().doFinally(__ -> eventScheduler.dispose());
    }

Steps to Reproduce

private Disposable poll(KafkaReceiver<String, String> receiver) {
        return receiver.receive()
                .publishOn(elasticScheduler)
                .doOnError(KafkaConsumer::logEventConsumptionFailure)
                .retryWhen(getRetryStrategy())
                .onErrorResume(KafkaConsumer::handleErrorOnEventConsumption)
                .flatMap(this::toKafkaEvent)
                .flatMap(kafkaEvent ->
                                stream.handleEvent(kafkaEvent)
                                        .switchIfEmpty(Mono.defer(() -> Mono.just(kafkaEvent))),
                        steamConfig.getConsumerParallelism())
                .flatMap(this::commitKafkaOffset, steamConfig.getConsumerParallelism())
                .doOnTerminate(() -> {
                    isConsumerActive = false;
                    log.error("Kafka consumer got terminated");
                })
                .repeat()
                .subscribe();
    }

private Mono<KafkaEvent> commitKafkaOffset(KafkaEvent kafkaEvent) {
        return  Mono.just(kafkaEvent)
                .doOnNext(action -> logOffsetCommit(kafkaEvent))
                .flatMap(event -> kafkaEvent.getMessageRecord().receiverOffset().commit())
                .retryWhen(Retry.backoff(steamConfig.maxCommitAttempts, Duration.ofSeconds(1)).transientErrors(true))
                .doOnError(exception -> logOffsetCommitFailure(kafkaEvent, (Exception) exception))
                .onErrorResume(exception -> Mono.empty())
                .then(Mono.just(kafkaEvent))
                .doOnNext(action -> logSuccessfulOffsetCommit(kafkaEvent));
    }

private static void logEventConsumptionFailure(Throwable error) {
        log.error("Failed to consume events from topic {}, {}, {}",
                keyValue(Constant.ERROR, error),
                keyValue(Constant.PHASE, "CONSUMER_ERROR"),
                keyValue(Constant.EXCEPTION, LoggingUtil.formatExceptionAsMapForLogging(error)));
    }

Kafka Properties

kafka.session.timeout.ms=300000
kafka.heartbeat.interval.ms=30000
kafka.request.timeout.ms=180000
kafka.max.poll.records=500
kafka.max.poll.interval.ms=300000

kafka consumer retry config

max.commit.attempts=200
commit.retry.interval=5000

max.delay.rebalance.ms=240000

Logs

WARN  reactor.kafka.receiver.internals.ConsumerEventLoop.ConsumerEventLoop$CommitEvent.handleFailure:497 - Commit failed
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.

INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ConsumerCoordinator.invokePartitionsLost:366 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Lost previously assigned partitions kafka-topic-95
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ConsumerCoordinator.invokePartitionsLost:370 - [Consumer kafka-consumer-app, groupId=consumer-group-1] The pause flag in partitions [kafka-topic-95] will be removed due to partition lost.
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.maybeLeaveGroup:1133 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Member kafka-consumer-app-62d7b238-3ecd-4631-9103-ffe8420e23e2 sending LeaveGroup request to coordinator kafka-broker-host:9092 (id: 2039073794 rack: null) due to the consumer is being closed
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.resetStateAndGeneration:1025 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Resetting generation and member id due to: consumer pro-actively leaving the group
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.requestRejoin:1072 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Request joining group due to: consumer pro-actively leaving the group

 org.apache.kafka.common.metrics.Metrics.Metrics.close:693 - Metrics scheduler closed
 INFO  org.apache.kafka.common.metrics.Metrics.Metrics.close:697 - Closing reporter org.apache.kafka.common.metrics.JmxReporter
 INFO  org.apache.kafka.common.metrics.Metrics.Metrics.close:703 - Metrics reporters closed
 INFO  org.apache.kafka.common.utils.AppInfoParser.AppInfoParser.unregisterAppInfo:83 - App info kafka.consumer for kafka-consumer-app unregistered

Possible Solution

The Consumer Event Loop should not be closed during cleanup. Instead, it should continue polling for messages.

Your Environment

aswinavofficial commented 1 month ago

Hi @garyrussell , @chemicL , @pderop , @artembilan Could you please help?

artembilan commented 1 month ago

The logic there is like this:

    public static <T, D> Flux<T> usingWhen(Publisher<D> resourceSupplier,
            Function<? super D, ? extends Publisher<? extends T>> resourceClosure,
            Function<? super D, ? extends Publisher<?>> asyncCleanup) {
        return usingWhen(resourceSupplier, resourceClosure, asyncCleanup, (resource, error) -> asyncCleanup.apply(resource), asyncCleanup);
    }

So, that handler.close() is called not only when we dispose the Flux exposed to the end user, but also in case of an error.
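
For illustration, a minimal self-contained snippet (plain Reactor, not reactor-kafka) showing that this overload also runs the async cleanup when the resource closure terminates with an error, not only when the downstream disposes:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class UsingWhenCleanupDemo {
    public static void main(String[] args) {
        Flux<String> demo = Flux.usingWhen(
                Mono.just("resource"),
                res -> Flux.just("ok").concatWith(Flux.error(new IllegalStateException("boom"))),
                res -> Mono.fromRunnable(() -> System.out.println("cleanup for " + res)));

        // prints "ok", runs the cleanup, and the error is then propagated downstream
        demo.subscribe(System.out::println,
                err -> System.out.println("error: " + err.getMessage()));
    }
}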

How is your use case handled in a non-reactive scenario? How can a consumer that has been removed from the consumer group consume more messages if it has to be re-subscribed first?

See the KafkaConsumer.poll() JavaDocs - exactly the first sentence:

     * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
     * subscribed to any topics or partitions before polling for data.

So, as you said: if we fail to call poll() during kafka.max.poll.interval.ms, then our consumer is kicked out of the group. And there is no other way to come back until we call subscribe() again.

Try with onErrorContinue() instead of onErrorResume(). See its JavaDocs for more info.
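
For illustration only, a rough sketch of that suggestion against the pipeline from the description above (toKafkaEvent and commitKafkaOffset are the methods shown there):

receiver.receive()
        .flatMap(this::toKafkaEvent)
        .flatMap(this::commitKafkaOffset)
        // supporting upstream operators (flatMap, map, filter, ...) drop the failing
        // element and continue, instead of terminating the whole Flux with onError
        .onErrorContinue((error, droppedElement) ->
                log.error("Skipping element {} after error", droppedElement, error))
        .subscribe();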

The removal of the mentioned code is not a solution, though.

aswinavofficial commented 1 month ago

Hi @artembilan , in a non-reactive approach we can write code that catches any exceptions and re-subscribes to the topic. We expect the same behavior in the reactive approach: any errors should be caught, the consumer should automatically re-subscribe to the topic, and it should not be dropped.

We have used the doOnError operator to log the error, but the message was not found in the logs. Retry also didn't work.

      .doOnError(KafkaConsumer::logEventConsumptionFailure)
      .retryWhen(getRetryStrategy())
      .onErrorResume(KafkaConsumer::handleErrorOnEventConsumption)
      .
      .
       .repeat()

Non reactive example

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
consumer.subscribe(Arrays.asList(topic));
while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
        for (ConsumerRecord<String, String> record : records) {
            log.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
        }
    } catch (Throwable e) {
        log.error("KafkaConsumerTask exited with exception: ", e);
        try {
            // clean up and shut down the consumer if it exists
            if (consumer != null) {
                consumer.close();
                log.info("KafkaConsumerTask exception: consumer closed");
            }
            // wait for a while before recreating the consumer
            Thread.sleep(60000);
            // recreate the consumer and re-subscribe
            consumer = new KafkaConsumer<>(consumerConfig);
            consumer.subscribe(Arrays.asList(topic));
            log.info("KafkaConsumerTask exception: consumer started again");
        } catch (InterruptedException ie) {
            log.error("KafkaConsumerTask thread interrupted: ", ie);
        }
    }
}
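
For comparison, a minimal sketch of what we mean by equivalent behavior in the reactive model, assuming a receiverOptions field with the configuration listed above (toKafkaEvent and commitKafkaOffset are the methods from the earlier snippet; this is illustrative only - as noted, retry and repeat have not resolved the drop-off for us):

private Disposable pollWithResubscribe() {
    return Flux
            // defer() so that every (re)subscription builds a brand-new KafkaReceiver,
            // i.e. a fresh underlying consumer that re-joins the group
            .defer(() -> KafkaReceiver.create(receiverOptions).receive())
            .flatMap(this::toKafkaEvent)
            .flatMap(this::commitKafkaOffset)
            // on a terminal error (e.g. the consumer event loop was stopped),
            // back off and resubscribe, which re-creates the consumer
            .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5)))
            // on normal completion, also resubscribe with a fresh consumer
            .repeat()
            .subscribe();
}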

We have created a custom actuator endpoint that checks the value of the isConsumerActive flag and restarts the consumer if it is false. However, in this case, "Kafka consumer got terminated" was not found in the log, and isConsumerActive was not set to false.

doOnTerminate(() -> {
    isConsumerActive = false;
    log.error("Kafka consumer got terminated");
})

Attaching logs


WARN  reactor.kafka.receiver.internals.ConsumerEventLoop.ConsumerEventLoop$CommitEvent.handleFailure:497 - Commit failed
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.

INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ConsumerCoordinator.invokePartitionsLost:366 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Lost previously assigned partitions kafka-topic-95
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ConsumerCoordinator.invokePartitionsLost:370 - [Consumer kafka-consumer-app, groupId=consumer-group-1] The pause flag in partitions [kafka-topic-95] will be removed due to partition lost.
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.maybeLeaveGroup:1133 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Member kafka-consumer-app-62d7b238-3ecd-4631-9103-ffe8420e23e2 sending LeaveGroup request to coordinator kafka-broker-host:9092 (id: 2039073794 rack: null) due to the consumer is being closed
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.resetStateAndGeneration:1025 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Resetting generation and member id due to: consumer pro-actively leaving the group
INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.requestRejoin:1072 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Request joining group due to: consumer pro-actively leaving the group

 org.apache.kafka.common.metrics.Metrics.Metrics.close:693 - Metrics scheduler closed
 INFO  org.apache.kafka.common.metrics.Metrics.Metrics.close:697 - Closing reporter org.apache.kafka.common.metrics.JmxReporter
 INFO  org.apache.kafka.common.metrics.Metrics.Metrics.close:703 - Metrics reporters closed
 INFO  org.apache.kafka.common.utils.AppInfoParser.AppInfoParser.unregisterAppInfo:83 - App info kafka.consumer for kafka-consumer-app unregistered
artembilan commented 1 month ago

OK. So you do the loop yourself, but you still ask for something to be done in this library. Why not just try that onErrorContinue() on the Flux returned from KafkaReceiver?

Otherwise, please elaborate on what exactly we could do over here, but really not that dropping of the consumerEventLoop.stop(), since that is a crucial part of the whole reactive subscriber lifecycle.

aswinavofficial commented 1 month ago

Hi @artembilan , the attached non-reactive code with the infinite loop is part of another Java application, where we are not facing the consumer drop-off issue.

I will add the onErrorContinue operator, but my doubt is that we already have the doOnError operator in place and the message "Failed to consume events from topic" was not found in the log. The consumer was silently dropped from the group.

artembilan commented 1 month ago

OK. Any chance you could share a simple project with us so that we can reproduce and play with it on our side? We are totally at a loss as to how to make this "consumer drop off of the group" happen 😢 Thank you!

aswinavofficial commented 1 month ago

Sure @artembilan , I will try to put one together.

One way I reproduced this issue in our staging environment earlier was by pushing a few million messages to the Kafka topic, having 100 pods connected to it, and performing continuous rolling restarts, which prolonged the rebalancing process. I worked around it by adding retry and repeat, but in this particular case the flow does not even reach the doOnError operator.

artembilan commented 1 month ago

having 100 pods connected to it

Yeah... That is not what I'm going to do here locally. I don't think that it has to be so dramatic to cause the problem...

aswinavofficial commented 1 month ago

@artembilan I will try to reproduce this on my local machine.