rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

How to keep an RSocket connection alive forever? What should the configuration values for keepalive(interval, maxLifeTime) be to achieve this? #1097

Open sudhanshucs0102 opened 12 months ago

sudhanshucs0102 commented 12 months ago

I am using RSocket for client-server communication and expect the connection to stay alive until it is closed manually. It is like starting a subscription from the client to the server, where the client and server can communicate with each other asynchronously, for as long as needed, over the same connection.

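Client code:

// interval and maxLifeTime are java.time.Duration values;
// host, port and payloadByte are defined elsewhere.
Mono<RSocket> rSocketMono =
        io.rsocket.core.RSocketConnector.create()
                .setupPayload(ByteBufPayload.create(payloadByte))
                .acceptor(
                        SocketAcceptor.forFireAndForget(
                                p -> {
                                    // Release the incoming payload; nothing is returned to the server.
                                    p.release();
                                    return Mono.empty();
                                }))
                .keepAlive(interval, maxLifeTime)
                // connect(...) returns Mono<RSocket>; subscribe to it (or call block()
                // and store the result as an RSocket) to open the connection.
                .connect(TcpClientTransport.create(host, port));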

Server code:

RSocketServer
                .create(new RSocketClientAcceptor())
                .bindNow(TcpServerTransport.create(port));

Expected Behavior

As I understand it, when the client opens a connection to the server, it starts sending KEEPALIVE frames to the server at the configured keepalive interval. The server, in turn, can close the connection if it does not receive a frame for longer than maxLifeTime. So with any keepalive configuration where interval < maxLifeTime, the connection should stay alive forever.
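The timing rule described above can be sketched in plain Java. This is a simplified model under stated assumptions, not rsocket-java's actual implementation: the class `KeepAliveModel` and its methods are hypothetical names, frames are assumed to arrive exactly on schedule with no network delay or loss, and only the "no frame for longer than maxLifeTime" check is modelled.

```java
import java.time.Duration;

/** Hypothetical, simplified model of keepalive supervision (not rsocket-java code). */
final class KeepAliveModel {
    private final Duration maxLifeTime;
    private long lastFrameMillis;

    KeepAliveModel(Duration maxLifeTime, long nowMillis) {
        this.maxLifeTime = maxLifeTime;
        this.lastFrameMillis = nowMillis;
    }

    /** Records that a frame (for example KEEPALIVE) arrived at the given time. */
    void onFrame(long nowMillis) {
        lastFrameMillis = nowMillis;
    }

    /** True when no frame has arrived for strictly longer than maxLifeTime. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastFrameMillis > maxLifeTime.toMillis();
    }

    /**
     * Simulates a peer sending one frame every interval and reports whether
     * the observer stays within maxLifeTime for the whole simulated period.
     */
    static boolean survives(Duration interval, Duration maxLifeTime, long totalMillis) {
        long step = interval.toMillis();
        if (step <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        KeepAliveModel model = new KeepAliveModel(maxLifeTime, 0L);
        for (long t = step; t <= totalMillis; t += step) {
            if (model.isTimedOut(t)) {
                return false; // gap since the last frame exceeded maxLifeTime
            }
            model.onFrame(t);
        }
        return true;
    }

    public static void main(String[] args) {
        long oneHour = Duration.ofHours(1).toMillis();
        // interval < maxLifeTime: the check never fires in this idealized model
        System.out.println(survives(Duration.ofSeconds(20), Duration.ofSeconds(90), oneHour));
        // interval > maxLifeTime: the first gap already exceeds the limit
        System.out.println(survives(Duration.ofSeconds(120), Duration.ofSeconds(90), oneHour));
    }
}
```

In this idealized model, interval < maxLifeTime is enough on its own. On a real network, frames can be delayed or dropped, so the model says nothing about closes that originate outside the keepalive check.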

Is this the right use case for RSocket? If not, please explain why and suggest an alternative.

Actual Behavior

The server closes the client's RSocket connection after some time.

Possible Solution

  1. What is the right configuration for keepalive(interval, maxLifeTime) to keep the connection active forever?
  2. If this cannot be achieved through configuration, please suggest a workaround.
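One generic workaround for question 2 is to re-establish the connection when it drops rather than rely on it never closing. The sketch below shows that idea with the standard library only. `ReconnectSketch` and `connectWithRetry` are hypothetical names, the `Supplier` stands in for whatever actually opens the connection, and a fixed delay is an arbitrary choice. rsocket-java's `RSocketConnector` also has a `reconnect(Retry)` option, which is worth checking against the version in use.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Hypothetical helper: retries a connect action with a fixed delay between attempts. */
final class ReconnectSketch {

    static <T> T connectWithRetry(Supplier<T> connect, int maxAttempts, Duration delay) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again after the delay
                if (attempt < maxAttempts) {
                    sleep(delay);
                }
            }
        }
        throw last; // every attempt failed; surface the most recent error
    }

    private static void sleep(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting to reconnect", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in connect action that fails twice before succeeding.
        String result = connectWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("connection refused");
            }
            return "connected";
        }, 5, Duration.ofMillis(10));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

A retry loop like this does not explain why the connection closes. It only limits the impact while the cause is investigated.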

Your Environment

The connection gets closed with the following stack trace:

2023-09-15 07:26:12.503 [reactor-tcp-epoll-4] at com.citi.gfts.service.server.rsocket.RSocketHandler.dispose(RSocketHandler.java:36)
2023-09-15 07:26:12.503 [reactor-tcp-epoll-4] at io.rsocket.core.RSocketResponder.doOnDispose(RSocketResponder.java:199)
2023-09-15 07:26:12.503 [reactor-tcp-epoll-4] at io.rsocket.core.RSocketResponder.tryTerminate(RSocketResponder.java:117)
2023-09-15 07:26:12.503 [reactor-tcp-epoll-4] at io.rsocket.core.RSocketResponder.tryTerminateOnConnectionClose(RSocketResponder.java:110)
2023-09-15 07:26:12.503 [reactor-tcp-epoll-4] at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135)
2023-09-15 07:26:12.503 [reactor-tcp-epoll-4] at reactor.core.publisher.MonoWhen$WhenCoordinator.signal(MonoWhen.java:213)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at reactor.core.publisher.MonoWhen$WhenInner.onComplete(MonoWhen.java:293)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onComplete(FluxFirstWithSignal.java:352)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:823)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(AbstractEpollChannel.java:480)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:506)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
2023-09-15 07:26:12.504 [reactor-tcp-epoll-4] at java.lang.Thread.run(Thread.java:750)

sudhanshucs0102 commented 11 months ago

@OlegDokuka @mostroverkhov - Please help, I am stuck.