spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
988 stars 602 forks source link

Auto register functionals beans as spring cloud stream bindings #2170

Open ferblaca opened 3 years ago

ferblaca commented 3 years ago

When configuring an application with spring cloud stream, if there is a bean that implements a functional interface even if its purpose is neither to publish nor to subscribe to events, spring cloud stream detects it and tries to create the provisioning of the target, and even if the bean is a supplier, it even sends events.

This happens as long as there is no more than one bean in the spring context of type Function, Consumer or Supplier, even if you set the property "spring.cloud.function.scan.enabled" to false.

This can be a problem in applications where you only want to use StreamBridge to produce events and for other reasons there is a bean that implements a functional interface in the spring context.

Is this the desired behaviour?

To Reproduce Attached is an application that reproduces the problem. demoStreamKafka.zip

Version of the framework spring-cloud 2020.0.2 spring-cloud-stream 3.1.2

olegz commented 3 years ago

The behavior is intentional but will only happen if you have single bean of type Function, Consumer or Supplier. If you have multiple, non will be bound without providing spring.cloud.function.definition property.

You can disable it by using spring.cloud.stream.function.autodetect property - https://docs.spring.io/spring-cloud-stream/docs/3.0.12.RELEASE/reference/html/spring-cloud-stream.html#_overview

For future reference please use our dedicated Stack Overflow channel for questions

ferblaca commented 3 years ago

Thank you very much for the information @olegz !

Yes, the behaviour is as you describe, but I'm afraid that the property "spring.cloud.stream.function.autodetect" has no effect in the latest version 3.1.2... but it does in 3.0.x.

Will it be available for version 3.1.3?

olegz commented 3 years ago

I'll investigate. I am surprised that it has no effect. But yes it will be available in 3.1.3

olegz commented 3 years ago

As a workaround you can always use spring.cloud.function.definition=blah where blah points to something that doesn't exist. You'll see a warning message, but that's ok.

olegz commented 3 years ago

Actually this indeed has been addressed in 3.1, but have not been released yet - https://github.com/spring-cloud/spring-cloud-stream/issues/2035

ferblaca commented 3 years ago

Hello @olegz ! just to inform, now in version 3.1.2 this workaroung is not valid: Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'supplierInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'blah' available at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean (AbstractAutowireCapableBeanFactory.java:1786) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean (AbstractAutowireCapableBeanFactory.java:602) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean (AbstractAutowireCapableBeanFactory.java:524)

To solve this for the moment I will define more than one dummy functional bean by leaving the 'spring.cloud.function.definition' property empty.

LeovR commented 3 years ago

We are running into the same issue as ferblaca.

There exists a bean in the context of our application which implements a functional interface that has nothing to do with spring cloud stream. And we have a definition of spring.cloud.stream.source for the StreamBridge.

We also have spring.cloud.stream.function.autodetect=false.

The issue is within the SimpleFunctionRegistry#normalizeFunctionDefinition which replaces the functionDefinition which comes from spring.cloud.stream.source with the name of the single non-related bean.

Thus, our binding configuration is not applied.

As a workaround we define another non-related bean which implements a functional interface.

@olegz can we maybe reopen this issue?

olegz commented 3 years ago

Sure, we can reopen if one of you provide us with sample to reproduce it. A simple project with bare minimum to reproduce the issue somewhere in github will do.

LeovR commented 3 years ago

Alright, I will provide a sample application

LeovR commented 3 years ago

I created a sample project here: https://github.com/LeovR/spring-cloud-stream-issue-2170

olegz commented 3 years ago

@LeovR why is there spring.cloud.stream.function.autodetect=false?

LeovR commented 3 years ago

Because the arbitraryFunctionalBean in the demo project is just an example of a bean implementing a functional interface. The only source for publishing messages is the StreamBridge.

In our application there is a bean of org.springframework.cloud.kubernetes.fabric8.discovery.KubernetesClientServicesFunction which extends Function<KubernetesClient, FilterWatchListDeletable<Service, ServiceList, Boolean, Watch>>.

We do not want this as a source for spring cloud stream. Thus, we disabled spring.cloud.stream.function.autodetect.

olegz commented 3 years ago

So you just need StreamBridge, correct?

LeovR commented 3 years ago

Yes, correct.

olegz commented 3 years ago

Well, perhaps we need to update the docs, but you really don't need the following

spring.cloud.stream.source=sourceSupplier
spring.cloud.stream.bindings.sourceSupplier-out-0.destination=sourcetopic

Basically inject StreamBridge whenever and use it. For example

streamBridge.send("sourcetopic", "blah, blah");

or if you want your binding name to be different then destination name you can still use spring.cloud.stream.bindings.some_binding_name.destination=sourcetopic For example

spring.cloud.stream.bindings.sourceSupplier-out-0.destination=sourcetopic
. . .
streamBridge.send("sourceSupplier-out-0.", "blah, blah");

Basically the way StreamBridge works is it will send to the existing binding or will auto-provision one on the first send.

