spring-cloud / spring-cloud-bus

Spring Cloud event bus
http://cloud.spring.io/spring-cloud-bus/
Apache License 2.0

SCB dependency sets (de)serializers to ByteArray if (de)serializers are set via config files #248

Closed AskMeAgain closed 3 years ago

AskMeAgain commented 3 years ago

Describe the bug: I am using org.springframework.cloud:spring-cloud-starter-bus-kafka:3.0.1. With spring-kafka alone, I can set the deserializers via configuration like this:

spring.kafka.consumer.key-deserializer: **path to class**

Once the spring-cloud-starter-bus-kafka dependency is added, this setting is overridden and the deserializer is always set to ByteArrayDeserializer. The same happens for all four settings: the key and value serializers and the key and value deserializers.

Interestingly, all other settings are kept, for example the client-id:

spring.kafka.consumer.client-id: "test123"

Removing the dependency makes the application work again.

Printing the Kafka consumer configs shows that ByteArrayDeserializer is always set while the dependency is present.

spencergibb commented 3 years ago

Does this also happen without Bus, using only the Spring Cloud Stream Kafka binder?

AskMeAgain commented 3 years ago

We figured out that this is because of the class

org.springframework.cloud.stream.binder.kafka.KafkaBinderEnvironmentPostProcessor

@Override
public void postProcessEnvironment(ConfigurableEnvironment environment,
        SpringApplication application) {
    if (!environment.getPropertySources().contains(KAFKA_BINDER_DEFAULT_PROPERTIES)) {
        Map<String, Object> kafkaBinderDefaultProperties = new HashMap<>();
        kafkaBinderDefaultProperties.put("logging.level.org.I0Itec.zkclient",
                "ERROR");
        kafkaBinderDefaultProperties.put("logging.level.kafka.server.KafkaConfig",
                "ERROR");
        kafkaBinderDefaultProperties
                .put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
        kafkaBinderDefaultProperties.put(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER,
                ByteArraySerializer.class.getName());
        kafkaBinderDefaultProperties.put(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER,
                ByteArraySerializer.class.getName());
        kafkaBinderDefaultProperties.put(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER,
                ByteArrayDeserializer.class.getName());
        kafkaBinderDefaultProperties.put(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER,
                ByteArrayDeserializer.class.getName());
        environment.getPropertySources().addLast(new MapPropertySource(
                KAFKA_BINDER_DEFAULT_PROPERTIES, kafkaBinderDefaultProperties));
    }
}

This class is in the cloud-stream-binder Gradle module. As a workaround, we now set the (de)serializers in code instead of via YAML properties.
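The thread does not show the workaround itself, so the following is only a rough sketch of what "setting the configs via code" could look like, not the reporter's actual code. It builds a consumer config map in plain Java and overwrites the two deserializer entries explicitly. The class name `ExplicitConsumerConfig`, the helper `withDeserializers`, and the choice of `StringDeserializer` are assumptions made for illustration; `key.deserializer`, `value.deserializer`, and `client.id` are the standard Kafka client config keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring or Kafka.
final class ExplicitConsumerConfig {

    // Standard Kafka consumer config keys.
    static final String KEY_DESERIALIZER = "key.deserializer";
    static final String VALUE_DESERIALIZER = "value.deserializer";

    // The class name that showed up in the reported consumer configs.
    static final String BYTE_ARRAY_DESERIALIZER =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    // Assumed stand-in for the deserializer the application actually wants.
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    private ExplicitConsumerConfig() {
    }

    // Returns a copy of the base config with both deserializers set explicitly.
    // All other entries (for example client.id) are carried over unchanged.
    static Map<String, Object> withDeserializers(Map<String, Object> base,
            String keyDeserializerClass, String valueDeserializerClass) {
        Map<String, Object> config = new HashMap<>(base);
        config.put(KEY_DESERIALIZER, keyDeserializerClass);
        config.put(VALUE_DESERIALIZER, valueDeserializerClass);
        return config;
    }

    public static void main(String[] args) {
        // Simulates the config as observed in the report: client.id is kept,
        // but both deserializers were replaced by ByteArrayDeserializer.
        Map<String, Object> observed = new HashMap<>();
        observed.put("client.id", "test123");
        observed.put(KEY_DESERIALIZER, BYTE_ARRAY_DESERIALIZER);
        observed.put(VALUE_DESERIALIZER, BYTE_ARRAY_DESERIALIZER);

        Map<String, Object> fixed =
                withDeserializers(observed, STRING_DESERIALIZER, STRING_DESERIALIZER);

        System.out.println(fixed.get(KEY_DESERIALIZER));
        System.out.println(fixed.get(VALUE_DESERIALIZER));
        System.out.println(fixed.get("client.id"));
    }
}
```

In a Spring application, a map like this could be handed to spring-kafka's `DefaultKafkaConsumerFactory`, which accepts a `Map<String, Object>` of consumer configs, so the deserializers no longer depend on the `spring.kafka.consumer.*` properties.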

sshbur commented 1 year ago

Just confirming that this is still an issue with Spring Boot 2.7.13. I will open a new issue about this.