spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Bug: Two BinderConfigurationPropertiesBeans are created #879

Closed rmvanderspek closed 4 years ago

rmvanderspek commented 4 years ago

Using the Spring Initializr, I created a pom.xml for a vanilla Spring Boot application with Cloud Stream and Spring for Apache Kafka Streams:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

With a very simple application class:

package test;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.function.Consumer;

@SpringBootApplication
public class KafkaTest {

    @Bean
    public Consumer<KStream<Object, String>> process() {
        return input ->
            input.foreach((key, value) -> {
                System.out.println("Key: " + key + " Value: " + value);
            });
    }

    public static void main(String[] args) {
        SpringApplication.run(KafkaTest.class, args);
    }

}

Running this application fails at startup:


APPLICATION FAILED TO START


Description:

Parameter 0 of method provisioningProvider in org.springframework.cloud.stream.binder.kafka.streams.KStreamBinderConfiguration required a single bean, but 2 were found:

  • kafkaBinderConfigurationProperties: defined by method 'kafkaBinderConfigurationProperties' in org.springframework.cloud.stream.binder.kafka.streams.MutliBinderPropertiesConfiguration
  • binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]

Other than the above, nothing is added to this project. (No application.properties either.)

sobychacko commented 4 years ago

We found a regression in the latest Kafka Streams binder (3.0.1.RELEASE). Can you please downgrade spring-cloud-stream-binder-kafka-streams to 3.0.0.RELEASE? (You need to set this version explicitly in the Maven pom.) We have already fixed it on the latest snapshot (3.0.2.BUILD-SNAPSHOT), and the fix will be part of the next release. There is a workaround you can try on the current release, but going back to the previous version is easier if that is an option for you.

rmvanderspek commented 4 years ago

Thank you for your response. That was indeed the issue. Downgrading to 3.0.0.RELEASE fixed it.

sureshachary commented 4 years ago

Hello, is there a tentative date for the fix?

sobychacko commented 4 years ago

@sureshachary Yes, this will be fixed in the upcoming 3.0.2 release, which is tentatively scheduled for the end of this month. If that doesn't work for you, I can suggest some workarounds. (Btw, this is already fixed on the snapshots.)

amanzag commented 4 years ago

I just reproduced this with 3.0.3. Is it supposed to be fixed?

sobychacko commented 4 years ago

@amanzag Could you give a bit more detail? This was indeed fixed before 3.0.3. I am curious to find a way to reproduce what you are experiencing.

sobychacko commented 4 years ago

@amanzag I used the same code provided above by @rmvanderspek. I am able to start the application fine without any errors.

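import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication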
public class Gh879Application {

    public static void main(String[] args) {
        SpringApplication.run(Gh879Application.class, args);
    }

    @Bean
    public Consumer<KStream<Object, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }

}

Relevant bits from maven pom:

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

Please advise how we can reproduce this.

sobychacko commented 4 years ago

Closing this issue. Please see the above comments for details. Feel free to reopen it if you still face issues, and please provide details.

amanzag commented 4 years ago

Hi, here's how I reproduced it.

pom.xml

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Horsham.SR3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.2.6.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
    </dependencies>

application.yml

spring:
  application.name: test
  cloud:
    stream:
      bindings:
        test2-in-0:
          destination: test2
          group: test2-group
          binder: kstream
        test2-out-0:
          destination: test3
          binder: kstream
      kafka:
        streams:
          binder:
            brokers: localhost:9092
      binders:
        kstream:
          type: kstream

Java code

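import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication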
public class KafkaStreamsPocApp {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsPocApp.class, args);
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> test2() {
        return a -> a;
    }

}

I get the following error when starting the app:


APPLICATION FAILED TO START


Description:

Parameter 0 of method provisioningProvider in org.springframework.cloud.stream.binder.kafka.streams.KStreamBinderConfiguration required a single bean, but 2 were found:

  • binderConfigurationProperties: defined by method 'binderConfigurationProperties' in class path resource [org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.class]
  • kstream-KafkaStreamsBinderConfigurationProperties: a programmatically registered singleton

Action:

Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed

Note that if I set the version of spring-cloud-stream-binder-kafka-streams to 3.0.0.RELEASE, it works with no problem.

Cheers, Alberto
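
The startup failure quoted above is a by-type injection problem: one method parameter, two candidate beans of the same type. As a rough illustration only, the plain-Java sketch below models that lookup with a tiny hypothetical registry. `TinyRegistry`, `BinderProps`, and the `primary` flag are invented for this example and are not Spring classes, and this is not how Spring is implemented. It only shows why two candidates make resolution fail and why marking one of them as primary, as the Action text suggests, makes it succeed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a binder configuration properties bean.
class BinderProps {
    final String brokers;

    BinderProps(String brokers) {
        this.brokers = brokers;
    }
}

// Simplified model of by-type lookup. An assumption for illustration, NOT Spring code.
class TinyRegistry {
    private final Map<String, Object> beans = new LinkedHashMap<>();
    private final List<String> primaries = new ArrayList<>();

    void register(String name, Object bean, boolean primary) {
        beans.put(name, bean);
        if (primary) {
            primaries.add(name);
        }
    }

    // Returns the single bean of the requested type, or fails if the choice is ambiguous.
    <T> T getSingle(Class<T> type) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, Object> e : beans.entrySet()) {
            if (type.isInstance(e.getValue())) {
                candidates.add(e.getKey());
            }
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("no bean of type " + type.getSimpleName());
        }
        if (candidates.size() > 1) {
            // With several candidates, accept only a unique primary one.
            List<String> preferred = new ArrayList<>(candidates);
            preferred.retainAll(primaries);
            if (preferred.size() != 1) {
                throw new IllegalStateException(
                        "required a single bean, but " + candidates.size() + " were found: " + candidates);
            }
            candidates = preferred;
        }
        return type.cast(beans.get(candidates.get(0)));
    }
}

class AmbiguityDemo {
    public static void main(String[] args) {
        // Two beans of the same type, neither primary: lookup is ambiguous.
        TinyRegistry ambiguous = new TinyRegistry();
        ambiguous.register("binderConfigurationProperties", new BinderProps("localhost:9092"), false);
        ambiguous.register("kstream-KafkaStreamsBinderConfigurationProperties", new BinderProps("localhost:9092"), false);
        try {
            ambiguous.getSingle(BinderProps.class);
        } catch (IllegalStateException e) {
            System.out.println("Lookup failed: " + e.getMessage());
        }

        // Same two beans, but one is marked primary: lookup succeeds.
        TinyRegistry resolved = new TinyRegistry();
        resolved.register("binderConfigurationProperties", new BinderProps("localhost:9092"), true);
        resolved.register("kstream-KafkaStreamsBinderConfigurationProperties", new BinderProps("localhost:9092"), false);
        System.out.println("Resolved brokers: " + resolved.getSingle(BinderProps.class).brokers);
    }
}
```

In this thread the two candidates come from the binder's own configuration rather than from application code, so the practical options discussed here are a version change or a configuration change, not an application-level `@Primary`.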

sobychacko commented 4 years ago

@amanzag It seems like you are using the multi-binder feature, but your configuration only has one type of binder. Do you really have a use case for multiple binders? If not, your configuration can be rewritten as below, and that should work.

spring:
  application.name: test
  cloud:
    stream:
      bindings:
        test2-in-0:
          destination: test2
        test2-out-0:
          destination: test3
      kafka:
        streams:
          binder:
            brokers: localhost:9092

If you really need the multi-binder capability, then you should put your brokers configuration under the environment of that binder. For example, modify your configuration as below.

spring:
  application.name: test
  cloud:
    stream:
      bindings:
        test2-in-0:
          destination: test2
          binder: kstream
        test2-out-0:
          destination: test3
          binder: kstream
      binders:
        kstream:
          type: kstream
          environment:
            spring.cloud.stream.kafka.streams.binder.brokers: localhost:9092

amanzag commented 4 years ago

Hi @sobychacko, you are right: as soon as I specify the config through an environment, it works. I do have a use case for multiple binders (Rabbit + Kafka), and it seems to work now.

Thank you!
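
For readers who keep their settings in application.properties rather than YAML, it may help to see what the working multi-binder layout looks like as flat keys. The sketch below is plain Java with no Spring dependency. `PropertyFlattener` and `multiBinderExample` are hypothetical names made up for this illustration. It rebuilds the `binders` section of the YAML as nested maps and flattens it into dotted keys, which shows that the brokers setting ends up nested under the binder's own `environment` prefix.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Flattens nested configuration maps into dotted property keys.
// Hypothetical helper for illustration; not part of Spring Cloud Stream.
class PropertyFlattener {

    static Map<String, String> flatten(Map<String, ?> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, String> out) {
        for (Map.Entry<String, ?> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    // Mirrors the "binders" section of the multi-binder YAML in this thread.
    static Map<String, Object> multiBinderExample() {
        Map<String, Object> environment = new LinkedHashMap<>();
        environment.put("spring.cloud.stream.kafka.streams.binder.brokers", "localhost:9092");

        Map<String, Object> kstream = new LinkedHashMap<>();
        kstream.put("type", "kstream");
        kstream.put("environment", environment);

        Map<String, Object> binders = new LinkedHashMap<>();
        binders.put("kstream", kstream);

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring.cloud.stream.binders", binders);
        return root;
    }

    public static void main(String[] args) {
        // Print each flattened entry in key=value form.
        for (Map.Entry<String, String> e : flatten(multiBinderExample()).entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

The flattened brokers key starts with `spring.cloud.stream.binders.kstream.environment.`, followed by the usual binder property name. In the failing configuration the same property sat at the top level, outside any binder's environment.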

sobychacko commented 4 years ago

@amanzag Glad that worked. Please feel free to ping here and re-open the issue if you face any further problems with this.

zhaozhiguang commented 2 years ago

This appears again in version 3.2.2, in KafkaStreamsJaasConfiguration.class.