spring-attic / spring-cloud-rsocket

This repository is now inactive. Please see https://github.com/rsocket-routing/
https://github.com/rsocket-routing/
Apache License 2.0
49 stars 13 forks source link

io.rsocket.exceptions.RejectedSetupException when Spring Cloud Bus is on Classpath #2

Open spencergibb opened 4 years ago

spencergibb commented 4 years ago

@FWinkler79 said: I am using ... Spring Boot: 2.2.0.RELEASE Spring Cloud: Hoxton.BUILD-SNAPSHOT Sample project: here

The service-gateway uses Spring Cloud Gateway with RSocket support and reservation-service-client is an application that acts as a client to the (RSocket Broker) Gateway. In my production setup, ?reservation-service-client gets its configurations from a Spring Cloud config-server and config updates via Rabbit MQ and Spring Cloud Bus (which is why I need Spring Cloud Bus on the classpath).

Problem is: as soon as Spring Cloud Bus is on the classpath, I receive an Exception at startup of reservation-service-client which looks as follows:

2019-10-21 13:40:09.145  INFO 11104 --- [           main] c.e.r.s.client.ReservationServiceClient  : Started ReservationServiceClient in 2.916 seconds (JVM running for 3.606)
io.rsocket.exceptions.RejectedSetupException: Route Id already registered: 500
    at io.rsocket.exceptions.Exceptions.from(Exceptions.java:61)
    at io.rsocket.RSocketRequester.handleStreamZero(RSocketRequester.java:528)
    at io.rsocket.RSocketRequester.handleIncomingFrames(RSocketRequester.java:514)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
    at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
    at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:206)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:329)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

This only occurs, if the following dependency is declared:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-bus</artifactId>
</dependency>

I am not sure, this exception is actually a problem but it's certainly not nice.

To reproduce:

  1. start service-gateway
  2. start reservation-service-client
  3. To see the error disappear, comment out spring-cloud-bus dependency in reservation-service-client/pom.xml and start again,
spencergibb commented 4 years ago

@FWinkler79 said: Just to add: I tried a little more, and can now confirm that this really an issue and not just "not nice".

The exception / underlying problem breaks communication between services via the gateway.

In my setup I am using a WebFlux endpoint that translates the POST request into an RSocket request sent via the gateway to another RSocket service. As a result, (with Spring Cloud Bus on the classpath) the exception given above will be thrown.

Here is an additional sample, that shows that setup.

To reproduce:

  1. start service-gateway
  2. start reservation-service <-- throws exception
  3. start reservation-service-client <-- throws exception as well
  4. send a POST request to http://localhost:6666/reservation/create/Spencer with empty body.

To fix reservation-service, comment out the following dependency:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-bus</artifactId>
</dependency>

To fix reservation-service-client, you even need to comment out more:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

So maybe it actually has something to do with the stream binder? Not sure.

Even with "fixed" services, the gateway throws an exception, though. I will open another issue for that. Hope that helps in the analysis.