spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

How to reuse stream function destination in Spring Stream Kafka Processing #1037

Closed: taigetco closed this issue 3 years ago

taigetco commented 3 years ago

I chose Kafka Streams to process some data with the Spring Cloud Stream Kafka binder, so I defined some Function beans in Spring Cloud Stream, like the code below:

    @Bean
    public Function<KStream<GenericRecord, GenericRecord>, KStream<String, UserEvent>> process() {
        return input -> input.map((key, value) -> KeyValue.pair(key.get("ID").toString(), generateFrom(value)))
                             .filter((k, v) -> v != null);
    }
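
The generateFrom(...) helper is not shown in the report. Purely as a hypothetical sketch of its shape (the UserEvent constructor and the "TYPE" field are assumptions), it would map the Avro GenericRecord to the output payload and return null for records that the filter in process() should drop:

    // Hypothetical helper, not part of the original report: converts the incoming Avro
    // record into a UserEvent, or returns null so the filter in process() drops it.
    private UserEvent generateFrom(GenericRecord value) {
        Object type = value.get("TYPE");                          // assumed field name
        if (type == null) {
            return null;                                          // filtered out downstream
        }
        return new UserEvent(type.toString(), value.toString());  // assumed constructor
    }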

with the following configuration:

    spring:
      cloud:
        stream:
          function:
            definition: userEventProcess;accountEventProcess;xxxProcess;
            bindings:
              userEventProcess-in-0:
                destination: xxx.user
              userEventProcess-out-0:
                destination: xxx.user_event
              accountEventProcess-in-0:
                destination: xxx.user
              accountEventProcess-in-1:
                destination: xxx.account
              accountEventProcess-out-0:
                destination: xxx.account_event
              xxxProcess-in-0:
                destination: xxx.user_event
              xxxProcess-in-1:
                destination: xxx.account_event
              xxxProcess-out-0:
                destination: xxx.final_result
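
For reference, the bindings above give accountEventProcess two inputs (-in-0, -in-1), which in the functional model corresponds to a java.util.function.BiFunction bean. The body below is only a sketch; AccountEvent and the toAccountEvent(...) mapper are assumptions, not code from the report:

    // Sketch of a two-input processor matching accountEventProcess-in-0/-in-1/-out-0.
    // AccountEvent and toAccountEvent(...) are hypothetical; only the BiFunction shape
    // and the binding names are taken from the configuration above.
    @Bean
    public BiFunction<KStream<GenericRecord, GenericRecord>,
                      KStream<GenericRecord, GenericRecord>,
                      KStream<String, AccountEvent>> accountEventProcess() {
        return (users, accounts) -> users
                .merge(accounts)
                .map((key, value) -> KeyValue.pair(key.get("ID").toString(), toAccountEvent(value)))
                .filter((k, v) -> v != null);
    }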

But I get this error: The bean 'xxx.user_event' could not be registered. A bean with that name has already been defined and overriding is disabled.

I have read some of the spring-cloud-stream-binder-kafka code and found that in KafkaStreamsBindableProxyFactory the bindInput method registers the destination name as a bean. Does this mean we cannot reuse the same topic in another processor? Below is the code from KafkaStreamsBindableProxyFactory.bindInput:

    private void bindInput(ResolvableType arg0, String inputName) {
        if (arg0.getRawClass() != null) {
            KafkaStreamsBindableProxyFactory.this.inputHolders.put(inputName,
                    new BoundTargetHolder(getBindingTargetFactory(arg0.getRawClass())
                            .createInput(inputName), true));
        }

        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;

        RootBeanDefinition rootBeanDefinition = new RootBeanDefinition();
        rootBeanDefinition.setInstanceSupplier(() -> inputHolders.get(inputName).getBoundTarget());
        registry.registerBeanDefinition(inputName, rootBeanDefinition);
    }
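
The error itself is standard Spring behaviour when two bean definitions are registered under the same name while overriding is disabled (the Spring Boot default since 2.1). A minimal, self-contained sketch of that mechanism, not code from the binder:

    import org.springframework.beans.factory.support.RootBeanDefinition;
    import org.springframework.context.support.GenericApplicationContext;

    // Minimal sketch of the failure mode: two definitions under one name with
    // overriding disabled. The name "xxx.user_event" mirrors the reused destination.
    public class DuplicateBeanNameDemo {
        public static void main(String[] args) {
            GenericApplicationContext context = new GenericApplicationContext();
            context.setAllowBeanDefinitionOverriding(false);
            context.registerBeanDefinition("xxx.user_event", new RootBeanDefinition(Object.class));
            // Throws BeanDefinitionOverrideException because the name is already taken.
            context.registerBeanDefinition("xxx.user_event", new RootBeanDefinition(Object.class));
        }
    }
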
chrismpalmer commented 2 years ago

@taigetco Any reason why you closed this? I am having this same problem.