spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

KeyValueSerdeResolver uses streamConfigGlobalProperties and ignores extended consumer/producer properties #1176

Closed by yanayita 2 years ago

yanayita commented 2 years ago

In an application that uses the Kafka Streams binder, consumer or producer configuration provided at the binding level is not used by KeyValueSerdeResolver when it configures the Serde:

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/9e4a1075d437f2e6aa7dad0a38d8286de94234a4/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java#L383-L394

Example:

spring.cloud.stream.function.definition=function1

spring.cloud.stream.bindings.function1-in-0.destination=topic1-in
spring.cloud.stream.bindings.function1-out-0.destination=topic1-out
spring.cloud.stream.kafka.streams.bindings.function1-in-0.consumer.configuration.spring.json.value.type.method=com.test.MyClass.determineType

When setting a breakpoint here: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/739b49996601a97322b986059f20618da0ccaf0a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java#L518

The JsonDeserializer in the Serde has no typeResolver, nor any other binding-level customization.
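To make the expected behavior concrete, here is a minimal sketch of the precedence I would expect, assuming the Serde should be configured from the global Kafka Streams properties with the binding-level `configuration` entries overlaid on top. This is not the binder's actual code: the class name `BindingConfigMerge`, the method `effectiveSerdeConfig`, and the `application.id` value are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class BindingConfigMerge {

    // Hypothetical helper (not part of the binder): start from the global
    // Kafka Streams properties and overlay the binding-level configuration,
    // so that binding-level entries win when a key is defined in both.
    public static Map<String, Object> effectiveSerdeConfig(
            Map<String, Object> globalStreamsConfig,
            Map<String, String> bindingConfiguration) {
        Map<String, Object> merged = new HashMap<>(globalStreamsConfig);
        merged.putAll(bindingConfiguration);
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the global stream configuration (illustrative value).
        Map<String, Object> global = new HashMap<>();
        global.put("application.id", "function1-app");

        // Stand-in for ...bindings.function1-in-0.consumer.configuration.*
        Map<String, String> binding = new HashMap<>();
        binding.put("spring.json.value.type.method", "com.test.MyClass.determineType");

        Map<String, Object> config = effectiveSerdeConfig(global, binding);

        // The binding-level key is present in the map used to configure the Serde.
        System.out.println(config.get("spring.json.value.type.method"));
    }
}
```

With a merged map like this passed to the Serde's `configure` call, the `spring.json.value.type.method` entry from the example above would reach the JsonDeserializer instead of being dropped.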

Framework versions: Spring Boot 2.6.1, Spring Cloud 2021.0.0

Related to #1149