Closed avramakrishna closed 2 years ago
@avramakrishna This repository is no longer maintained. See the announcement here: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/README.adoc. Closing this issue. Please create a new issue in core Spring Cloud Stream. Thanks!
I see this reference in https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_consuming_batches that starting with version 3.0, we can use batchmode. I am using spring cloud stream 3.2.5. Though I have been able to get this working using @Streamlistener, I did not have success with Functional Style. Here's my pom.xml.
Here's my application.yml:
Here's my code:
when I run, I get the following exception:
org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.23.jar:5.3.23] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.23.jar:5.3.23] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.23.jar:5.3.23] at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.23.jar:5.3.23] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.23.jar:5.3.23] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.23.jar:5.3.23] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.23.jar:5.3.23] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) ~[spring-boot-2.7.4.jar:2.7.4] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-2.7.4.jar:2.7.4] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.4.jar:2.7.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-2.7.4.jar:2.7.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306) ~[spring-boot-2.7.4.jar:2.7.4] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295) ~[spring-boot-2.7.4.jar:2.7.4] at com.example.springcloudfunction.SpringcloudfunctionApplication.main(SpringcloudfunctionApplication.java:10) ~[classes/:na] Caused by: java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set. at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:210) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:151) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:402) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:106) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na] at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.2.5.jar:3.2.5] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.23.jar:5.3.23] ... 14 common frames omitted
But when I use the following code, it starts to consume messages and work just fine
This is the signature for processing as a stream, but as described in https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_consuming_batches, when batch mode is set, it should accept a List instead of KStream. I tried different versions of spring cloud stream, but of no help.