spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0
2.19k stars 1.56k forks source link

addRetryableExceptions has no effect when called on the DefaultErrorHandler #3621

Open devjeff opened 3 days ago

devjeff commented 3 days ago

In what version(s) of Spring for Apache Kafka are you seeing this issue?

3.2.4

Describe the bug

I tried to use the DefaultErrorHandler with a batch message listener. Here I wanted to specify some retryable exceptions (e. g. JDBCConnectionException) that cause a retry (as long as the backoff limit is not exceeded). What I noticed, is that calls to the metods "addRetryableExceptions " and "addNotRetryableExceptions" apparently have no effect.

The DefaultErrorHandler delegates error handling to a FallbackBatchErrorHandler (if the thrown exception is not a BatchListenerFailedException), but this handler never accesses the internal classifier instance of the DefaultErrorHandler, but instead creates a new one with the default exception list, s. also default constructor of the ExceptionClassifier.

To Reproduce

  1. Create a Kafka message listener with "@KafkaListener" and "@KafkaHandler"
  2. Create a DefaultErrorHandler instance and call the method "addRetryableExceptions" to configure a retryable exception
  3. Configure a ConcurrentKafkaListenerContainerFactory with by calling cf.setCommonErrorHandler(errorHandler)
  4. Also call cf.setBatchListener(true);, although I think that this is not necessary to reproduce the problem
  5. Throw the retryable exception during message processing in the message listener
  6. Send some messages to the Kafka topic. When the error occurs, a retry should be performed, but it isn't

Expected behavior

Maybe, this is the desired behaviour. If the message recovery fails, then the batch will be consumed again by the next poll anyway, which is effectively the same as a retry (and can result in an endless loop until the error is fixed). Still I think, that the DefaultErrorHandler should either not extend ExceptionClassifier or at least contain a description in the Javadoc explaining that the "addRetryableExceptions " and "addNotRetryableExceptions" have no effect on this class.

artembilan commented 2 days ago

So, logic there in the DefaultErrorHandler like this:

    public DefaultErrorHandler(@Nullable ConsumerRecordRecoverer recoverer, BackOff backOff,
            @Nullable BackOffHandler backOffHandler) {

        super(recoverer, backOff, backOffHandler, createFallback(backOff, recoverer));
    }

    private static CommonErrorHandler createFallback(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
        return new FallbackBatchErrorHandler(backOff, recoverer);
    }

So, yeah, whatever we set into that DefaultErrorHandler has no effect.

We probably need to propagate those addRetryableExceptions and addNotRetryableExceptions into that fallbackBatchHandler as we do for many other props:

    public void setRetryListeners(RetryListener... listeners) {
        super.setRetryListeners(listeners);
        if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
            handler.setRetryListeners(listeners);
        }
    }

    public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
        super.setClassifications(classifications, defaultValue);
        if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
            handler.setClassifications(classifications, defaultValue);
        }
    }

BTW, you can use that setClassifications() for now.

Feel free to contribute the fix: https://github.com/spring-projects/spring-kafka/blob/main/CONTRIBUTING.adoc !