spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

2 Consumers of same topic #1194

Closed chrismpalmer closed 2 years ago

chrismpalmer commented 2 years ago

Friends,

Since upgrading to the latest version of Spring Cloud and spring-cloud-stream-binder-kafka-streams:3.2.1, I have lost the ability to have 2 consumers against the same topic. The Spring code creates the binder proxy bean under the topic name, and since a bean with that name already exists, it throws a duplicate bean exception. In another project I am using spring-cloud-stream-binder-kafka-streams:3.0.9.RELEASE and I don't have this problem. Another user posted this issue but then closed it, so perhaps it's a config thing.

My config is quite basic.

spring:
  cloud:
    stream:
      function:
        definition: func1,func2
        bindings:
          func1-in-0:
            destination: topic1
          func2-in-0:
            destination: topic1

The root issue is that the bean is registered under the topic name.

KafkaStreamsBindableProxyFactory.bindInput

private void bindInput(ResolvableType arg0, String inputName) {
    if (arg0.getRawClass() != null) {
        KafkaStreamsBindableProxyFactory.this.inputHolders.put(inputName,
                new BoundTargetHolder(getBindingTargetFactory(arg0.getRawClass())
                        .createInput(inputName), true));
    }
    BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
    RootBeanDefinition rootBeanDefinition = new RootBeanDefinition();
    rootBeanDefinition.setInstanceSupplier(() -> inputHolders.get(inputName).getBoundTarget());
    // The bean is registered under inputName, so two inputs with the same name collide here.
    registry.registerBeanDefinition(inputName, rootBeanDefinition);
}

In older versions, and in my other project, Kafka Streams passes the binding name (func1-in-0) as inputName in the code above, but the latest version passes the topic name. The documentation describes the configuration the same way for both versions.
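To make the failure mode concrete, here is a minimal sketch of why the registration name matters. It assumes only that a registry rejects a second bean with an existing name; the `Registry` class and `registerAll` helper are hypothetical stand-ins for illustration, not Spring's BeanDefinitionRegistry or the binder's code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a bean registry that rejects duplicate names.
// This is NOT Spring's BeanDefinitionRegistry; all names here are hypothetical.
class BeanNameCollisionSketch {

    /** Minimal registry: one bean per name, duplicates are rejected. */
    static final class Registry {
        private final Map<String, Object> beans = new LinkedHashMap<>();

        void register(String name, Object bean) {
            // putIfAbsent returns the existing value when the name is already taken.
            if (beans.putIfAbsent(name, bean) != null) {
                throw new IllegalStateException("Duplicate bean name: " + name);
            }
        }
    }

    /** Registers one placeholder bean per name; returns false on a name clash. */
    static boolean registerAll(List<String> beanNames) {
        Registry registry = new Registry();
        try {
            for (String name : beanNames) {
                registry.register(name, new Object());
            }
            return true;
        } catch (IllegalStateException duplicate) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Keyed by binding name: two distinct names, both registrations succeed.
        System.out.println(registerAll(List.of("func1-in-0", "func2-in-0"))); // true
        // Keyed by topic name: both inputs map to "topic1", so the second one clashes.
        System.out.println(registerAll(List.of("topic1", "topic1")));         // false
    }
}
```

Two inputs can share a topic only as long as the name used for registration stays unique per input, which is the case for binding names but not for topic names.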

chrismpalmer commented 2 years ago

#1037 is the same issue I am facing.

chrismpalmer commented 2 years ago

I found the problem, and it's quite subtle:

You can bind your processors at two levels: spring.cloud.stream.function.bindings or spring.cloud.stream.bindings.

Did I miss something in the docs that describes when you use one or the other?

sobychacko commented 2 years ago

@chrismpalmer The correct property for setting the destination is spring.cloud.stream.bindings.<binding-name>.destination. You use spring.cloud.stream.function.bindings only if you need to change the original binding name. The following configuration should work. Note that bindings sits directly under spring.cloud.stream, not under function.

spring:
  cloud:
    stream:
      function:
        definition: func1,func2
      bindings:
        func1-in-0:
          destination: topic1
        func2-in-0:
          destination: topic1

If you want to change the original binding names (func1-in-0 or func2-in-0), then you provide the following config.

spring.cloud.stream.function.bindings.func1-in-0: foo
spring.cloud.stream.function.bindings.func2-in-0: bar

When you do this, make sure to use the new binding names for any further configuration. For example, the destination must be set on the changed binding names, such as spring.cloud.stream.bindings.foo.destination: topic1, and so on.
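The relationship between the two property groups can be sketched as a two-step lookup. This is a toy model under stated assumptions, not Spring Cloud Stream code: the class and method names are hypothetical, and the two maps simply stand in for spring.cloud.stream.function.bindings and spring.cloud.stream.bindings.<name>.destination.

```java
import java.util.Map;

// Toy model of the two property groups discussed above; not Spring Cloud Stream code.
class BindingResolutionSketch {

    /** Returns the effective binding name: the renamed one if present, else the original. */
    static String effectiveBindingName(Map<String, String> functionBindings, String originalName) {
        return functionBindings.getOrDefault(originalName, originalName);
    }

    /** Returns the destination configured for the effective binding name, or null if none is configured. */
    static String destination(Map<String, String> functionBindings,
                              Map<String, String> bindingDestinations,
                              String originalName) {
        return bindingDestinations.get(effectiveBindingName(functionBindings, originalName));
    }

    public static void main(String[] args) {
        // Stands in for spring.cloud.stream.function.bindings.*
        Map<String, String> functionBindings = Map.of("func1-in-0", "foo", "func2-in-0", "bar");
        // Stands in for spring.cloud.stream.bindings.<name>.destination
        Map<String, String> destinations = Map.of("foo", "topic1", "bar", "topic1");

        System.out.println(effectiveBindingName(functionBindings, "func1-in-0"));      // foo
        System.out.println(destination(functionBindings, destinations, "func2-in-0")); // topic1
        // In this model, a destination keyed by the old name is never looked up after the rename.
        System.out.println(destination(functionBindings, Map.of("func1-in-0", "topic1"), "func1-in-0")); // null
    }
}
```

The renamed bindings foo and bar remain distinct names even though both resolve to topic1, which is what allows two consumers on the same topic.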

The Spring Cloud Stream and binder reference docs have details on these properties, but if anything is unclear, please let us know and we can clarify the docs further.

chrismpalmer commented 2 years ago

Thank you.

chrismpalmer commented 2 years ago

Thanks for the quick response. I am glad it's not a bug.