rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

Detect closed server connection and automatically reconnect. #540

Closed: joshfix closed this issue 5 years ago

joshfix commented 6 years ago

I have a simple setup between two services that I've built from provided examples. I have one service that listens for incoming connections:

        TcpServerTransport tcp = TcpServerTransport.create(configProps.getPort());
        RSocketFactory.receive()
                .acceptor(itemSocketAcceptor)
                .transport(tcp)
                .start().log()
                .subscribe();

Another service attempts to make a connection:

        this.socket = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(<host>, <port>))
                .start()
                .block();

If I redeploy the receiving service, the client service never reports an error (unless I set a timeout on the requestResponse method). I can't find a way for the client side to detect that the underlying connection has closed and to automatically try to reconnect. On top of that, these are Spring Boot web applications, so I can never call block() during a web request.

The onClose() method seems to signal that the connection has closed, but I'm not sure how to use it within a web request to rebuild the connection, since rebuilding it seems to require calling block().

Not sure if this is an actual issue or if I've simply overlooked some documentation or examples. I'm anxious to adopt rsocket in my company, so any direction would be greatly appreciated.
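
One way to act on the onClose() idea without calling block() is to cache the connection and forget it as soon as it closes, so the next request reconnects lazily. The sketch below shows that pattern with plain JDK CompletableFuture so it runs without RSocket on the classpath. `ReconnectOnDemand` and `Conn` are hypothetical stand-ins (Conn only mimics an RSocket's onClose() signal), not rsocket-java API; a Reactor version would presumably have the same shape with Mono in place of CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: cache a connection, forget it when it closes, reconnect lazily
// on the next request. Conn only mimics an RSocket's onClose() signal.
class ReconnectOnDemand {

    static final class Conn {
        final int id;
        private final CompletableFuture<Void> closed = new CompletableFuture<>();
        Conn(int id) { this.id = id; }
        CompletableFuture<Void> onClose() { return closed; }
        void dispose() { closed.complete(null); }
    }

    private final Supplier<CompletableFuture<Conn>> connector;
    private CompletableFuture<Conn> current; // guarded by this

    ReconnectOnDemand(Supplier<CompletableFuture<Conn>> connector) {
        this.connector = connector;
    }

    // Called for every request; starts a new attempt if there is no connection
    // or the last attempt failed. Never blocks: callers compose on the future.
    synchronized CompletableFuture<Conn> get() {
        if (current == null || current.isCompletedExceptionally()) {
            CompletableFuture<Conn> attempt = connector.get();
            current = attempt;
            // When this connection closes, drop it so the next request reconnects.
            attempt.thenAccept(conn -> conn.onClose().thenRun(() -> invalidate(attempt)));
        }
        return current;
    }

    private synchronized void invalidate(CompletableFuture<Conn> stale) {
        if (current == stale) {
            current = null;
        }
    }

    public static void main(String[] args) {
        AtomicInteger connects = new AtomicInteger();
        ReconnectOnDemand holder = new ReconnectOnDemand(
                () -> CompletableFuture.completedFuture(new Conn(connects.incrementAndGet())));

        Conn first = holder.get().join();
        System.out.println(holder.get().join() == first); // true: reused while open
        first.dispose();                                  // simulate the server going away
        System.out.println(holder.get().join().id);       // 2: reconnected on the next request
    }
}
```

In a web handler the caller would compose on `get()` (for example with thenCompose) instead of joining it; join() appears only in the demo.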

yschimke commented 6 years ago

@robertroeser does the load balanced socket do this for you even if you only had a single backend?

https://github.com/rsocket/rsocket-java/blob/1.0.x/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java

robertroeser commented 6 years ago

@joshfix @yschimke yes, you can use LoadBalancedRSocketMono for that. Just have your RSocketSupplier create a new connection each time it's called. If the connection breaks, the load balancer will call the supplier for a new RSocket and establish the connection again.
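
The contract here, a supplier that opens a new connection every time it is called, is easy to get subtly wrong, for example by wrapping an RSocket obtained earlier with block() (something like `() -> Mono.just(socket)`), which keeps handing back the same dead socket. The JDK-only sketch below contrasts the two. `SupplierContract`, `Conn` and `Reconnector` are hypothetical names; Reconnector only mimics the "ask the supplier again when the connection breaks" behaviour and is not the load balancer's actual logic.

```java
import java.util.function.Supplier;

// Hypothetical, JDK-only illustration of the supplier contract; not the rsocket-load-balancer API.
class SupplierContract {

    // Stand-in for an RSocket that can break.
    static final class Conn {
        private boolean open = true;
        boolean isOpen() { return open; }
        void breakConnection() { open = false; }
    }

    // Mimics a reconnecting client: reuse the connection while it is open,
    // and ask the supplier for another one once it has broken.
    static final class Reconnector {
        private final Supplier<Conn> supplier;
        private Conn current;

        Reconnector(Supplier<Conn> supplier) { this.supplier = supplier; }

        synchronized Conn get() {
            if (current == null || !current.isOpen()) {
                current = supplier.get();
            }
            return current;
        }
    }

    public static void main(String[] args) {
        // Wrong: the supplier always returns the same, already-created connection.
        Conn shared = new Conn();
        Reconnector pinned = new Reconnector(() -> shared);
        pinned.get().breakConnection();
        System.out.println(pinned.get().isOpen()); // false: nothing new to reconnect to

        // Right: every call creates a brand-new connection.
        Reconnector fresh = new Reconnector(Conn::new);
        fresh.get().breakConnection();
        System.out.println(fresh.get().isOpen()); // true: a new connection replaced the broken one
    }
}
```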

joshfix commented 6 years ago

Thanks for the quick responses... I'm playing with the load balancer but still can't seem to get it to work. Here is my client code:

    @PostConstruct
    public void connect() {
        log.info("Attempting RSocket connection to " + configProps.getHost() + ":" + configProps.getPort());
        socket = LoadBalancedRSocketMono
                .create(Flux.just(Collections.singleton(new RSocketSupplier(() -> RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create(configProps.getHost(), configProps.getPort()))
                        .start()
                        .doOnSubscribe(s -> log.info("RSocket connection established.", s))))))
                .block();
    }

When I kill the server/acceptor, I see the following log messages:

2018-10-26 13:56:51.063 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : Removing socket: -> WeightedSocket(median=0.0 quantile-low=0.0 quantile-high=0.0 inter-arrival=1000000.0 duration/pending=0.0 pending=0 availability= 0.0)->io.rsocket.client.filter.RSocketSupplier$AvailabilityAwareRSocketProxy@2b0aead6
2018-10-26 13:56:51.063 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : aperture 0 is below target 3, adding 3 sockets
2018-10-26 13:56:51.063 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : addSockets(3) restricted by the number of factories, i.e. addSockets(1)
2018-10-26 13:56:51.063  INFO 1 --- [-client-epoll-8] c.b.n.b.index.rsocket.StacRSocketClient  : RSocket connection established.
2018-10-26 13:56:51.066 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : Bumping refresh period, 15000->22500
2018-10-26 13:56:51.067 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : addSockets(1) restricted by the number of factories, i.e. addSockets(0)
2018-10-26 13:56:51.075  WARN 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : Exception while subscribing to the RSocket source

io.netty.channel.AbstractChannel$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: host.docker.internal/192.168.65.2:7000
    at io.netty.channel.unix.Socket.finishConnect(..)(Unknown Source) ~[netty-transport-native-unix-common-4.1.29.Final.jar!/:4.1.29.Final]
Caused by: io.netty.channel.unix.Errors$NativeConnectException: syscall:getsockopt(..) failed: Connection refused
    ... 1 common frames omitted

Then I restart the server and attempt to make another request to the client service, but the rsocket request just times out:

java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 10000ms in 'log' (and no fallback has been configured)

Here is my code that makes the request:

    public Mono<Item> getItem(String id) {
        log.debug("Requesting item from STAC with id " + id);
        return socket
                .requestResponse(DefaultPayload.create(id)).log()
                .timeout(Duration.ofSeconds(10))
                .map(this::deserializeItem);
    }

Apologies if I'm missing something obvious.

robertroeser commented 6 years ago

@joshfix Don't block on the load balancer. You need to subscribe to it for each request so it can load-balance and replace broken sockets.

Should be more like this:

    private Mono<RSocket> mono;

    @PostConstruct
    public void connect() {
        log.info("Attempting RSocket connection to " + configProps.getHost() + ":" + configProps.getPort());
        // Keep the Mono instead of calling block(), so every request goes back through the load balancer
        mono = LoadBalancedRSocketMono
                .create(Flux.just(Collections.singleton(new RSocketSupplier(() -> RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create(configProps.getHost(), configProps.getPort()))
                        .start()
                        .doOnSubscribe(s -> log.info("RSocket connection established.", s))))));
    }

    public Mono<Item> getItem(String id) {
        log.debug("Requesting item from STAC with id " + id);
        // Subscribe per request: flatMap resolves a (possibly new) socket each time
        return mono.flatMap(socket -> socket
                .requestResponse(DefaultPayload.create(id)).log()
                .timeout(Duration.ofSeconds(10))
                .map(this::deserializeItem));
    }

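
The key difference from the earlier block()-based version is when the socket is resolved: block() in a @PostConstruct method resolves it once and keeps that instance forever, whereas storing the Mono and calling flatMap inside getItem consults the load balancer on every request. The sketch below mirrors that per-request shape with CompletableFuture.thenCompose standing in for flatMap (orTimeout needs Java 9+). `PerRequestResolution`, `Conn` and its `requestResponse` method are hypothetical stand-ins, not RSocket classes, and the toy source simply creates a new connection on every call, where a real source would reuse a live connection and reconnect only after a failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of resolving the connection per request; not an RSocket API.
class PerRequestResolution {

    static final class Conn {
        final int generation;
        Conn(int generation) { this.generation = generation; }
        CompletableFuture<String> requestResponse(String id) {
            return CompletableFuture.completedFuture("item " + id + " via connection " + generation);
        }
    }

    private final Supplier<CompletableFuture<Conn>> socketSource;

    PerRequestResolution(Supplier<CompletableFuture<Conn>> socketSource) {
        this.socketSource = socketSource;
    }

    // Analogue of mono.flatMap(socket -> socket.requestResponse(...)): the source is
    // consulted on every call, so it can hand out a replacement after a failure.
    // Unlike the Reactor code above, this timeout also covers connection setup.
    CompletableFuture<String> getItem(String id) {
        return socketSource.get()
                .thenCompose(conn -> conn.requestResponse(id))
                .orTimeout(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        AtomicInteger generations = new AtomicInteger();
        // Toy source that simulates a reconnect on every call.
        PerRequestResolution client = new PerRequestResolution(
                () -> CompletableFuture.completedFuture(new Conn(generations.incrementAndGet())));
        System.out.println(client.getItem("a").join()); // item a via connection 1
        System.out.println(client.getItem("b").join()); // item b via connection 2
    }
}
```
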
yschimke commented 6 years ago

@robertroeser this could be a good candidate for Stack Overflow

joshfix commented 6 years ago

Thanks a lot @robertroeser! I feel like we're getting closer :) I implemented the non-blocking code as you demonstrated, but I'm still not getting the desired behavior.

Here is what I see immediately after killing the server service:

2018-10-27 09:59:55.500 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : Removing socket: -> WeightedSocket(median=0.0 quantile-low=0.0 quantile-high=0.0 inter-arrival=2.2901271243621685E7 duration/pending=0.0 pending=0 availability= 0.0)->io.rsocket.client.filter.RSocketSupplier$AvailabilityAwareRSocketProxy@5b518755
2018-10-27 09:59:55.500 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : aperture 0 is below target 1, adding 1 sockets
2018-10-27 09:59:55.501  INFO 1 --- [-client-epoll-8] c.b.n.b.index.rsocket.StacRSocketClient  : RSocket connection established.
2018-10-27 09:59:55.505 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : Bumping refresh period, 33750->50625
2018-10-27 09:59:55.505 DEBUG 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : addSockets(1) restricted by the number of factories, i.e. addSockets(0)
2018-10-27 09:59:55.515  WARN 1 --- [-client-epoll-8] i.r.client.LoadBalancedRSocketMono       : Exception while subscribing to the RSocket source

io.netty.channel.AbstractChannel$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: host.docker.internal/192.168.65.2:7000
    at io.netty.channel.unix.Socket.finishConnect(..)(Unknown Source) ~[netty-transport-native-unix-common-4.1.29.Final.jar!/:4.1.29.Final]
Caused by: io.netty.channel.unix.Errors$NativeConnectException: syscall:getsockopt(..) failed: Connection refused
    ... 1 common frames omitted

Once I bring the server service back up and attempt to make another request, I get this:

2018-10-27 10:02:02.498  INFO 1 --- [-server-epoll-7] c.b.ng.bcis.controller.WmsController     : Incoming GETMAP request
2018-10-27 10:02:02.500 DEBUG 1 --- [-server-epoll-7] c.b.n.b.index.rsocket.StacRSocketClient  : Requesting item from STAC with id LC08_L1TP_036030_20170622_20170630_01_T1
2018-10-27 10:02:02.508  INFO 1 --- [-server-epoll-7] reactor.Mono.Error.3                     : onSubscribe([Fuseable] Operators.EmptySubscription)
2018-10-27 10:02:02.508  INFO 1 --- [-server-epoll-7] reactor.Mono.Error.3                     : request(unbounded)
2018-10-27 10:02:02.518 ERROR 1 --- [-server-epoll-7] reactor.Mono.Error.3                     : onError(io.rsocket.client.NoAvailableRSocketException)
2018-10-27 10:02:02.519 ERROR 1 --- [-server-epoll-7] reactor.Mono.Error.3                     : 

io.rsocket.client.NoAvailableRSocketException: null
    at io.rsocket.client.NoAvailableRSocketException.<init>(Unknown Source) ~[rsocket-load-balancer-0.11.8.jar!/:na]

2018-10-27 10:02:02.520 ERROR 1 --- [-server-epoll-7] c.b.n.b.index.rsocket.StacRSocketClient  : RSocket client error.

io.rsocket.client.NoAvailableRSocketException: null
    at io.rsocket.client.NoAvailableRSocketException.<init>(Unknown Source) ~[rsocket-load-balancer-0.11.8.jar!/:na]

2018-10-27 10:02:02.521  INFO 1 --- [-server-epoll-7] reactor.Mono.Error.3                     : cancel()
2018-10-27 10:02:02.526 ERROR 1 --- [-server-epoll-7] c.b.n.b.c.ExceptionControllerAdvice      : An unexpected exception has occurred.

io.rsocket.client.NoAvailableRSocketException: null
    at io.rsocket.client.NoAvailableRSocketException.<init>(Unknown Source) ~[rsocket-load-balancer-0.11.8.jar!/:na]

Would you like me to post this on SO and move the conversation there?

joshfix commented 6 years ago

FWIW I also added a logging statement to the requestResponse method. The RSocket object is LoadBalancedRSocketMono$FailingRSocket@9185; it reports an availability of 0.0 and disposed = true. I also tried adding a second RSocketSupplier to the load balancer, but that only results in duplicated error messages.

robertroeser commented 6 years ago

There may be another bug causing an issue that we are looking into. In the meantime, try adding .repeat() to this Flux so that it keeps emitting when subscribed to, like so:

Flux.just(Collections.singleton(new RSocketSupplier(() -> RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create(configProps.getHost(), configProps.getPort()))
                        .start()
                        .doOnSubscribe(s -> log.info("RSocket connection established.", s)))))
                        .repeat()
joshfix commented 6 years ago

Ok, I implemented the code:

        socketMono = LoadBalancedRSocketMono.create(Flux.just(Collections.singleton(new RSocketSupplier(() -> RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(configProps.getHost(), configProps.getPort()))
                .start()
                .doOnSubscribe(s -> log.info("RSocket connection established.", s)))))
                .repeat());

but it results in the same errors as before.

robertroeser commented 6 years ago

@joshfix @mostroverkhov is looking into this issue, and thinks it's related to https://github.com/rsocket/rsocket-java/issues/468

mostroverkhov commented 6 years ago

@joshfix Just a few clarifications to check whether our intended change may fix your issue:

Is this correct?

joshfix commented 6 years ago

@mostroverkhov I believe that is correct. Before using the load balancer, there never seemed to be an error on the client side if the server was restarted; it would just hang (seemingly indefinitely). If I set a timeout on the request, I would see the timeout exception, but I could not find any good way to tell the client to attempt to reconnect after the server side had restarted.

I was using rsocket-core and rsocket-transport-netty 0.11.7 and just noticed that I was using rsocket-load-balancer 0.11.8. 0.11.9 does not appear to be on Maven Central. I'll update everything to 0.11.8 and retest/reverify the issue.

mostroverkhov commented 6 years ago

@joshfix Yes, 0.11.8 is indeed the latest version; I just wanted to make sure the latest version is used. Regarding the issue with streams, there is a pending PR #541, so hopefully we get this fixed soon.

joshfix commented 6 years ago

Thanks @mostroverkhov. So with that fix, will the load balancer no longer be required to reconnect? Any chance there could be some sort of baked-in reconnect method/operation in the client, so when it detects the terminate signal it follows some sort of strategy to automatically reconnect over time?

mostroverkhov commented 6 years ago

@joshfix I believe you still need to use the load balancer. Automatic reconnect is on our radar but unlikely to happen in the near future.

robertroeser commented 6 years ago

@joshfix Another question: do you want an RSocket that reconnects automatically, or one that reconnects when it gets a new request? The load balancer is supposed to do the latter, reconnecting when there is a new request. If it's still not doing that, then there is a bug. As for a socket that reconnects automatically, here is an example of that: https://github.com/netifi-proteus/proteus-java/blob/develop/proteus-client/src/main/java/io/netifi/proteus/rsocket/WeightedReconnectingRSocket.java#L226
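
To make the distinction concrete: reconnect-on-request only acts when a caller shows up, while automatic reconnection reacts to the close signal itself. The sketch below shows the second style with a JDK ScheduledExecutorService: it connects eagerly, retries after a fixed delay while the server is unreachable, and schedules a new attempt as soon as the current connection's close signal fires. `BackgroundReconnector` and `Conn` are hypothetical names, and this is not a port of the linked WeightedReconnectingRSocket, just an illustration of the idea.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of automatic (background) reconnection; not an rsocket-java API.
class BackgroundReconnector implements AutoCloseable {

    // Stand-in for an RSocket: only the close signal matters here.
    static final class Conn {
        final CompletableFuture<Void> onClose = new CompletableFuture<>();
        void dispose() { onClose.complete(null); }
    }

    private final Supplier<Conn> connector; // throws a RuntimeException if the server is unreachable
    private final long retryDelayMillis;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<Conn> current = new AtomicReference<>();

    BackgroundReconnector(Supplier<Conn> connector, long retryDelayMillis) {
        this.connector = connector;
        this.retryDelayMillis = retryDelayMillis;
        scheduler.execute(this::attempt); // connect eagerly, in the background
    }

    private void attempt() {
        try {
            Conn conn = connector.get();
            current.set(conn);
            // React to the close signal itself instead of waiting for the next request.
            conn.onClose.thenRun(() -> {
                current.compareAndSet(conn, null);
                scheduleAttempt();
            });
        } catch (RuntimeException unreachable) {
            scheduleAttempt(); // e.g. connection refused while the server restarts
        }
    }

    private void scheduleAttempt() {
        try {
            scheduler.schedule(this::attempt, retryDelayMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException shuttingDown) {
            // close() was called: stop reconnecting
        }
    }

    // The live connection, or null while (re)connecting.
    Conn current() { return current.get(); }

    // Polls for a live connection; returns null if none appears before the timeout.
    Conn awaitConnection(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        Conn conn;
        while ((conn = current.get()) == null && System.nanoTime() - deadline < 0) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return conn;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The trade-off is that a background reconnector keeps retrying even when nobody is sending requests, while reconnect-on-request only pays the cost when traffic arrives.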

robertroeser commented 6 years ago

I spent some more time looking at the load balancer. As it is written right now, it probably won't work very well if you only have a single server. If you have several instances in your backend and restart one, it will connect to the others and load-balance between them, but there looks to be an issue in the way it's currently written that affects the case of one server and one client.

joshfix commented 6 years ago

I noticed that after I start up my client service, the very first request takes quite a bit longer to complete than any subsequent request. Not only would I like the connection to reconnect automatically, I would also like some way to initialize that connection so that the first request isn't delayed.
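
For the warm-up part, one option is to start the connection attempt when the service starts, without waiting for it, and let requests compose on that in-flight attempt. A minimal JDK sketch under that assumption follows; `WarmStartClient` is a hypothetical name and the connection is modelled as a plain String. It only addresses the startup delay; recovering after a failure would still need one of the reconnect approaches discussed in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: kick off the connection at startup (e.g. from a @PostConstruct
// method) without blocking, so the first request does not pay the full connect cost.
// The connection is modelled as a String for brevity.
class WarmStartClient {
    private final CompletableFuture<String> connection;

    WarmStartClient(Supplier<CompletableFuture<String>> connector) {
        this.connection = connector.get(); // started now; nothing waits on it here
    }

    CompletableFuture<String> request(String id) {
        // Completes immediately if the connection is already up; otherwise the request
        // waits only for whatever is left of the connection attempt.
        return connection.thenApply(conn -> conn + " -> " + id);
    }
}
```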

I haven't quite figured out how to handle load balancing yet. I'm running my services in Kubernetes and using the internal Kubernetes DNS names for my services as the hostnames for RSocket. Using these, Kubernetes will round-robin requests across all available pods behind that name. I suppose I could use some Kubernetes library to determine how many pods are available and create an RSocket load balancer with the same number of clients. That would have to get fancier to handle scaling, but I'm not sure I'm worried about those cases quite yet.

That being said, when we push updates to a service, it will restart the pods for that service. So pushing updates to the server-side service kills all client-side services. They then all need to be manually restarted to re-connect to the server.

I definitely did not have any luck with the RSocket load balancer reconnecting on request after restarting the server service. After simply restarting the server-side service, the client produced exactly the same NoAvailableRSocketException log output I posted in my earlier comment.

At that point, the LoadBalancedRSocketMono was selecting the FailingRSocket implementation, which reported an availability of 0.0 and that it was disposed. There was no way to recover at that point short of restarting the client service.

robertroeser commented 6 years ago

@joshfix I did some digging. I think some of these problems were fixed in internal code I have, and I'm in the process of porting that back into the open-source load balancer. I hope to have something for you to test in a day or two.

joshfix commented 6 years ago

Excellent -- thank you!

robertroeser commented 5 years ago

@joshfix released 0.11.13, which should fix your issue

joshfix commented 5 years ago

Thanks for the update @robertroeser. I've been out on vacation, but I'm giving 0.11.13 a shot now. I think it's definitely a step in the right direction, but it looks like it's hard-coded to wait only 500ms between retry attempts and to fail hard after 5 total reconnection attempts:

https://github.com/rsocket/rsocket-java/blob/0.11.13/rsocket-load-balancer/src/main/java/io/rsocket/client/LoadBalancedRSocketMono.java#L538

This only gives any downstream service 2.5 seconds to restart or recover before my entire suite of services is rendered worthless and I have to restart every service in my chain from the point of failure up, which was the initial issue when it never retried.

In my situation, I think I would like infinite retries, but it would definitely be nice to have both the number of retries and the time between retries configurable. Is
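
The two knobs asked for here, an attempt limit (or none at all) and the delay between attempts, could look something like the policy below. This is a hypothetical sketch, not an existing rsocket-load-balancer option: `ReconnectPolicy`, `UNLIMITED` and `connectWithRetry` are illustrative names, and the exponential multiplier goes beyond the fixed delay described above (a multiplier of 1.0 gives a fixed delay). Retries are scheduled on a ScheduledExecutorService rather than with Thread.sleep, so no request thread is blocked.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical reconnect policy with a configurable attempt limit and delay.
// Not part of rsocket-java; names and defaults are illustrative only.
final class ReconnectPolicy {
    static final int UNLIMITED = -1;

    private final int maxAttempts;       // total attempts, or UNLIMITED
    private final Duration initialDelay; // delay before the first retry
    private final Duration maxDelay;     // cap for the growing delay
    private final double multiplier;     // 1.0 = fixed delay, 2.0 = doubling backoff

    ReconnectPolicy(int maxAttempts, Duration initialDelay, Duration maxDelay, double multiplier) {
        if (maxAttempts != UNLIMITED && maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1 or UNLIMITED");
        }
        this.maxAttempts = maxAttempts;
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
        this.multiplier = multiplier;
    }

    // Delay before retry n (1-based): initialDelay * multiplier^(n-1), capped at maxDelay.
    Duration delayBefore(int retry) {
        double millis = initialDelay.toMillis() * Math.pow(multiplier, retry - 1);
        return millis >= maxDelay.toMillis() ? maxDelay : Duration.ofMillis((long) millis);
    }

    boolean allowsAttempt(int attempt) {
        return maxAttempts == UNLIMITED || attempt <= maxAttempts;
    }

    // Calls connect until it succeeds or the policy gives up; a failed attempt is
    // signalled by connect throwing a RuntimeException (e.g. "connection refused").
    <T> CompletableFuture<T> connectWithRetry(Supplier<T> connect, ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        tryConnect(connect, scheduler, result, 1);
        return result;
    }

    private <T> void tryConnect(Supplier<T> connect, ScheduledExecutorService scheduler,
                                CompletableFuture<T> result, int attempt) {
        try {
            result.complete(connect.get());
        } catch (RuntimeException e) {
            if (!allowsAttempt(attempt + 1)) {
                result.completeExceptionally(e); // out of attempts
            } else {
                scheduler.schedule(() -> tryConnect(connect, scheduler, result, attempt + 1),
                        delayBefore(attempt).toMillis(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

For the "infinite retries" case, `new ReconnectPolicy(ReconnectPolicy.UNLIMITED, Duration.ofMillis(500), Duration.ofSeconds(30), 2.0)` would retry forever, starting at 500 ms and doubling up to a 30-second cap.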

joshfix commented 5 years ago

@robertroeser just curious if you had any follow up comments on this

robertroeser commented 5 years ago

@joshfix sorry haven't had a chance to look at this - can you create an issue to make the retries configurable so I don't forget about it?

yehohanan7 commented 5 years ago

@joshfix I have the same problem. Did you manage to get it working? Could you please post a snippet of your final code here?

joshfix commented 5 years ago

@yehohanan7 My brain tends to purge after about 2 weeks of not working on something :) I do believe things were working as expected using LoadBalancedRSocketMono as of v0.11.15. I responded privately with some code examples.

alek-sys commented 5 years ago

@joshfix thanks for digging into this. Would you be able to share a working example, please? I'm playing with different ways of using LoadBalancedRSocketMono, but still can't get it to reconnect automatically.