rsocket-broker / rsocket-broker-client

Apache License 2.0
33 stars 8 forks source link

Unable to configure nested route via io.rsocket.broker.client.spring.BrokerClientProperties#getAddress() property #22

Open sachins-dev opened 2 years ago

sachins-dev commented 2 years ago

Hi @spencergibb , @OlgaMaciaszek , @OlegDokuka . First of all thanks for the awesome project.

Please help with the below scenario. io.rsocket.broker.client.spring.BrokerRSocketRequester#route() is unable to handle the nested route provided via io.rsocket.broker.client.spring.BrokerClientProperties#getAddress() property in application.yml (e.g. a route with common root level @MessageMapping annotation and different method level @MessageMapping annotation e.g. route of format "simple.rr" where the simple is root level route value of @MessageMapping annotation and rr is the route value of method level @MessageMapping annotation)

Please find the scenario detailed document here. https://docs.google.com/document/d/1uD8STNJbL65sBjiutSt440JRCSY88rRZ/edit

====================== Error logs

2022-04-07 15:05:34.510 ERROR 16127 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [9ea98047-1] 500 Server Error for HTTP POST "/api/greet"

java.lang.IllegalArgumentException: message/x.rsocket.broker.frame.v0 metadata was not set for route: simple.rr at io.rsocket.broker.client.spring.BrokerRSocketRequester$BrokerRequestSpec.validateMetadataSet(BrokerRSocketRequester.java:312) ~[rsocket-broker-client-spring-0.3.0.jar:na] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ HTTP POST "/api/greet" [ExceptionHandlingWebHandler] Original Stack Trace: at io.rsocket.broker.client.spring.BrokerRSocketRequester$BrokerRequestSpec.validateMetadataSet(BrokerRSocketRequester.java:312) ~[rsocket-broker-client-spring-0.3.0.jar:na] at io.rsocket.broker.client.spring.BrokerRSocketRequester$BrokerRequestSpec.retrieveMono(BrokerRSocketRequester.java:288) ~[rsocket-broker-client-spring-0.3.0.jar:na] at com.sachin.cg.rsocket.service.RSocketTestService.requestResponse(RSocketTestService.java:26) ~[classes/:na] at com.sachin.cg.rsocket.controller.RSocketTestController.requestResponse(RSocketTestController.java:34) ~[classes/:na] at com.sachin.cg.http.service.TestService.greet(TestService.java:32) ~[classes/:na] at com.sachin.cg.http.controller.TestController.greet(TestController.java:28) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na] at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:144) ~[spring-webflux-5.3.18.jar:5.3.18] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:325) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:120) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2068) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:967) ~[reactor-netty-http-1.0.17.jar:1.0.17] at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677) ~[reactor-netty-core-1.0.17.jar:1.0.17] at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478) ~[reactor-netty-core-1.0.17.jar:1.0.17] at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:570) ~[reactor-netty-http-1.0.17.jar:1.0.17] at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.17.jar:1.0.17] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:222) ~[reactor-netty-http-1.0.17.jar:1.0.17] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final] at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

spencergibb commented 2 years ago

Please create a sample project rather than screenshots in a Google doc

sachins-dev commented 2 years ago

Please create a sample project rather than screenshots in a Google doc

Hi @spencergibb, thanks for the response.

Please find the sample code in the below repository https://github.com/sachins-dev/RSocket-Broker-Client

Steps to reproduce:

  1. Compile Protobuf-schema module to get the jar
  2. Put debug point in io.rsocket.broker.client.spring.BrokerRSocketRequester#route()
  3. Run com.sachin.rsocket.broker.RSocketBrokerApplication in debug mode
  4. Run com.sachin.rsocket.server.RSocketServerApplication in debug mode
  5. Run com.sachin.cg.ClientGatewayApplication in debug mode
  6. Trigger the http request from request.http file via inbuilt httpclient of intellij.
  7. See the value of properties.getAddress() in BrokerRSocketRequester#route(), the nested route is not resolved correctly Please check the application.yml once image

Application logs

2022-04-22 14:22:04.201 ERROR 25070 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [a3e2af56-1] 500 Server Error for HTTP POST "/api/greet"

java.lang.IllegalArgumentException: message/x.rsocket.broker.frame.v0 metadata was not set for route: simple.rr at io.rsocket.broker.client.spring.BrokerRSocketRequester$BrokerRequestSpec.validateMetadataSet(BrokerRSocketRequester.java:312) ~[rsocket-broker-client-spring-0.3.0.jar:na] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ HTTP POST "/api/greet" [ExceptionHandlingWebHandler] Original Stack Trace: at io.rsocket.broker.client.spring.BrokerRSocketRequester$BrokerRequestSpec.validateMetadataSet(BrokerRSocketRequester.java:312) ~[rsocket-broker-client-spring-0.3.0.jar:na] at io.rsocket.broker.client.spring.BrokerRSocketRequester$BrokerRequestSpec.retrieveMono(BrokerRSocketRequester.java:288) ~[rsocket-broker-client-spring-0.3.0.jar:na] at com.sachin.cg.rsocket.service.RSocketTestService.requestResponse(RSocketTestService.java:24) ~[classes/:na] at com.sachin.cg.rsocket.controller.RSocketTestController.requestResponse(RSocketTestController.java:28) ~[classes/:na] at com.sachin.cg.http.service.TestService.greet(TestService.java:23) ~[classes/:na] at com.sachin.cg.http.controller.TestController.greet(TestController.java:28) ~[classes/:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na] at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na] at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na] at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na] at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:144) ~[spring-webflux-5.3.18.jar:5.3.18] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:325) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:120) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2068) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.16.jar:3.4.16] at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:967) ~[reactor-netty-http-1.0.17.jar:1.0.17] at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677) ~[reactor-netty-core-1.0.17.jar:1.0.17] at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478) ~[reactor-netty-core-1.0.17.jar:1.0.17] at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:570) ~[reactor-netty-http-1.0.17.jar:1.0.17] at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.17.jar:1.0.17] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:222) ~[reactor-netty-http-1.0.17.jar:1.0.17] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) ~[netty-codec-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final] at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

sachins-dev commented 2 years ago

Hi @spencergibb , Please check the sample code provided in the above comment.