spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0
2.18k stars 1.56k forks source link

Log conversion failures in batch listeners #3555

Open schmidti159 opened 4 days ago

schmidti159 commented 4 days ago

Current Behavior

When using a batch listener the BatchMessagingMessageConverter will put any ConversionExceptions into the CONVERSION_FAILURES header and provide a null payload for this message in the batch passed to the listener.

The exception is not visible in the log at all. The error handling is left up completely to the listener, that needs to inject the header and act on it.

Expected Behavior

In addition to putting the exception in the header it should also be logged as error or warning.

Optional: Provide a configuration property to skip such messages completely after logging the error instead of providing them with a null payload to the listener.

sobychacko commented 3 days ago

Logging the error in addition to the header is reasonable. However, skipping the record based on a configuration might be more invasive, and we must ensure no side effects. Feel free to submit a PR for this if you are up to it.

artembilan commented 3 days ago

I don't think this is something what has to be done in the framework unconditionally. More over it feels more like a target project responsibility. And for the purpose we provide a strategy RecordFilterStrategy. See more info in docs: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/filtering.html#page-title. As you see there is a FilteringBatchMessageListenerAdapter under the hood. So, it is up to you to filter out those records with errors and logging them at the same moment, too.

I would close this as Works as Designed, but I'll wait for your feedback.

schmidti159 commented 3 hours ago

The issue with RecordFilterStrategy is, that it is executed earlier. It works on ConsumerRecords, but the conversion failure only happens later in the RecordMessageConverter. So to log the errors in the RecordFilterStrategy I actually would have to do the conversion twice: Once in the filter strategy to decide if we need to log the error and then it will be done again in the message converter.

I could implement it on my side by overriding the BatchMessagingMessageConverter with something like this:

    @Override
    protected Object convert(
            ConsumerRecord<?, ?> consumerRecord,
            Type type,
            List<ConversionException> conversionFailures) {
        var payload = super.convert(consumerRecord, type, conversionFailures);
        if (payload == null) {
            var conversionFailure = conversionFailures.getLast();
            if(conversionFailure != null) {
                log.error("Could not convert message for topic={}, partition={}, offset={}",
                        consumerRecord.topic(),
                        consumerRecord.partition(),
                        consumerRecord.offset(),
                        conversionFailure);
            }
            return null;
        }
        return payload;
    }

Another option I see and probably how this mechanism with exceptions as headers is intended to be used: Inject all needed headers into every listener (in this case topic, partition, offset and conversion failures) and call a helper method from the listener to log the conversion failures.

If you feel this should be handled by the target project you can close this issue.

Still, it might also be valuable to add a note to https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#payload-conversion-with-batch that by default messages that can't be converted will be passed with a null payload. Or even add a code snippet how to handle this error situation inside the listener method.