spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

Problem Configuring conditional sources for binder with DefaultCustomBinderFactory #3031

Open ferblaca opened 1 week ago

ferblaca commented 1 week ago

Describe the issue

Configuring conditional sources for a binder does not seem to work, at least in a multi-binder scenario.

To explain the problem, I have created a simple application that you can find in this repository on the branch issue-binder-factory-sources-order.

I have created a simple configuration KafkaProducerConsumerConfiguration that customizes the clientId value of Kafka clients, which is conditional on the existence of a property value:

@ConditionalOnProperty(value = "custom-client-id.enabled", havingValue = "true", matchIfMissing = false)
public class KafkaProducerConsumerConfiguration {

    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(KafkaProducerConsumerConfiguration.class);

    private static final String CUSTOM = "CUSTOM_";

    @Bean
    public ConsumerConfigCustomizer consumerConfigCustomizer() {
        return (consumerProperties, bindingName, destination) -> {
            LOG.info("Customizing consumer client id for binding: {}", bindingName);
            consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, CUSTOM + bindingName);
        };
    }

    @Bean
    public ProducerConfigCustomizer producerConfigCustomizer() {
        return (producerProperties, bindingName, destination) -> {
            LOG.info("Customizing producer client id for binding: {}", bindingName);
            producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, CUSTOM + bindingName);
        };
    }
}
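For readers less familiar with the annotation, here is a minimal sketch of how a property condition with havingValue = "true" and matchIfMissing = false decides whether to match. It is plain Java with no Spring dependency; PropertyConditionSketch and its matches method are hypothetical names used only for illustration, and the Map stands in for whatever property sources are visible when the condition is evaluated. It is a simplified model, not Spring Boot's actual implementation.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for a property condition check.
// This is NOT Spring Boot's implementation, only a model of its documented behavior.
final class PropertyConditionSketch {

    // environment: flat view of the properties visible at evaluation time (assumption).
    static boolean matches(Map<String, String> environment, String name,
                           String havingValue, boolean matchIfMissing) {
        String actual = environment.get(name);
        if (actual == null) {
            // Property not visible: the outcome depends only on matchIfMissing.
            return matchIfMissing;
        }
        if (havingValue.isEmpty()) {
            // No havingValue given: any value other than "false" matches.
            return !"false".equalsIgnoreCase(actual);
        }
        // havingValue given: the property must equal it (case-insensitive).
        return havingValue.equalsIgnoreCase(actual);
    }

    public static void main(String[] args) {
        String key = "custom-client-id.enabled";
        // Property visible with the expected value.
        System.out.println(matches(Map.of(key, "true"), key, "true", false)); // prints "true"
        // Property not visible at all, e.g. because no property source has been loaded yet.
        System.out.println(matches(Map.of(), key, "true", false)); // prints "false"
    }
}
```

The second call is the interesting one for this issue: if the property is not visible when the condition is evaluated, the result is the same as if the property had never been set.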

I have added the configuration to the Kafka binders using the spring.main.sources property as follows:

spring:
  cloud:
    stream:
      defaultBinder: kafka1
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              main:
                sources: com.example.demoStreamKafka.sources.KafkaProducerConsumerConfiguration
        kafka2:
          type: kafka
          environment:
            spring:
              main:
                sources: com.example.demoStreamKafka.sources.KafkaProducerConsumerConfiguration

The problem is that the binder context, during its registration with DefaultBinderFactory, does not seem to have the property sources it needs to evaluate the @ConditionalOnProperty condition correctly. As a result, the application does not customize the client IDs of the Kafka clients.

To Reproduce

Steps to reproduce the behavior:

  1. Start the application
  2. Check that the application logs do not show the INFO logs of the consumerConfigCustomizer and producerConfigCustomizer beans.

Version of the framework

Spring Boot 3.3.4, Spring Cloud 2023.0.3

Expected behavior

The KafkaProducerConsumerConfiguration class is correctly configured, and the following logs are displayed when the application starts:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.3.4)

2024-11-06T16:43:40.017+01:00  INFO 12647 --- [           main] e.d.s.KafkaProducerConsumerConfiguration : Customizing producer client id for binding: foo-out-0
2024-11-06T16:43:40.139+01:00  INFO 12647 --- [           main] e.d.s.KafkaProducerConsumerConfiguration : Customizing consumer client id for binding: completeConsumer1-in-0
2024-11-06T16:43:40.356+01:00  INFO 12647 --- [           main] e.d.s.KafkaProducerConsumerConfiguration : Customizing consumer client id for binding: completeConsumer2-in-0

Additional context

To verify that the problem is the @ConditionalOnProperty condition, comment out that annotation and restart the application to see the expected result.

ferblaca commented 1 week ago

I have created the PR https://github.com/spring-cloud/spring-cloud-stream/pull/3032 that fixes the issue.

Apparently, the problem was that the configuration classes were being registered in the binder context before the context's Environment had its list of property sources, so the property conditions on those classes never matched.
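To make the ordering problem concrete, here is a small plain-Java model of it. It does not use Spring or the real DefaultBinderFactory code; BinderContextSketch and all of its methods are hypothetical names chosen for illustration, and the sketch assumes a source's condition is checked once, at registration time, against whatever properties the context holds at that moment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a binder child context; all names are illustrative only.
final class BinderContextSketch {

    private final Map<String, String> environment = new LinkedHashMap<>();
    private final List<String> activeSources = new ArrayList<>();

    // Makes a set of properties visible to later condition checks.
    void addPropertySource(Map<String, String> properties) {
        environment.putAll(properties);
    }

    // Activates the source only if its property condition matches the environment as it is right now.
    void registerConditionalSource(String sourceName, String property, String havingValue) {
        if (havingValue.equalsIgnoreCase(environment.get(property))) {
            activeSources.add(sourceName);
        }
    }

    List<String> activeSources() {
        return activeSources;
    }

    // Order described in the issue: sources first, property sources afterwards.
    static List<String> registerThenPopulate(Map<String, String> properties) {
        BinderContextSketch context = new BinderContextSketch();
        context.registerConditionalSource("KafkaProducerConsumerConfiguration",
                "custom-client-id.enabled", "true");
        context.addPropertySource(properties);
        return context.activeSources();
    }

    // Reversed order: property sources first, then the conditional sources.
    static List<String> populateThenRegister(Map<String, String> properties) {
        BinderContextSketch context = new BinderContextSketch();
        context.addPropertySource(properties);
        context.registerConditionalSource("KafkaProducerConsumerConfiguration",
                "custom-client-id.enabled", "true");
        return context.activeSources();
    }

    public static void main(String[] args) {
        Map<String, String> properties = Map.of("custom-client-id.enabled", "true");
        System.out.println(registerThenPopulate(properties)); // prints "[]"
        System.out.println(populateThenRegister(properties)); // prints "[KafkaProducerConsumerConfiguration]"
    }
}
```

With the same properties in both runs, only the order of the two steps changes the outcome, which matches the behavior described above.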