rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

Server dies on keep-alive ack timeout #640

Closed lksvenoy-r7 closed 5 years ago

lksvenoy-r7 commented 5 years ago

The RSocket server dies on keep-alive ack timeout. I've tried adding onErrorResume, but to no avail. How can I prevent the server from closing its socket on error?

Error

[2019-05-20 11:12:21.438] ERROR [parallel-1] RegistryRSocketServer: Error occurred during session
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
at io.rsocket.keepalive.KeepAliveConnection.lambda$startKeepAlives$1(KeepAliveConnection.java:97)
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:137)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
at io.rsocket.keepalive.KeepAliveHandler.doCheckTimeout(KeepAliveHandler.java:112)
at io.rsocket.keepalive.KeepAliveHandler$Server.onIntervalTick(KeepAliveHandler.java:128)
at io.rsocket.keepalive.KeepAliveHandler.lambda$start$0(KeepAliveHandler.java:63)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Version: 0.12.2-RC2

Server Code

server = RSocketFactory
        .receive()
        .frameDecoder(ZERO_COPY)
        .addConnectionPlugin(micrometerDuplexConnectionInterceptor)
        .errorConsumer(e -> log.error("Error occurred during session", e))
        .acceptor(socketAcceptor)
        .transport(serverTransport)
        .start()
        .onErrorResume(e -> Mono.empty())
        .subscribe();

Acceptor
@Override
  public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
    return Mono.just(new RegistryRSocket(scheduler));
  }

requestStream
return Mono.just(payload)
    .map(this::getRequestFromPayload)
    .flux()
    .map(/*..Does Something..*/)
    .onErrorResume(throwable ->
        Flux.just(createPayloadFromThrowable(throwable)));

private Payload createPayloadFromThrowable(Throwable t) {
    return ByteBufPayload.create(ErrorFrameFlyweight.encode(DEFAULT, 0, t));
  }

Any help would be greatly appreciated.

robertroeser commented 5 years ago

@mostroverkhov can you take a look at this?

mostroverkhov commented 5 years ago

Our internal testing showed that RSocketServer may become unresponsive in certain situations - this should be fixed in the upcoming 0.12.2-RC3 release, which contains a set of improvements on both RSocketServer and keep-alives. You can try the latest snapshot, 0.12.2-RC3-SNAPSHOT, from https://oss.jfrog.org/oss-snapshot-local
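If you are on Maven, something along these lines should pull the snapshot in (a sketch - the repository id is arbitrary, and rsocket-core stands in for whichever rsocket artifacts you already use):

<repositories>
  <repository>
    <id>oss-jfrog-snapshots</id>
    <url>https://oss.jfrog.org/oss-snapshot-local</url>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

<dependency>
  <groupId>io.rsocket</groupId>
  <artifactId>rsocket-core</artifactId>
  <version>0.12.2-RC3-SNAPSHOT</version>
</dependency>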

lksvenoy-r7 commented 5 years ago

@mostroverkhov Is this related to the fact that terminate, called on keep-alive ack timeout, closes the underlying server connection? Just curious https://github.com/rsocket/rsocket-java/blob/71915d4f0c16f1107b8b09f9555eff4bfe07e499/rsocket-core/src/main/java/io/rsocket/RSocketServer.java

lksvenoy-r7 commented 5 years ago

@mostroverkhov I'll give 0.12.2-RC3 a shot! Thank you, and thank you all for this amazing project :)

mostroverkhov commented 5 years ago

@lksvenoy-r7 keep-alives are not optional, and the connection must be closed if KEEP-ALIVE frames have not been seen for a long time. On 0.12.2-RC2, you can try setting a high value for the keep-alive timeout when creating the ClientRSocketFactory - but this would not help if RSocketServer becomes unresponsive.
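Something along these lines - a rough sketch against the 0.12.x ClientRSocketFactory API (method names may differ slightly in your exact version; class name, transport, host and port are placeholders):

import java.time.Duration;

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;

public class LongKeepAliveClient {

  // Raise the keep-alive tick period and ack timeout so a busy server
  // is not declared dead too eagerly.
  public static RSocket connect() {
    return RSocketFactory.connect()
        .keepAliveTickPeriod(Duration.ofSeconds(20))  // how often KEEPALIVE frames are sent
        .keepAliveAckTimeout(Duration.ofMinutes(5))   // how long to wait for acks before closing
        .transport(TcpClientTransport.create("localhost", 7000))
        .start()
        .block();
  }
}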

lksvenoy-r7 commented 5 years ago

@mostroverkhov I am aware of this. This issue happened when putting the server under significant load. However, would it not make more sense for the server to drop client connections, rather than shut down the server connection?

In my case, the service ends up in a state where the only way to recover is to restart the box, and that is the issue I am trying to prevent.

I understand keep-alives are not optional, but should the server really be killed when they do not arrive? In that case, how can the service recover gracefully on error?

robertroeser commented 5 years ago

@mostroverkhov Weren't changes made in develop to how keep-alives are handled vs 0.12.2-RC3? I seem to remember running into something similar.

mostroverkhov commented 5 years ago

@robertroeser Yes, that changed after 0.12.2-RC2 (current develop) - that's what I meant by "a set of improvements on both RSocketServer and keep-alives".

lksvenoy-r7 commented 5 years ago

@robertroeser @mostroverkhov I may be misunderstanding this code then, but looking at https://github.com/rsocket/rsocket-java/blob/71915d4f0c16f1107b8b09f9555eff4bfe07e499/rsocket-core/src/main/java/io/rsocket/RSocketServer.java at line 130, it does appear, to me at least, that the underlying server connection is disposed on keep-alive ack timeout. That seems like strange behaviour to me, as your service would then need either built-in resilience/reboot, or you'd need to restart the box. Why would the server close its underlying connection and die, instead of closing its client(s)?

mostroverkhov commented 5 years ago

@lksvenoy-r7 The RSocketClient and RSocketServer are actually the requester and responder on one side of a connection - the naming here is confusing. Closing the connection closes the requester and responder on that connection, but not the server started with ServerRSocketFactory.

robertroeser commented 5 years ago

@mostroverkhov I think there was a ticket somewhere to rename RSocketClient to RSocketRequester and RSocketServer to RSocketResponder - they are only used internally, so you should just go ahead and update them.

lksvenoy-r7 commented 5 years ago

@mostroverkhov @robertroeser I've tested with 0.12.2-RC3-SNAPSHOT and am now experiencing different problems. While the socket didn't die instantly, the EC2 instance the service was running on has frozen.

(screenshot: EC2 instance memory usage)

I ran a significant number of requests against the machine and saw the memory usage reach 100%. After 20 minutes the memory usage still has not decreased, although requests have stopped (see screenshot above).

After 1 hour (now) the machine is locked up; it is not possible to SSH onto it and it is unresponsive.

Server Logs

ERROR [parallel-2] RegistryRSocketServer: Error occurred during session
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
at io.rsocket.keepalive.KeepAliveConnection.lambda$startKeepAlives$1(KeepAliveConnection.java:97)
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:137)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
at io.rsocket.keepalive.KeepAliveHandler.doCheckTimeout(KeepAliveHandler.java:112)
at io.rsocket.keepalive.KeepAliveHandler$Server.onIntervalTick(KeepAliveHandler.java:128)
at io.rsocket.keepalive.KeepAliveHandler.lambda$start$0(KeepAliveHandler.java:63)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[2019-05-21 11:20:18.093] ERROR [parallel-1] RegistryRSocketServer: Error occurred during session
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
... (same stack trace as above)
[2019-05-21 11:20:18.249] ERROR [parallel-2] RegistryRSocketServer: Error occurred during session
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
... (same stack trace as above)
[2019-05-21 11:20:18.316] ERROR [parallel-2] RegistryRSocketServer: Error occurred during session
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
... (same stack trace as above)
[2019-05-21 11:20:18.960] ERROR [reactor-http-server-epoll-8] HttpWebHandlerAdapter: [908c808e] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.961] ERROR [reactor-http-server-epoll-7] HttpWebHandlerAdapter: [54ea93fb] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.962] ERROR [reactor-http-server-epoll-5] HttpWebHandlerAdapter: [ce8c8a9b] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.962] ERROR [reactor-http-server-epoll-6] HttpWebHandlerAdapter: [02565b55] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.989] ERROR [reactor-http-server-epoll-7] HttpWebHandlerAdapter: [8d50ea8f] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.990] ERROR [reactor-http-server-epoll-5] HttpWebHandlerAdapter: [8e023c1f] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.990] ERROR [reactor-http-server-epoll-6] HttpWebHandlerAdapter: [de4fac7f] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.990] ERROR [reactor-http-server-epoll-5] HttpWebHandlerAdapter: [bfc834d1] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.991] ERROR [reactor-http-server-epoll-5] HttpWebHandlerAdapter: [9cd3b11c] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:18.995] ERROR [reactor-http-server-epoll-6] HttpWebHandlerAdapter: [4ceb3401] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:19.000] ERROR [reactor-http-server-epoll-8] HttpWebHandlerAdapter: [38e308ec] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:19.004] ERROR [reactor-http-server-epoll-8] HttpWebHandlerAdapter: [6c798287] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:19.005] ERROR [reactor-http-server-epoll-8] HttpWebHandlerAdapter: [a64dd79a] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:20:19.006] ERROR [reactor-http-server-epoll-8] HttpWebHandlerAdapter: [9bfa21a9] Error [java.nio.channels.ClosedChannelException] for HTTP GET "/monitoring/health", but ServerHttpResponse already committed (200 OK)
[2019-05-21 11:28:15.692] ERROR [parallel-1] RegistryRSocketServer: Error occurred during session
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
... (same stack trace as above)

mostroverkhov commented 5 years ago

@lksvenoy-r7 Looks like the code is still running with 0.12.2-RC2, according to the stack traces (io.rsocket.keepalive.KeepAliveConnection was removed recently). Can you check the project dependency graph / lockfiles to verify it was actually built with 0.12.2-RC3-SNAPSHOT?
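For example, with a Maven build, running mvn dependency:tree -Dincludes=io.rsocket in the service module should show which rsocket-java version actually ends up on the classpath.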

lksvenoy-r7 commented 5 years ago

@mostroverkhov Yes I think you're right, sorry about that. I'll get back to you once I've resolved it.

lksvenoy-r7 commented 5 years ago

@mostroverkhov Ok, so I managed to successfully put 0.12.2-RC3-SNAPSHOT on the instance. With this version, it doesn't work at all: I get no results back, and the connection is quickly closed. There are also no informative error logs. When hitting the socket, CPU usage initially increases (indicating it is being hit), but then CPU usage drops back to idle and the service can no longer be reached - it doesn't seem to be accepting data anymore.

The only error I can see on the client side (which isn't very informative) is "Internal server error Exception while fetching data (/endpoint/socket) : Connection has been closed".

lksvenoy-r7 commented 5 years ago

@mostroverkhov Here's rsocket-cli debug output when trying to hit the service

14:44:40.138    sending -> Frame => Stream ID: 0 Type: KEEPALIVE Respond flag: true Payload:
14:45:00.124    sending -> Frame => Stream ID: 0 Type: KEEPALIVE Respond flag: true Payload:
14:45:20.123    sending -> Frame => Stream ID: 0 Type: KEEPALIVE Respond flag: true Payload:
14:45:40.122    sending -> Frame => Stream ID: 0 Type: KEEPALIVE Respond flag: true Payload:
client error
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 90000 ms
        at io.rsocket.RSocketClient.lambda$new$1(RSocketClient.java:98)
        at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:137)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
        at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
        at io.rsocket.KeepAliveHandler.doCheckTimeout(KeepAliveHandler.java:65)
        at io.rsocket.KeepAliveHandler$Client.onIntervalTick(KeepAliveHandler.java:89)
        at io.rsocket.KeepAliveHandler.lambda$new$0(KeepAliveHandler.java:31)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
        at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
        at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
        at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)                                                                                                        
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
14:46:00.140    sending -> Frame => Stream ID: 0 Type: KEEPALIVE Respond flag: true Payload:
error from server
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 90000 ms
        at io.rsocket.RSocketClient.lambda$new$1(RSocketClient.java:98)
        at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:137)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
        at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
        at io.rsocket.KeepAliveHandler.doCheckTimeout(KeepAliveHandler.java:65)
        at io.rsocket.KeepAliveHandler$Client.onIntervalTick(KeepAliveHandler.java:89)
        at io.rsocket.KeepAliveHandler.lambda$new$0(KeepAliveHandler.java:31)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
        at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
        at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
        at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)                                                                                                        
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

There is a connection, but the service is not responding. Killing the socket manually with systemctl produces the following (rsocket-cli output):

14:48:47.426    sending -> Frame => Stream ID: 0 Type: KEEPALIVE Respond flag: true Payload:
error from server                                                         
java.nio.channels.ClosedChannelException
        at io.rsocket.RSocketClient.terminate(RSocketClient.java:385)
        at io.rsocket.RSocketClient.lambda$new$0(RSocketClient.java:78)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:139)
        at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:301)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103)
        at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
        at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1148)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:764)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:740)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:611)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:628)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:482)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
        at java.base/java.lang.Thread.run(Thread.java:834)

lksvenoy-r7 commented 5 years ago

@mostroverkhov I've replicated this issue a couple of times now, and also did a packet capture to look at it. Looking at the tcpdump, the socket connection does not get further than switching protocols.

It looks something like this:

GET / HTTP/1.1
request body

HTTP/1.1 101 Switching Protocols
upgrade: websocket
connection: upgrade

mostroverkhov commented 5 years ago

@lksvenoy-r7 We need more info about your setup - RSocket-java currently on develop was verified with a rather comprehensive soak test, so a closed connection is unexpected. What version of reactor-netty / the Reactor BOM is used? And the version of netty-tcnative, if present. Ideally the dependency graph / lockfiles would help ultimately resolve the issue.
rsocket-cli is built with an earlier version, and while it is usually supposed to work, it is better to have a Java-only reproducer.

lksvenoy-r7 commented 5 years ago

@mostroverkhov No worries, here's the dependency graph. If you need anything else, let me know.

[INFO] --- maven-dependency-plugin:3.1.1:tree (default-cli) @ PROPRIETARY ---
[INFO] PROPRIETARY
[INFO] +- org.slf4j:slf4j-api:jar:1.7.25:compile
[INFO] +- org.projectlombok:lombok:jar:1.18.4:provided
[INFO] +- org.springframework.boot:spring-boot-starter-webflux:jar:2.1.0.RELEASE:compile
[INFO] |  +- org.springframework.boot:spring-boot-starter:jar:2.1.0.RELEASE:compile
[INFO] |  |  +- org.springframework.boot:spring-boot:jar:2.1.0.RELEASE:compile
[INFO] |  |  +- org.springframework.boot:spring-boot-autoconfigure:jar:2.1.0.RELEASE:compile
[INFO] |  |  +- javax.annotation:javax.annotation-api:jar:1.3.2:compile
[INFO] |  |  \- org.yaml:snakeyaml:jar:1.23:runtime
[INFO] |  +- org.springframework.boot:spring-boot-starter-json:jar:2.1.0.RELEASE:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.8:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.8:compile
[INFO] |  |  \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.7:compile
[INFO] |  +- org.springframework.boot:spring-boot-starter-reactor-netty:jar:2.1.0.RELEASE:compile
[INFO] |  |  \- io.projectreactor.netty:reactor-netty:jar:0.8.2.RELEASE:compile
[INFO] |  |     \- io.netty:netty-handler-proxy:jar:4.1.29.Final:compile
[INFO] |  |        \- io.netty:netty-codec-socks:jar:4.1.29.Final:compile
[INFO] |  +- org.hibernate.validator:hibernate-validator:jar:6.0.13.Final:compile
[INFO] |  |  +- javax.validation:validation-api:jar:2.0.1.Final:compile
[INFO] |  |  +- org.jboss.logging:jboss-logging:jar:3.3.2.Final:compile
[INFO] |  |  \- com.fasterxml:classmate:jar:1.4.0:compile
[INFO] |  +- org.springframework:spring-web:jar:5.1.2.RELEASE:compile
[INFO] |  |  \- org.springframework:spring-beans:jar:5.1.2.RELEASE:compile
[INFO] |  +- org.springframework:spring-webflux:jar:5.1.2.RELEASE:compile
[INFO] |  |  \- io.projectreactor:reactor-core:jar:3.2.2.RELEASE:compile
[INFO] |  \- org.synchronoss.cloud:nio-multipart-parser:jar:1.1.0:compile
[INFO] |     \- org.synchronoss.cloud:nio-stream-storage:jar:1.1.3:compile
[INFO] +- org.springframework.boot:spring-boot-configuration-processor:jar:2.1.0.RELEASE:compile
[INFO] +- org.springframework.boot:spring-boot-starter-logging:jar:2.1.0.RELEASE:compile
[INFO] |  +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.11.1:compile
[INFO] |  |  \- org.apache.logging.log4j:log4j-api:jar:2.11.1:compile
[INFO] |  \- org.slf4j:jul-to-slf4j:jar:1.7.25:compile
[INFO] +- org.springframework.boot:spring-boot-starter-actuator:jar:2.1.0.RELEASE:compile
[INFO] |  +- org.springframework.boot:spring-boot-actuator-autoconfigure:jar:2.1.0.RELEASE:compile
[INFO] |  |  \- org.springframework.boot:spring-boot-actuator:jar:2.1.0.RELEASE:compile
[INFO] |  \- io.micrometer:micrometer-core:jar:1.1.0:compile
[INFO] |     +- org.hdrhistogram:HdrHistogram:jar:2.1.9:compile
[INFO] |     \- org.latencyutils:LatencyUtils:jar:2.0.3:compile
[INFO] +- org.springframework.cloud:spring-cloud-context:jar:2.1.0.RELEASE:compile
[INFO] |  \- org.springframework.security:spring-security-crypto:jar:5.1.1.RELEASE:compile
[INFO] +- org.springframework.cloud:spring-cloud-commons:jar:2.1.0.RELEASE:compile
[INFO] +- org.springframework.cloud:spring-cloud-aws-context:jar:2.1.0.RELEASE:compile
[INFO] |  +- org.springframework.cloud:spring-cloud-aws-core:jar:2.1.0.RELEASE:compile
[INFO] |  |  +- org.springframework:spring-aop:jar:5.1.2.RELEASE:compile
[INFO] |  |  +- com.amazonaws:aws-java-sdk-core:jar:1.11.415:compile
[INFO] |  |  |  +- software.amazon.ion:ion-java:jar:1.2.0:compile
[INFO] |  |  |  +- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.9.8:compile
[INFO] |  |  |  \- joda-time:joda-time:jar:2.10.1:compile
[INFO] |  |  +- com.amazonaws:aws-java-sdk-s3:jar:1.11.415:compile
[INFO] |  |  |  +- com.amazonaws:aws-java-sdk-kms:jar:1.11.415:compile
[INFO] |  |  |  \- com.amazonaws:jmespath-java:jar:1.11.415:compile
[INFO] |  |  +- com.amazonaws:aws-java-sdk-ec2:jar:1.11.415:compile
[INFO] |  |  \- com.amazonaws:aws-java-sdk-cloudformation:jar:1.11.415:compile
[INFO] |  +- org.springframework:spring-context:jar:5.1.2.RELEASE:compile
[INFO] |  |  \- org.springframework:spring-expression:jar:5.1.2.RELEASE:compile
[INFO] |  \- javax.activation:javax.activation-api:jar:1.2.0:compile
[INFO] +- org.aspectj:aspectjweaver:jar:1.9.4:compile
[INFO] +- javax.websocket:javax.websocket-api:jar:1.1:compile
[INFO] +- software.amazon.awssdk:netty-nio-client:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:annotations:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:http-client-spi:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:utils:jar:2.4.2:compile
[INFO] |  +- io.netty:netty-codec-http:jar:4.1.32.Final:compile
[INFO] |  +- io.netty:netty-codec-http2:jar:4.1.32.Final:compile
[INFO] |  +- io.netty:netty-codec:jar:4.1.32.Final:compile
[INFO] |  +- io.netty:netty-transport:jar:4.1.32.Final:compile
[INFO] |  |  \- io.netty:netty-resolver:jar:4.1.29.Final:compile
[INFO] |  +- io.netty:netty-common:jar:4.1.32.Final:compile
[INFO] |  +- io.netty:netty-buffer:jar:4.1.32.Final:compile
[INFO] |  +- io.netty:netty-handler:jar:4.1.32.Final:compile
[INFO] |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.29.Final:compile
[INFO] |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.29.Final:compile
[INFO] |  +- com.typesafe.netty:netty-reactive-streams-http:jar:2.0.0:compile
[INFO] |  |  \- com.typesafe.netty:netty-reactive-streams:jar:2.0.0:compile
[INFO] |  \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
[INFO] +- org.codehaus.groovy:groovy-all:jar:2.4.15:compile
[INFO] +- ch.qos.logback:logback-classic:jar:1.2.3:compile
[INFO] |  \- ch.qos.logback:logback-core:jar:1.2.3:compile
[INFO] +- io.micrometer:micrometer-registry-statsd:jar:1.1.2:compile
[INFO] +- com.google.protobuf:protobuf-java:jar:3.6.1:compile
[INFO] +- com.fasterxml.jackson.module:jackson-module-afterburner:jar:2.9.8:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.9.8:compile
[INFO] |  \- com.fasterxml.jackson.core:jackson-databind:jar:2.9.8:compile
[INFO] |     \- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile
[INFO] +- com.google.protobuf:protoc:pom:3.6.1:compile
[INFO] +- io.rsocket:rsocket-core:jar:0.12.2-RC3-SNAPSHOT:compile
[INFO] +- io.rsocket:rsocket-transport-netty:jar:0.12.2-RC3-SNAPSHOT:compile
[INFO] +- io.rsocket:rsocket-micrometer:jar:0.12.2-RC3-SNAPSHOT:compile
[INFO] +- software.amazon.awssdk:lambda:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:aws-json-protocol:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:protocol-core:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:sdk-core:jar:2.4.2:compile
[INFO] |  |  \- software.amazon.awssdk:profiles:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:auth:jar:2.4.2:compile
[INFO] |  |  \- software.amazon:flow:jar:1.7:compile
[INFO] |  +- software.amazon.awssdk:regions:jar:2.4.2:compile
[INFO] |  +- software.amazon.awssdk:aws-core:jar:2.4.2:compile
[INFO] |  \- software.amazon.awssdk:apache-client:jar:2.4.2:runtime
[INFO] |     +- org.apache.httpcomponents:httpclient:jar:4.5.6:compile
[INFO] |     \- org.apache.httpcomponents:httpcore:jar:4.4.10:compile
[INFO] +- PROPRIETARY
[INFO] |  +- PROPRIETARY
[INFO] |  |  +- PROPRIETARY
[INFO] |  |  +- PROPRIETARY
[INFO] |  |  +- com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile
[INFO] |  |  +- com.github.ben-manes.caffeine:guava:jar:2.6.2:compile
[INFO] |  |  |  \- com.google.guava:guava:jar:26.0-jre:test
[INFO] |  |  |     +- com.google.code.findbugs:jsr305:jar:3.0.2:test
[INFO] |  |  |     +- org.checkerframework:checker-qual:jar:2.5.2:test
[INFO] |  |  |     +- com.google.errorprone:error_prone_annotations:jar:2.1.3:test
[INFO] |  |  |     +- com.google.j2objc:j2objc-annotations:jar:1.1:test
[INFO] |  |  |     \- org.codehaus.mojo:animal-sniffer-annotations:jar:1.14:test
[INFO] |  |  +- commons-codec:commons-codec:jar:1.11:compile
[INFO] |  |  \- org.agrona:Agrona:jar:0.9.1:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.8.1:compile
[INFO] |  +- commons-io:commons-io:jar:2.5:compile
[INFO] |  \- org.jetbrains.kotlin:kotlin-stdlib:jar:1.2.71:compile
[INFO] |     +- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.2.71:compile
[INFO] |     \- org.jetbrains:annotations:jar:13.0:compile
[INFO] +- org.springframework.boot:spring-boot-starter-test:jar:2.1.0.RELEASE:test
[INFO] |  +- org.springframework.boot:spring-boot-test:jar:2.1.0.RELEASE:test
[INFO] |  +- org.springframework.boot:spring-boot-test-autoconfigure:jar:2.1.0.RELEASE:test
[INFO] |  +- com.jayway.jsonpath:json-path:jar:2.4.0:test
[INFO] |  |  \- net.minidev:json-smart:jar:2.3:test
[INFO] |  |     \- net.minidev:accessors-smart:jar:1.2:test
[INFO] |  |        \- org.ow2.asm:asm:jar:5.0.4:test
[INFO] |  +- junit:junit:jar:4.12:test
[INFO] |  +- org.assertj:assertj-core:jar:3.11.1:test
[INFO] |  +- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] |  +- org.hamcrest:hamcrest-library:jar:1.3:test
[INFO] |  +- org.skyscreamer:jsonassert:jar:1.5.0:test
[INFO] |  |  \- com.vaadin.external.google:android-json:jar:0.0.20131108.vaadin1:test
[INFO] |  +- org.springframework:spring-core:jar:5.1.2.RELEASE:compile
[INFO] |  |  \- org.springframework:spring-jcl:jar:5.1.2.RELEASE:compile
[INFO] |  +- org.springframework:spring-test:jar:5.1.2.RELEASE:test
[INFO] |  \- org.xmlunit:xmlunit-core:jar:2.6.2:test
[INFO] |     \- javax.xml.bind:jaxb-api:jar:2.3.1:test
[INFO] +- org.mockito:mockito-core:jar:2.23.4:test
[INFO] |  +- net.bytebuddy:byte-buddy:jar:1.9.3:test
[INFO] |  +- net.bytebuddy:byte-buddy-agent:jar:1.9.3:test
[INFO] |  \- org.objenesis:objenesis:jar:2.6:test
[INFO] +- com.github.stefanbirkner:system-rules:jar:1.19.0:test
[INFO] +- io.rsocket:rsocket-transport-local:jar:0.12.2-RC3-SNAPSHOT:test
[INFO] \- org.testcontainers:localstack:jar:1.10.6:test
[INFO]    \- org.testcontainers:testcontainers:jar:1.10.6:test
[INFO]       +- org.apache.commons:commons-compress:jar:1.18:test
[INFO]       +- org.rnorth.duct-tape:duct-tape:jar:1.0.7:test
[INFO]       +- org.rnorth.visible-assertions:visible-assertions:jar:2.1.2:test
[INFO]       |  \- net.java.dev.jna:jna:jar:4.5.2:test
[INFO]       +- org.rnorth:tcp-unix-socket-proxy:jar:1.0.2:test
[INFO]       |  +- com.kohlschutter.junixsocket:junixsocket-native-common:jar:2.0.4:test
[INFO]       |  |  \- org.scijava:native-lib-loader:jar:2.0.2:test
[INFO]       |  \- com.kohlschutter.junixsocket:junixsocket-common:jar:2.0.4:test
[INFO]       \- net.java.dev.jna:jna-platform:jar:4.5.2:test

mostroverkhov commented 5 years ago

@lksvenoy-r7 reactor-netty:jar:0.8.2.RELEASE and the Netty deps are too old - some recent changes in the rsocket-java Netty transport need 0.8.8.RELEASE. The correct versions can be pulled in with the Reactor BOM, io.projectreactor:reactor-bom:Californium-SR8. I'd also check for a newer Spring Boot BOM that pulls in the above versions.
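For reference, a sketch of importing the BOM with Maven (goes in dependencyManagement; adjust if you manage versions through the Spring Boot parent instead):

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-bom</artifactId>
      <version>Californium-SR8</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>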

lksvenoy-r7 commented 5 years ago

@mostroverkhov Thank you so much - bumping reactor-netty did indeed resolve that problem. I'll test this version more and get back to you on whether it solves the original issue.

lksvenoy-r7 commented 5 years ago

@mostroverkhov It seems to be much, much more resilient now. I actually managed to run out of ENIs in AWS, so I haven't been able to do all the testing I wanted so far. I'll report back on the issue tomorrow and let you know whether it resolves it or not; it seems like it does. Thank you for your help.

lksvenoy-r7 commented 5 years ago

@mostroverkhov I've done some testing, and it seems fine with the exception of memory usage. The service never seems to release memory, and it ends up unresponsive over time. Would you like me to close this issue and open a new one for this?

mostroverkhov commented 5 years ago

Open a new one, with details on how the RSocket is created/used: decoder type, stream rate limiting, number of concurrent streams, whether RSockets are closed, etc.

mostroverkhov commented 5 years ago

@lksvenoy-r7 Ah, I missed that you had posted part of the RSocket-related code - because the zero-copy payload decoder is used, all incoming Payloads (except the SETUP frame) have to be released, otherwise buffers are leaked. This applies to both the requester RSocket and the responder RSocket: to payloads received as a response stream from the requester RSocket, and to the single Payloads in handlers.

lakrsv commented 5 years ago

@mostroverkhov Hello, different account.

I explicitly release the payload in the .map() operation that turns the payload into my domain object, in a finally block. The only payloads I don't release (because I don't think it makes sense, and I'm not sure how I would go about it) are the ones I create for sending to the client.

It goes something like

return Mono.just(payload)
    .map(this::getRequestFromPayload)
    .flux()
    .map(p -> {
      try {
        return convert(p);
      } finally {
        payload.release();
      }
    })
    .onErrorResume(throwable ->
        Flux.just(createPayloadFromThrowable(throwable)));

lksvenoy-r7 commented 5 years ago

I've upgraded the instance I was testing on, and it seems remarkably stable. I am fetching gigabytes of data over the socket with no problems, except that heavy load occasionally causes keep-alives to not go through, which kills the connection. (This is probably my own fault.) I am closing this ticket, as the most recent snapshot fixes my problems. I'm very much looking forward to it being released, as I can't live without it!