
Kafka Config for consumer/producer by topic Enhancements #39066

Open monsdroid opened 4 months ago

monsdroid commented 4 months ago

We recently inherited a project with a lot of Kafka. Standing on the shoulders of the Spring giants, and with the recent updates, we removed as much boilerplate as possible and tried to solve what's possible via configuration.

Delegating Serializer and Deserializer

It took us quite a while to figure out why our config was not picked up. Obviously a beginner's mistake; the solution is here: if you use spring.kafka.value.serialization.bytopic.config (a Kafka property), you must set value-deserializer to org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer.

A Working Config:

    spring:
      kafka:
        ...
        consumer:
          ...
          key-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
          value-deserializer: "org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer"
          properties:
            ...
            spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
            spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethingDeserializer"
1. Any chance that Spring Boot can warn about this, or make those properties its own so they can be auto-configured?

2. On the same topic, the value of spring.kafka.value.serialization.bytopic.config is a comma-separated list of "topicregex:some.com.package.hard.to.read.and.maintain.if.there.is.more.than.one.serializer" entries, and this list becomes hard to read and maintain. Being able to provide it as a list, or even as a map, via YAML would be nice.

3. To add some type safety, a bean of ConsumerTopicToDeserializer or something similar that auto-configuration picks up to do it right and save us fools some time :-)

We used the customizer below to add this before we found the solution above:

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer
    import org.springframework.context.annotation.Bean
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory
    import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
    import org.springframework.kafka.support.serializer.JsonDeserializer

    @Bean
    fun customizeKafkaConsumerFactory(): DefaultKafkaConsumerFactoryCustomizer {
        return DefaultKafkaConsumerFactoryCustomizer { consumerFactory ->
            @Suppress("UNCHECKED_CAST")
            val factory = consumerFactory as DefaultKafkaConsumerFactory<String, Any>
            factory.setKeyDeserializer(ErrorHandlingDeserializer(StringDeserializer()))
            factory.setValueDeserializer(
                ErrorHandlingDeserializer(
                    DelegatingByTopicDeserializer(
                        // SomeDeserializer is our custom deserializer for this topic
                        mapOf(Pattern.compile("SomeTopic") to SomeDeserializer()),
                        // fallback for all other topics
                        JsonDeserializer<Any>(),
                    )
                )
            )
        }
    }
4. How about some configured ErrorHandlingDeserializer?

    spring:
      kafka:
        ...
        consumer:
          ...
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            ...
            spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer

            spring.kafka.key.serialization.bytopic.config: "${kafka.consumer.topic_one}:org.apache.kafka.common.serialization.StringDeserializer"
            spring.kafka.value.serialization.bytopic.config: "${kafka.consumer.topic_one}:com.example.custom.KafkaSomethingDeserializer"

Maybe you see some things which can be addressed in the documentation and/or auto-configuration.

Thanks for the great work!

wilkinsona commented 3 months ago

Thanks for the suggestions.

Any chance that spring boot can warn about this or make those properties its own so they can be auto-configured?

Boot's spring.kafka.consumer.key-deserializer maps to Kafka's key.deserializer property. The properties within spring.kafka.consumer.properties mean nothing to Spring Boot and are passed straight through. Ultimately, these properties are all in the Map<String, Object> that's used to create the Spring Kafka DefaultKafkaConsumerFactory(Map<String, Object>).

Given the above, I'm not sure that Boot is the right place to encode knowledge about the relationship between the key.deserializer property and the spring.kafka.key.serialization.bytopic.config property. It feels to me like Spring Kafka is better placed to do this as it's the project with the knowledge that you have to use a particular deserializer for spring.kafka.key.serialization.bytopic.config to have an effect. Furthermore, Spring Kafka performing this validation would also mean that it would benefit everyone, not just those using Spring Boot.

WDYT, @artembilan?

On the same topic, the value of the spring.kafka.value.serialization.bytopic.config is a comma-separated list [that] becomes hard to read/maintain. Being able to provide this list as "list" or even as map via yaml would be nice.

It's Spring Kafka's DelegatingByTopicSerialization sub-classes that process this property. They appear to support that value being either a Map or a String. The Map case opens up some possibilities for richer configuration. You could implement something today in your app using a DefaultKafkaConsumerFactoryCustomizer and DefaultKafkaConsumerFactory.updateConfigs(Map<String, Object>) to configure the spring.kafka.key.serialization.bytopic.config and spring.kafka.value.serialization.bytopic.config properties with Map values.
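A minimal Kotlin sketch of that customizer approach, assuming placeholder topic names and treating com.example.custom.KafkaSomethingDeserializer as a stand-in for a custom class:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer
    import org.springframework.context.annotation.Bean

    // Sketch only: the topic names and the custom deserializer class are placeholders.
    @Bean
    fun byTopicMapConfigCustomizer(): DefaultKafkaConsumerFactoryCustomizer {
        return DefaultKafkaConsumerFactoryCustomizer { factory ->
            // DelegatingByTopicSerialization accepts a Map here instead of the
            // comma-separated String: keys are topic patterns, values are
            // deserializer instances, classes, or class names.
            val byTopic: Map<String, Any> = mapOf(
                "topic-one" to StringDeserializer::class.java,
                "topic-two" to "com.example.custom.KafkaSomethingDeserializer",
            )
            factory.updateConfigs(
                mapOf("spring.kafka.value.serialization.bytopic.config" to byTopic)
            )
        }
    }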

We could also look at providing dedicated configuration properties for this in Boot that we'd map to Map values for spring.kafka.key.serialization.bytopic.config and spring.kafka.value.serialization.bytopic.config. When set, this would then open up the possibility of also setting key.deserializer to a sensible default unless the user's also configured spring.kafka.consumer.key-deserializer and/or spring.kafka.consumer.value-deserializer. Alternatively, we could map this to the DefaultKafkaConsumerFactory constructor that takes the deserializers as constructor arguments.
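Purely as an illustration of that idea, such dedicated properties might take a shape like the following; these property names are hypothetical and do not exist in Boot today:

    # Hypothetical property names, sketching the proposal above; not implemented.
    spring:
      kafka:
        consumer:
          value-serialization-by-topic:
            "topic-one": org.apache.kafka.common.serialization.StringDeserializer
            "topic-two": com.example.custom.KafkaSomethingDeserializer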

To add some type safety, a bean of ConsumerTopicToDeserializer or something similar that auto-configuration picks up to do it right and save us fools some time

That's an interesting idea. It certainly feels preferable to configuring class names in YAML or properties, which is always error-prone.

How about some configured ErrorHandlingDeserializer

I'm not sure about this one as I think it could get rather complicated. You'd need to somehow configure a delegate everywhere that the error handling deserializer may be used. With some of the other improvements discussed above, I suspect there may be less of a need for this as it would be easier to perform the error-handling decoration in your own code.

artembilan commented 3 months ago

OK, I see what is going on. You're trying to push coding from Java back to YAML, which is indeed error-prone. I agree that simple one-stop-shop properties are fine to provide via YAML, but when we get to something more complicated or tied together, it is better to control it from code, where mistakes are less likely.

That spring.kafka.value.serialization.bytopic.config property is read only by the DelegatingByTopicDeserializer, so there is no way to warn you that something is off. The same goes for the ErrorHandlingDeserializer and its spring.deserializer.value.delegate.class, simply because all those spring.kafka.consumer.properties are outside of Spring Boot's configuration property binding.

What I would suggest is a top-level, end-user Deserializer bean to be injected into the KafkaAnnotationDrivenConfiguration, where we would propagate it down to the DefaultKafkaConsumerFactory constructor:

    public DefaultKafkaConsumerFactory(Map<String, Object> configs,
            @Nullable Deserializer<K> keyDeserializer,
            @Nullable Deserializer<V> valueDeserializer) {

Although from here it is not clear how to distinguish between key & value deserializer beans...

This way you would not need that DefaultKafkaConsumerFactoryCustomizer, but rather would concentrate on the Deserializer configuration in the familiar Java style.
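To illustrate, here is a rough sketch of wiring that constructor up yourself today. The topic pattern and SomeDeserializer are placeholders, and note that defining your own ConsumerFactory bean replaces Boot's auto-configured one:

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties
    import org.springframework.context.annotation.Bean
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory
    import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
    import org.springframework.kafka.support.serializer.JsonDeserializer

    // Sketch only: deserializers are passed as constructor arguments, so the
    // key/value distinction is explicit and no customizer cast is needed.
    @Bean
    fun consumerFactory(properties: KafkaProperties): DefaultKafkaConsumerFactory<String, Any> {
        return DefaultKafkaConsumerFactory(
            properties.buildConsumerProperties(),
            StringDeserializer(),
            DelegatingByTopicDeserializer(
                mapOf(Pattern.compile("SomeTopic") to SomeDeserializer()), // placeholder
                JsonDeserializer<Any>(),
            ),
        )
    }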

I also wonder if this YAML feature would make it a bit easier for you for the time being:

    properties:
      spring.kafka.value.serialization.bytopic.config:
        - ${kafka.consumer.topic_one}: com.example.custom.KafkaSomethingDeserializer
        - ${kafka.consumer.topic_two}: org.apache.kafka.common.serialization.StringDeserializer

CC @sobychacko

sobychacko commented 3 months ago

+1 to the idea from @artembilan where users provide the deserializers as beans, and then Spring Boot injects them into DefaultKafkaConsumerFactory, giving users control over how they want to configure the deserializers via code. We need to devise a way to distinguish between key and value deserializers. Maybe we can look into some naming conventions or something similar? I also like what @wilkinsona suggested for providing dedicated configuration options in Boot to make this integration with DefaultKafkaConsumerFactory transparent. This way, the users can drive this from properties rather than providing deserializer beans.
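To make the open question concrete, one hypothetical convention could be well-known bean names, along these lines (nothing below is implemented; the bean names are invented for illustration):

    import java.util.regex.Pattern
    import org.apache.kafka.common.serialization.Deserializer
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.context.annotation.Bean
    import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
    import org.springframework.kafka.support.serializer.JsonDeserializer

    // Hypothetical: Boot could look up deserializer beans by well-known names
    // to tell the key deserializer apart from the value deserializer.
    @Bean("kafkaKeyDeserializer")
    fun keyDeserializer(): Deserializer<String> = StringDeserializer()

    @Bean("kafkaValueDeserializer")
    fun valueDeserializer(): Deserializer<Any> =
        DelegatingByTopicDeserializer(
            mapOf(Pattern.compile("SomeTopic") to SomeDeserializer()), // placeholder
            JsonDeserializer<Any>(),
        )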

StephenFlavin commented 3 weeks ago

Hi, I'm interested in this feature but I'm having trouble trying to configure it.

Can you show what the @KafkaListener looks like for this?

In my case I have many topics; some use Avro values and some use JSON values, so I use the Confluent KafkaAvroDeserializer for one and the Spring JsonDeserializer for the other. However, since each topic maps to a different resulting class, I don't think it works.

Edit:

I had a go at implementing this. I set the Avro deserializer as the bytopic default and ended up having to use a custom deserializer for each JSON topic that just extends the Spring JsonDeserializer with a concrete type. I was glad to see that the @KafkaListener can just accept ConsumerRecords<Object, Object>.
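For anyone attempting the same, the concrete-type subclass described above can be as small as this; MyJsonEvent and the class name are placeholders, and binding a Kotlin data class assumes jackson-module-kotlin is on the classpath:

    import org.springframework.kafka.support.serializer.JsonDeserializer

    // Placeholder payload type for one of the JSON topics
    data class MyJsonEvent(val id: String, val body: String)

    // Extending Spring's JsonDeserializer with a concrete target type gives the
    // by-topic delegating deserializer a no-arg class it can instantiate by name
    class MyJsonEventDeserializer : JsonDeserializer<MyJsonEvent>(MyJsonEvent::class.java)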

+1 to getting spring.kafka.value.serialization.bytopic.config configured as a map in YAML