micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0

Add ability to conditionally retry messages with KafkaListener's retry error strategy #947

Closed MRyanEvans closed 7 months ago

MRyanEvans commented 8 months ago

Desired behaviour

A listener with the retry error strategy always retries the message; however, there are some scenarios where a retry is not necessary or desirable.

For example, there are two types of deserialisation errors: a message whose payload is permanently malformed and will never deserialise no matter how often it is retried, and a transient failure such as the schema registry being temporarily unreachable.

For these cases it would be desirable to skip the first one and retry the second.

This is particularly helpful where we cannot afford to miss valid messages and so set retryCount = Integer.MAX_VALUE; with the current behaviour, a bad message blocks the partition and we would have to wait ages before the application eventually skips it.
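For context, such a listener looks roughly like this today (a sketch using micronaut-kafka's @ErrorStrategy annotation; the topic name, delay, and bean are illustrative, not from the issue):

```java
import io.micronaut.configuration.kafka.annotation.ErrorStrategy;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

import static io.micronaut.configuration.kafka.annotation.ErrorStrategyValue.RETRY_ON_ERROR;

@KafkaListener(
    errorStrategy = @ErrorStrategy(
        value = RETRY_ON_ERROR,
        retryDelay = "5s",
        // Never give up on a message: with the current behaviour this means a
        // permanently bad record blocks its partition forever.
        retryCount = Integer.MAX_VALUE
    )
)
class NeverSkipConsumer {

    @Topic("events") // illustrative topic name
    void receive(String body) {
        process(body);
    }

    private void process(String body) {
        // business logic
    }
}
```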

Workaround

The only way that I've been able to achieve this desired behaviour is to disable the retry mechanism and implement my own KafkaListenerExceptionHandler, which seeks back to the message or forward to the next one depending on the exception thrown. The downside is that I lose the ability to easily add retry delays and exponential back-offs.

Suggested fix

My PR reorders the seek and exception handling so that any seeking the exception handler decides to do is honoured.

Additional notes

I am happy to define the seek forward/backward behaviour in my own exception handler; however, it might be simpler if you were to open up a similar handler that just needs to return 'retry' or 'skip', with the seeking done in your code.
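That contract could be as small as a single-method interface. The sketch below is purely hypothetical (RetryDecision, ConditionalRetryHandler, and the exception check are illustrative names, not existing micronaut-kafka API); the point is that user code only classifies the failure, and the framework would do any seeking:

```java
// Hypothetical "retry or skip" contract for a retry error strategy.
enum RetryDecision { RETRY, SKIP }

@FunctionalInterface
interface ConditionalRetryHandler {
    // Called by the framework when a listener throws; the framework would
    // seek forward (skip) or seek back (retry) based on the result.
    RetryDecision onError(Throwable cause);
}

public class ConditionalRetrySketch {

    // Example policy: skip failures that can never succeed on retry (here
    // approximated by IllegalArgumentException), retry everything else.
    static final ConditionalRetryHandler POLICY = cause ->
        cause instanceof IllegalArgumentException
            ? RetryDecision.SKIP
            : RetryDecision.RETRY;

    public static void main(String[] args) {
        System.out.println(POLICY.onError(new IllegalArgumentException("bad payload"))); // SKIP
        System.out.println(POLICY.onError(new RuntimeException("transient failure")));   // RETRY
    }
}
```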

TL;DR

Basically, I want my custom exception handler with the retry strategy to be able to skip messages.

@KafkaListener(
    errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, handleAllExceptions = true)
)
static class FooConsumer implements KafkaListenerExceptionHandler {
    @Topic("foo")
    void handleMessage(ConsumerRecord<?, ?> record) {
        doAThing(record);
    }

    @Override
    public void handle(KafkaListenerException exception) {
        // Placeholder condition: decide from the exception whether the record can ever succeed
        if (isUnrecoverable(exception)) {
            exception.getConsumerRecord().ifPresent(record ->
                // Skip the bad record by seeking past it
                exception.getKafkaConsumer().seek(
                    new TopicPartition(record.topic(), record.partition()),
                    record.offset() + 1));
        }
        // Otherwise do nothing: the retry error strategy seeks back and retries the record
    }
}