micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0

Retry Mechanism Not Working with Reactive Types in Micronaut Kafka using @KafkaListener #967

Closed EnriqueWood closed 6 months ago

EnriqueWood commented 7 months ago

Expected Behavior

When using the @KafkaListener annotation with @ErrorStrategy(value = ErrorStrategyValue.RETRY_ON_ERROR), it is expected that the listener will retry the specified number of times (retryCount) with a certain delay (retryDelay) between retries when an exception is thrown. This behavior should be consistent regardless of whether the method returns a reactive type (e.g., Mono) or a non-reactive type.
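The expected semantics can be sketched in plain Java as follows. This is only an illustration, not Micronaut Kafka's implementation: the class `RetryOnErrorSketch`, its `process` method, and the `Outcome` type are hypothetical names, and it assumes that `retryCount` counts retries made after the first attempt.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Illustrative sketch only; all names here are hypothetical, not Micronaut Kafka API.
final class RetryOnErrorSketch {

    /** How many times the handler was invoked and whether one invocation succeeded. */
    static final class Outcome {
        final int attempts;
        final boolean succeeded;

        Outcome(int attempts, boolean succeeded) {
            this.attempts = attempts;
            this.succeeded = succeeded;
        }
    }

    /**
     * Invokes the handler once, then up to retryCount more times while it keeps
     * throwing, sleeping retryDelay between attempts.
     */
    static Outcome process(Callable<?> handler, int retryCount, Duration retryDelay) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.call();
                return new Outcome(attempts, true);
            } catch (Exception e) {
                if (attempts > retryCount) {
                    // Retries exhausted: give up on this record.
                    return new Outcome(attempts, false);
                }
            }
            try {
                Thread.sleep(retryDelay.toMillis());
            } catch (InterruptedException ie) {
                // Preserve the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                return new Outcome(attempts, false);
            }
        }
    }
}
```

With a handler that throws five times and then succeeds, as in the sample listener, and `retryCount = 10`, this sketch invokes the handler six times before reporting success.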

Actual Behaviour

The retry mechanism works as expected for methods that do not return a reactive type. However, for methods returning a reactive type, such as Mono, the retry mechanism does not activate upon encountering exceptions. Instead of retrying, the error is logged, and no further action is taken, which leads to message processing being halted without retries.

Steps To Reproduce

  1. Configure a Micronaut application with micronaut-kafka integration.
  2. Define a @KafkaListener class with two methods listening to two different topics: one returning a non-reactive type and another returning a reactive type (Mono), both configured with an @ErrorStrategy that specifies retry behavior.
  3. Publish a message to each topic so that both methods hit the simulated error condition, which should trigger the retry mechanism.
  4. Observe that for the non-reactive method, the retry mechanism works as expected, with retries being logged and attempted.
  5. Observe that for the reactive method, the retry mechanism does not activate, and no retries are attempted after the initial error.

Code sample

A WebhookServiceListener class is defined with two methods annotated with @Topic. One method, workingRetry, returns a non-reactive type and demonstrates the expected retry mechanism when an exception occurs. The other method, nonWorkingRetry, returns a reactive type (Mono) and fails to trigger the retry mechanism upon exceptions.

@KafkaListener(groupId = "mygroup",
        offsetReset = EARLIEST, offsetStrategy = DISABLED,
        errorStrategy = @ErrorStrategy(
                value = ErrorStrategyValue.RETRY_ON_ERROR,
                retryDelay = "3s",
                retryCount = 10
        ))
@Slf4j
public class WebhookServiceListener {
    int simulatedErrors = 0;

    @Topic("working_retry")
    public Object workingRetry(@KafkaKey String kafkaKey,
                               Event event,
                               long offset,
                               int partition,
                               String topic,
                               long timestamp,
                               Acknowledgement acknowledgement) throws RecoverableWebhookException {
        if (simulatedErrors < 5) {
            simulatedErrors++;
            log.info("{} - workingRetry Simulated error!", new Date());
            throw new RecoverableWebhookException("");
        }
        simulatedErrors = 0;
        acknowledgement.ack();
        log.info("{} - Ack performed correctly!", new Date());
        return new Object();
    }

    @Topic("non_working_retry")
    public Mono<Object> nonWorkingRetry(@KafkaKey String kafkaKey,
                                        Event paymentEvent,
                                        long offset,
                                        int partition,
                                        String topic,
                                        long timestamp,
                                        Acknowledgement acknowledgement) {
        if (simulatedErrors < 5) {
            simulatedErrors++;
            log.info("{} - nonWorkingRetry Simulated error!", new Date());
            return Mono.error(new RecoverableWebhookException("simulated error"));
        }
        acknowledgement.ack();
        log.info("{} - nonWorkingRetry Ack performed correctly!", new Date());
        return Mono.just(new Object());
    }
}

Logs Upon Injecting a Message into the working_retry Topic:

[pool-1-thread-1] INFO com.example.webhook.service.WebhookServiceListener - Wed Feb 14 16:25:55 CLST 2024 - workingRetry Simulated error!
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-paymentEventsListener-1, groupId=paymentEventsListener] Seeking to offset 7 for partition working_retry-0
[pool-1-thread-1] INFO com.example.webhook.service.WebhookServiceListener - Wed Feb 14 16:25:58 CLST 2024 - workingRetry Simulated error!
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-paymentEventsListener-1, groupId=paymentEventsListener] Seeking to offset 7 for partition working_retry-0
[pool-1-thread-1] INFO com.example.webhook.service.WebhookServiceListener - Wed Feb 14 16:26:01 CLST 2024 - workingRetry Simulated error!
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-paymentEventsListener-1, groupId=paymentEventsListener] Seeking to offset 7 for partition working_retry-0
[pool-1-thread-1] INFO com.example.webhook.service.WebhookServiceListener - Wed Feb 14 16:26:04 CLST 2024 - workingRetry Simulated error!
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-paymentEventsListener-1, groupId=paymentEventsListener] Seeking to offset 7 for partition working_retry-0
[pool-1-thread-1] INFO com.example.webhook.service.WebhookServiceListener - Wed Feb 14 16:26:07 CLST 2024 - workingRetry Simulated error!
[pool-1-thread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-paymentEventsListener-1, groupId=paymentEventsListener] Seeking to offset 7 for partition working_retry-0
[pool-1-thread-1] INFO com.example.webhook.service.WebhookServiceListener - Wed Feb 14 16:26:10 CLST 2024 - Ack performed correctly!

Logs Upon Injecting a Message into the non_working_retry Topic:


[pool-1-thread-2] INFO com.example.webhook.service.WebhookServiceListener - Wed Feb 14 16:26:16 CLST 2024 - nonWorkingRetry Simulated error!
[pool-1-thread-2] ERROR io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler - Error processing record for Kafka consumer [com.example.webhook.service.WebhookServiceListener@3448fc73] produced error: simulated error
com.example.webhook.exception.RecoverableWebhookException: simulated error
... (stack trace showing the exception) ...

These logs show that the retry mechanism works as expected for the workingRetry method (non-reactive return type) but never engages for the nonWorkingRetry method (reactive return type): the error is logged once and no retry is attempted.
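One way to picture the difference is that a thrown exception reaches the caller immediately, while an error carried inside a deferred result does not unless the caller waits for that result. The sketch below uses `CompletableFuture` from the JDK as a stand-in for `Mono` so that it runs without extra dependencies. The class and method names are hypothetical, and it does not reproduce Micronaut Kafka's internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Illustrative sketch only; CompletableFuture stands in for Mono.
final class DeferredErrorSketch {

    /** Reports whether invoking the handler threw, which is all a try/catch around the call can observe. */
    static boolean callerSeesFailure(Supplier<?> handler) {
        try {
            handler.get();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    /** Analogous to workingRetry: the failure is thrown on the calling thread. */
    static Object throwingHandler() {
        throw new IllegalStateException("simulated error");
    }

    /** Analogous to nonWorkingRetry: the call returns normally and the failure lives inside the result. */
    static CompletableFuture<Object> deferredHandler() {
        return CompletableFuture.failedFuture(new IllegalStateException("simulated error"));
    }
}
```

Here `callerSeesFailure(DeferredErrorSketch::throwingHandler)` observes the failure, whereas `callerSeesFailure(DeferredErrorSketch::deferredHandler)` does not, even though the returned future is already completed exceptionally.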

Environment Information

Micronaut Version: 4.3.1 (latest at this time)
Micronaut Kafka Version: 5.3.0
JDK Version: OpenJDK 17.0.1
Operating System: macOS Sonoma 14.3.1

Example Application

No response

Version

4.3.1

jeremyg484 commented 7 months ago

Because of the way Micronaut Kafka currently handles reactive return values by default (it optimistically continues the poll() loop and auto-commits offsets without blocking), the only way I can see to combine error strategies with reactive return values without breaking things is to also require the @Blocking annotation on the reactive method. I was hoping that already worked, but it does not. I will put together a PR to fix that.
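The idea of blocking on the reactive result so that an error strategy can see the failure can be sketched in plain Java. This is an assumption-laden illustration rather than the planned fix: `CompletableFuture` again stands in for `Mono`, `BlockingRetrySketch` and `attemptsUntilSuccess` are hypothetical names, and the delay between retries is omitted for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Illustrative sketch only; not the actual Micronaut Kafka change.
final class BlockingRetrySketch {

    /**
     * Invokes the handler and waits for its result, retrying up to retryCount
     * times on failure. Returns the attempt number that succeeded, or -1 if
     * every attempt failed.
     */
    static int attemptsUntilSuccess(Supplier<CompletableFuture<?>> handler, int retryCount) {
        for (int attempt = 1; attempt <= retryCount + 1; attempt++) {
            try {
                // Blocking here turns a failed future into an exception on this thread.
                handler.get().join();
                return attempt;
            } catch (CompletionException e) {
                // A real consumer loop could seek back to the failed offset at this point.
            }
        }
        return -1;
    }
}
```

The trade-off is that the calling thread waits for each result instead of continuing to poll, which is why such behavior would plausibly need to be opted into explicitly.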