Give it a try and let me know. I'll keep it open until you confirm

LeovR commented 3 years ago

In the docs there is the following chapter:

https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

Spring Cloud Stream provides two mechanisms, so let’s look at them in more details

Basically you are suggesting to use this mode: https://docs.spring.io/spring-cloud-stream/docs/3.1.3/reference/html/spring-cloud-stream.html#_streambridge_and_dynamic_destinations

I think that would work for us but there is a small difference in those mechanisms. With the explicit definition of spring.cloud.stream.source the binding will be created at context creation.

Thus, we know at the start whether the configuration is correct. Wherease in the dynamic mode the binding will only be created after using the StreamBridge for the first time.

So if the first mode is still a valid option then I think there is still an issue as demonstrated in the demo project.

davidmelia commented 2 years ago

@olegz I have a GitHub project as I have suffered from the same issues above - see https://github.com/davidmelia/spring-cloud-function-source-only. This project is a simple spring cloud function and spring cloud stream kafka producer and therefore does not need a 'definition'.

You can run AwsLambdaRequestHandlerSandpit which will create a message in the kafka topic 'test'.

1) spring.cloud.stream.source does not bootstrap at context creation but I think it should (similar to spring.cloud.function.definition) - its seems to be compleltey redundant

2) Because I only have a source (and not a spring.cloud.function.definition) I need spring.cloud.stream.function.autodetect=false. Is there anything clever that can be done to say if we only have a spring.cloud.stream.source then autodetect=false rather than having to specify it?

Thanks

Drizzthsz commented 2 years ago

Hi!

I have found a related issue. I have a very simple project to reproduce it: demo.zip

It seems to be an issue when I try to use the coherence-spring-boot-starter and coherence together with spring-cloud-stream. The Application cannot start correctly. The stack trace is: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'getExtractor' defined in class path resource [com/oracle/coherence/spring/configuration/ExtractorConfiguration.class]: Unexpected exception during bean creation; nested exception is java.lang.IllegalStateException: No current InjectionPoint available for method 'getExtractor' parameter 0 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1755) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:604) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:526) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:928) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:926) ~[spring-context-6.0.0-M4.jar:6.0.0-M4] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:592) ~[spring-context-6.0.0-M4.jar:6.0.0-M4] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:740) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:436) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3] at org.springframework.boot.SpringApplication.run(SpringApplication.java:309) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-3.0.0-M3.jar:3.0.0-M3] at com.example.demo.DemoApplication.main(DemoApplication.java:10) ~[classes/:na] Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'getExtractor' defined in class path resource [com/oracle/coherence/spring/configuration/ExtractorConfiguration.class]: Unexpected exception during bean creation; nested exception is java.lang.IllegalStateException: No current InjectionPoint available for method 'getExtractor' parameter 0 at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:539) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:344) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1141) ~[spring-context-6.0.0-M4.jar:6.0.0-M4] at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.discoverFunctionInBeanFactory(BeanFactoryAwareFunctionRegistry.java:206) ~[spring-cloud-function-context-4.0.0-M3.jar:4.0.0-M3] at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.lookup(BeanFactoryAwareFunctionRegistry.java:129) ~[spring-cloud-function-context-4.0.0-M3.jar:4.0.0-M3] at org.springframework.cloud.function.context.FunctionCatalog.lookup(FunctionCatalog.java:42) ~[spring-cloud-function-context-4.0.0-M3.jar:4.0.0-M3] at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionBindingRegistrar.determineFunctionName(FunctionConfiguration.java:989) ~[spring-cloud-stream-4.0.0-M3.jar:4.0.0-M3] at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionBindingRegistrar.afterPropertiesSet(FunctionConfiguration.java:861) ~[spring-cloud-stream-4.0.0-M3.jar:4.0.0-M3] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1802) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1751) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] ... 15 common frames omitted Caused by: java.lang.IllegalStateException: No current InjectionPoint available for method 'getExtractor' parameter 0 at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:858) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:767) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:525) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1324) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1161) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:566) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:526) ~[spring-beans-6.0.0-M4.jar:6.0.0-M4] ... 25 common frames omitted As you can see there is IllegalStateException. This happens because the Spring cloud stream finds a bean factory method that would create a bean of a Functional interface. But I think that this particular Functional interface should not be picked up. The reason is that this is a Prototype bean that can only be created when there is a provided injection point. What I doubt is: is it good idea to pick up a bean factory method that is prototype and takes injection point as an input parameter? What causes the Exception is @Bean @Primary @Scope(BeanDefinition.SCOPE_PROTOTYPE) public ValueExtractor<?, ?> getExtractor(InjectionPoint injectionPoint) { return this.extractorService.getExtractor(injectionPoint); } in com.oracle.coherence.spring.configuration.ExtractorConfiguration. But I believe the root cause would be to not try to create Beans where InjectionPoint parameter is needed. Please share your thoughts on this.

Thanks!