GoogleCloudPlatform / spring-cloud-gcp

New home for Spring Cloud GCP development starting with version 2.0.
Apache License 2.0
415 stars 307 forks source link

Spring Cloud Stream - Subscribing to More Than One Topic in the Same App #2909

Closed Vyse777 closed 4 months ago

Vyse777 commented 4 months ago

I am completely confused how to utilize Spring Cloud Stream with Google PubSub with multiple topics.

Here is the landscape: I have 1 application that needs to process messages from 3 different topics, each with their own subscribers this app is to use. These topics and subscribers are pre-existing (meaning, created via infrastructure and should not be auto-created by the app) Lastly, these topics are all correlated to the same use-case, so 1 app for processing the 3 topics makes sense. As opposed to 3 applications, one for each topic.

Based on the docs, that I managed to find through insane amounts of research, I understand some of how the bindings work and I would expect I could do something like this in my application.yaml config (maybe I am wrong):

spring:
    cloud:
        stream:
            bindings:
                topicProcessorOneMethodName-in-0:
                    destination: name-of-topic-one-in-gcp
                    group: name-of-subscriber-to-topic-one-in-gcp
                topicProcessorTwoMethodName-in-0:
                    destination: name-of-topic-two-in-gcp
                    group: name-of-subscriber-to-topic-two-in-gcp
                topicProcessorThreeMethodName-in-0:
                    destination: name-of-topic-three-in-gcp
                    group: name-of-subscriber-to-topic-three-in-gcp

For a single topic/subscriber pair (say, just topicProcessorOneMethodName-in-0) this actually works. And I can receive messages to a single method. But for more than one, please read on...

As well, I would expect, the following config to ensure it does not try to create any resources (and try to use the existing ones instead):

spring:
    cloud:
        stream:
          gcp:
            pubsub:
              bindings:
                topicProcessorOneMethodName-in-0:
                  consumer:
                    auto-create-resources: false
                    subscription-name: name-of-subscriber-to-topic-one-in-gcp
                topicProcessorTwoMethodName-in-0:
                  consumer:
                    auto-create-resources: false
                    subscription-name: name-of-subscriber-to-topic-two-in-gcp
                topicProcessorThreeMethodName-in-0:
                  consumer:
                    auto-create-resources: false
                    subscription-name: name-of-subscriber-to-topic-three-in-gcp

I would then expect to be able to create three beans like so:

    @Bean
    fun topicProcessorOneMethodName(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message: Message<String> ->
            // Process the message that came from "name-of-topic-one-in-gcp" - utilizing subscriber "name-of-subscriber-to-topic-one-in-gcp"
        }
    }

    @Bean
    fun topicProcessorTwoMethodName(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message: Message<String> ->
            // Process the message that came from "name-of-topic-two-in-gcp" - utilizing subscriber "name-of-subscriber-to-topic-two-in-gcp"
        }
    }

// And a third similarly named according to the application.yaml config for the 3rd topic & subscriber.

However this does not work. Either because I am misunderstanding how to use this, or I am missing something else.

When starting my application I get the following WARNING (not an error/exception) stating this: Multiple functional beans were found [topicProcessorOneMethodName, topicProcessorTwoMethodName, topicProcessorThreeMethodName], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.

If I try to apply config like this, which I found researching online how to "use spring.cloud.function.definition properly":

spring:
  cloud:
    function:
      definition: testingMethod,testingMethodTwo,testingMethodThree

I see the message channel being created incorrectly in the startup logs:

INFO   - [main] org.springframework.integration.monitor.IntegrationMBeanExporter:649 : Registering MessageChannel topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0

And anonymous error channels created:

INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors' has 1 subscriber(s).
INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors' has 0 subscriber(s).
INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors' has 1 subscriber(s).
INFO   - [main] org.springframework.cloud.stream.binder.BinderErrorChannel:174 : Channel 'anonymous.topicProcessorOneMethodNametopicProcessorTwoMethodNametopicProcessorThreeMethodName-in-0.d439f252-1018-46a0-842a-9ace72a1332d.errors'  has 2 subscriber(s).

At this point the app does not receive any messages pushed to the applicable topics. Implying it's not attached right.

So it all boils down to: "Am I doing something wrong here?" "Am I misunderstanding how to configure bindings?" "Is this concept supported to begin with (i.e. multiple topics with single subscribers in a single application)?" "All of the above, or none of the above?"

Please help me.

Also, please let me know if there is any additional information I can provide to assist.

meltsufin commented 4 months ago

This is more of a Spring Cloud Stream question than a Spring Cloud GCP question, since we simply provide an implementation of the binder. I believe the list of function definitions should use the ; separator.

spring:
  cloud:
    function:
      definition: testingMethod;testingMethodTwo;testingMethodThree
Vyse777 commented 4 months ago

Thanks for the reply, I honestly had no idea where to go first.

I'll give that separator a try, didn't read anywhere to try that. I've used "," and "|" based on various docs I've read.

Vyse777 commented 4 months ago

Wow so despite the documentation for Spring Cloud Stream (located here: https://cloud.spring.io/spring-cloud-function/reference/html/spring-cloud-function.html#_declarative_function_composition) stating you can use "," or "|" the thing that worked for me was using ";" as you recommended @meltsufin

After doing this, it looks like all of the above config managed to work exactly as I expected it to. ¯\(ツ)

Thanks for that. I guess my issue is fixed...

Vyse777 commented 4 months ago

Closing since this is covered above