GoogleCloudPlatform / spring-cloud-gcp

New home for Spring Cloud GCP development starting with version 2.0.
Apache License 2.0
415 stars 307 forks source link

Unable to use multiple gcp pubsub binder in spring cloud stream #1688

Closed nilusilu95 closed 1 year ago

nilusilu95 commented 1 year ago

Describe the bug The application is reading from multiple kafka topics and it works well with multi binder configuration of spring cloud stream. This needs to read from gcp pubsub and it fails.

APPLICATION FAILED TO START Description: A component required a bean of type 'org.springframework.cloud.stream.binder.Binder' that could not be found. Action: Consider defining a bean of type 'org.springframework.cloud.stream.binder.Binder' in your configuration.

Using dependency management spring.boot.version: 2.7.2 spring-cloud.version: 2021.0.3 spring-cloud.gcp.version: 3.3.0

<dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>spring-cloud-gcp-dependencies</artifactId>
                <version>${spring-cloud.gcp.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
 <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
</dependency>

spring cloud stream : 3.2.6 kafka binder : 3.2.4 pubsub binder: 3.3.0

Sample

--Spring config starts--

spring:
  cloud:
    gcp:
      project-id: a
      credentials:
        location: file:gcp-sandbox.json
    stream:
      gcp:
        pubsub:
          bindings:
            gcpConsumer-in-0:
              consumer:
                subscriptionName: xyz
      bindings:
        kafkaConsumer-in-0:
          destination: kafkaTopic
          binder: kafkaBinder
          group: consumer-kafka
        gcpConsumer-in-0:
          destination: gcpTopic
          binder: gcpBinder
      binders:
        gcpBinder:
          type: pubsub
          environment:
            spring:
              cloud:
                gcp:
                  credentials:
                    location: file:gcp-sandbox.json
        kafkaBinder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: ${KAFKA_BROKERS}
                      configuration:
                        security.protocol: SASL_SSL
                        sasl.mechanism: PLAIN
                        sasl.jaas.config: *jaas config*;
      function:
        definition: kafkaConsumer;gcpConsumer;
      poller:
        initial-delay: 0
        fixed-delay: 1000
      kafka:
        binder:
          autoCreateTopics: false

--Spring config ends--

@Service
public class MultiConsumer {
    @Bean
    public Consumer<Message<String>> gcpConsumer() {
        return message -> {
            System.out.println("GCP-Pubsub: " + message.getPayload());
        };
    }

    @Bean
    public Consumer<Message<String>> kafkaConsumer() {
        return message -> {
            System.out.println("Kafka: " + message.getPayload());
        };
    }
}

My observation is when it doesn't work with multibinder configuration. I don't use spring.cloud.stream.binders and use only single gcp topic, it works. If I use another gcp/kafka topic, I have to use this property(multi binder configuration) and it doesn't work.

If I use another kafka config to produce using stream bridge, it gives

No qualifying bean of type 'org.springframework.cloud.stream.binder.Binder<?, ?, ?>' available.

There is similar issue reported (multiple gcp binder) https://stackoverflow.com/questions/74926366/spring-cloud-stream-streambridge-with-gcp-pubsub-binder

suztomo commented 1 year ago

If I read correctly, you observed that your program with "spring.cloud.stream.binders" for 2 GCP pubsub topics failed. Would share a minimum reproducible project? (The latest of 3.x is v3.4.7)

suztomo commented 1 year ago

@nilusilu95 If this is still an issue, would you please reopen this ticket with a minimum reproducible project?