
Spring Boot
https://spring.io/projects/spring-boot
Apache License 2.0

Review Kafka auto-configuration and support for defining additional beans #40174

Open notusedusername opened 5 months ago

notusedusername commented 5 months ago

Summary

Overriding the ConsumerFactory instance in our configuration, as described in the spring-kafka docs, does not take effect; the default instance is created and used instead. With a minor modification to the bean definition the intended behavior can be achieved, but getting there isn't straightforward.

For the override to work the way the documentation states, some changes in spring-boot-autoconfigure are required; if that is not possible, the spring-kafka documentation should be updated accordingly.

Details

I'm working with Spring Boot-based microservices connected by Kafka. The message values are in JSON format, in the serialized form of a common DTO, let's call it Document.

// Custom JSON deserializer for the shared Document DTO
@Bean
public JsonDeserializer<Document> deserializer() {
    return new DocumentMessageDeserializer();
}

// Consumer factory intended to replace the auto-configured default
@Bean
public ConsumerFactory<String, Document> consumerFactory(
        JsonDeserializer<Document> deserializer,
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    return new DefaultKafkaConsumerFactory<>(
            kafkaBinderConfigurationProperties.mergedConsumerConfiguration(),
            new StringDeserializer(),
            deserializer);
}

Expected behavior

Based on the spring-kafka documentation, we should be able to replace the default factory with our custom instance, which uses the custom deserializer.

What happens instead

Instead, the service fails at runtime with deserialization errors: the message is deserialized with the default deserializer into a byte[], which is then cast to Document.

java.lang.RuntimeException: java.lang.ClassCastException: class [B cannot be cast to class com.example.dto.Document ([B is in module java.base of loader 'bootstrap'; com.example.dto.Document is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @59e84876)
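For illustration, [B in the message is the JVM's internal name for byte[]. The cast failure can be reproduced in plain Java; a minimal sketch (with String standing in for Document, since the concrete target type doesn't matter):

```java
public class CastDemo {

    // Returns the JVM's ClassCastException message produced by casting a
    // byte[] (internal name "[B") to an unrelated reference type, mirroring
    // the error in the report.
    static String castFailureMessage() {
        Object value = new byte[] {1, 2, 3}; // what the default deserializer yields
        try {
            String s = (String) value; // analogous to the cast to Document
            return "no exception: " + s;
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(castFailureMessage());
    }
}
```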

On the other hand, if the property-based config is provided, then the service runs without issues.

spring:
  kafka:
    consumer:
      value-deserializer: com.example.serializers.DocumentMessageDeserializer

Based on these symptoms, it appears that it is not our ConsumerFactory instance that is used when creating the Consumer, but the default one. The default picks up the property-based configuration, but if the property is not provided it falls back to the default deserializer. (I verified this by putting a breakpoint in the KafkaListenerContainerFactory creation in KafkaAnnotationDrivenConfiguration; the injected factory is in fact null.)

Our programmatically configured @Bean should override the default ConsumerFactory (so we should be able to deserialize the messages without setting the property).
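For reference, the "minor modification" mentioned in the summary appears to be declaring the bean with the Object, Object signature that Boot's auto-configuration injects. A hedged sketch, not the exact code from the report (the deserializers are supplied as classes through the config map so the factory's generics can stay Object; this assumes DocumentMessageDeserializer has a no-arg constructor):

```java
// Sketch: a ConsumerFactory<Object, Object> bean matches the
// ObjectProvider<ConsumerFactory<Object, Object>> injection point in
// KafkaAnnotationDrivenConfiguration, so it is actually picked up.
@Bean
public ConsumerFactory<Object, Object> consumerFactory(
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
    Map<String, Object> configs =
            new HashMap<>(kafkaBinderConfigurationProperties.mergedConsumerConfiguration());
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DocumentMessageDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(configs);
}
```

The downside is that the factory's type arguments no longer document the actual key and value types, which is exactly what the spring-kafka docs suggest avoiding.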

Reason

I saw that in the KafkaAnnotationDrivenConfiguration the ConsumerFactory is injected like this:

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory
            .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
    return factory;
}

The problem I found is that a ConsumerFactory defined the way the spring-kafka docs show will not be injected here, because a ConsumerFactory<String, Document> is not an instance of ConsumerFactory<Object, Object>.

Workarounds

Proposed solution

IMO, using wildcards when injecting the ConsumerFactory into kafkaListenerContainerFactory would make the behavior comply with the spring-kafka documentation: the ConsumerFactory could then be defined with concrete type arguments instead of wildcards.

ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<?, ?>> kafkaConsumerFactory) {

If that is not possible for reasons I'm not aware of, then the spring-kafka docs should be updated to show the correct usage in the example.

Versions

wilkinsona commented 5 months ago

Spring Boot 2.7.x is no longer supported.

> I think the same issue would persist with a version upgrade as well, but I didn't check that.

Please upgrade to 3.1.x or later and let us know if the problem remains. If it does, we can take a further look.

notusedusername commented 5 months ago

Thanks for the fast reply!

I tried it in other services that were already bumped to Spring Boot 3.x, and the same thing happened. Our @Bean definitions are the same in that version as in the report.

wilkinsona commented 5 months ago

Thank you. I think https://github.com/spring-projects/spring-boot/issues/19221 is relevant here. Reading the discussion there, I am reminded that the current behaviour is intentional.

The intent here is to auto-configure a ConcurrentKafkaListenerContainerFactory bean in the absence of a bean named kafkaListenerContainerFactory. While unusual for Spring Boot's auto-configuration, this is to allow additional container factories to be defined without the default backing off. If we relax the ConsumerFactory<Object, Object> signature, we may end up using a custom consumer factory that was intended for a different container factory rather than the default that we'd currently use.
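To illustrate the "additional container factories" pattern described above, a sketch (the names documentListenerContainerFactory and documentConsumerFactory are made up for the example):

```java
// Sketch: a second, explicitly named container factory for Document
// messages. Because the auto-configured factory only backs off when a bean
// named "kafkaListenerContainerFactory" exists, this one can coexist with
// the default.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Document> documentListenerContainerFactory(
        ConsumerFactory<String, Document> documentConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Document> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(documentConsumerFactory);
    return factory;
}
```

A listener then selects it explicitly with @KafkaListener(containerFactory = "documentListenerContainerFactory", ...). If the injection point in Boot were relaxed to ConsumerFactory<?, ?>, a consumer factory intended only for a factory like this one could be picked up by the default container factory as well.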

I think this may have to be fixed by updating Spring Kafka's documentation. Before we go down that route, flagging for a team meeting in case there's anything that I've overlooked and a fix in Boot is actually possible.

wilkinsona commented 4 months ago

We discussed this and concluded that there's nothing we can do about this in the short- or medium-term. In the longer term, we'd like to reconsider this current behavior as part of investigating service bindings and support for auto-configuring multiple beans (#15732, #22403). In the meantime, the documentation for Spring Kafka should be updated. I've opened https://github.com/spring-projects/spring-kafka/issues/3242 for that.

notusedusername commented 4 months ago

I had checked the issue you linked before, and it makes sense given the use case mentioned there.

With the upcoming change in the Spring Kafka documentation, the 3rd workaround from the description becomes the actual solution for this particular problem.

Thanks for the help!

sobychacko commented 4 months ago

Documentation added to Spring Kafka via https://github.com/spring-projects/spring-kafka/pull/3243.