rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

LoadbalanceRSocketClient lacks reconnection and retry functions #1095

Closed LiuYawei2019 closed 6 months ago

LiuYawei2019 commented 1 year ago

We upgraded Spring Boot from 2.1.2.RELEASE to 2.4.6 and RSocket from 0.11.16 to 1.1.0, and now use LoadbalanceRSocketClient to connect the client to the servers. When a server node is stopped for maintenance, the client stops sending requests to that node, which is expected. But after the node starts again, the client still does not send any requests to it, which is very frustrating.

Expected Behavior

I expect the client to reconnect to the server and add it back into the load-balancing rotation after the server restarts. If LoadbalanceRSocketClient could reconnect and retry the way LoadBalancedRSocketMono did, it would be perfect.
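The expected behavior can be modeled without RSocket at all. The following is a minimal JDK-only sketch, not how LoadbalanceRSocketClient is implemented: the class name `RecoveringRoundRobin` and the `isUp` health predicate are hypothetical and exist only for illustration. The point is that every pick re-checks target health, so a target that was skipped while down rejoins the rotation as soon as it is up again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical model of the desired behavior; not part of rsocket-java.
final class RecoveringRoundRobin {
    private final List<String> targets;
    private final Predicate<String> isUp; // assumed health check supplied by the caller
    private final AtomicInteger cursor = new AtomicInteger();

    RecoveringRoundRobin(List<String> targets, Predicate<String> isUp) {
        this.targets = new ArrayList<>(targets);
        this.isUp = isUp;
    }

    // Returns the next healthy target in round-robin order, or empty if all are down.
    // Health is re-evaluated on every call, so a recovered target is picked again.
    Optional<String> next() {
        for (int i = 0; i < targets.size(); i++) {
            int idx = Math.floorMod(cursor.getAndIncrement(), targets.size());
            String candidate = targets.get(idx);
            if (isUp.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<String> down = ConcurrentHashMap.newKeySet();
        RecoveringRoundRobin lb =
                new RecoveringRoundRobin(Arrays.asList("8081", "8083", "8082"), t -> !down.contains(t));

        System.out.println("all up:        " + lb.next());
        down.add("8081"); // simulate stopping the 8081 server
        for (int i = 0; i < 3; i++) {
            System.out.println("8081 down:     " + lb.next());
        }
        down.remove("8081"); // simulate restarting it
        for (int i = 0; i < 3; i++) {
            System.out.println("8081 restored: " + lb.next());
        }
    }
}
```

In this model there is no permanent removal: a target that is down is only skipped for the current pick. What the issue reports is the opposite, where a target that dropped out never comes back.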

Actual Behavior

After the server restarts, the client never reconnects to it. The restarted server keeps running on its own but receives no requests until the client is reloaded.

Steps to Reproduce

To reproduce, start a server on port 8081 from a simple standalone class, as follows:

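import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import reactor.core.publisher.Mono;

// Standalone server on port 8081 (class name is illustrative).
public class Server1 {
    public static void main(String[] args) {
        RSocketServer.create(
                        SocketAcceptor.forRequestResponse(
                                p -> {
                                    System.out.println("Server 1 got request " + p.getDataUtf8());
                                    return Mono.just(DefaultPayload.create("Server 1 response"))
                                            .delayElement(Duration.ofMillis(100));
                                }))
                .bindNow(TcpServerTransport.create(8081))
                .onClose()
                .block(); // keep the JVM alive until the server is closed
    }
}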

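Then run the following test, which starts servers on ports 8082 and 8083 and load-balances across all three targets. Stop and restart the 8081 server while the loop is running:

import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketClient;
import io.rsocket.core.RSocketServer;
import io.rsocket.loadbalance.LoadbalanceRSocketClient;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LoadbalanceReproTest { // class name is illustrative

    CloseableChannel server2;
    CloseableChannel server3;

    @Before
    public void before() {
        server2 = RSocketServer.create(
                        SocketAcceptor.forRequestResponse(
                                p -> {
                                    System.out.println("Server 2 got request " + p.getDataUtf8());
                                    return Mono.just(DefaultPayload.create("Server 2 response"))
                                            .delayElement(Duration.ofMillis(100));
                                }))
                .bindNow(TcpServerTransport.create(8082));

        server3 = RSocketServer.create(
                        SocketAcceptor.forRequestResponse(
                                p -> {
                                    System.out.println("Server 3 got request " + p.getDataUtf8());
                                    return Mono.just(DefaultPayload.create("Server 3 response"))
                                            .delayElement(Duration.ofMillis(100));
                                }))
                .bindNow(TcpServerTransport.create(8083));
    }

    @Test
    public void testLoadBalance() {
        LoadbalanceTarget target8081 = LoadbalanceTarget.from("8081", TcpClientTransport.create(8081));
        LoadbalanceTarget target8082 = LoadbalanceTarget.from("8082", TcpClientTransport.create(8082));
        LoadbalanceTarget target8083 = LoadbalanceTarget.from("8083", TcpClientTransport.create(8083));

        // The target list is emitted exactly once and never refreshed.
        Flux<List<LoadbalanceTarget>> producer = Flux.just(Arrays.asList(target8081, target8083, target8082));
        RSocketClient rSocketClient =
                LoadbalanceRSocketClient.builder(producer).roundRobinLoadbalanceStrategy().build();

        // One request per second for about 150 seconds.
        for (int i = 0; i < 150; i++) {
            try {
                String result = rSocketClient
                        .requestResponse(Mono.just(DefaultPayload.create("test" + i)))
                        .map(p -> p.getDataUtf8())
                        .block();
                System.out.println(result);
                TimeUnit.SECONDS.sleep(1);
            } catch (Throwable t) {
                // Log failures instead of swallowing them so they show up in the output.
                System.out.println("Request " + i + " failed: " + t);
            }
        }
    }
}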

Possible Solution

After the server on port 8081 is restarted, the client does not reconnect to it or send it any requests.
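One way to notice that a node such as 8081 is reachable again is a plain TCP connect probe. The sketch below uses only the JDK; the helper names `PortProbe.isReachable` and `PortProbe.reachablePorts`, the localhost address, and the timeout value are assumptions for illustration. It only checks that something accepts TCP connections on the port, not that an RSocket setup would succeed, and it does not feed the result into LoadbalanceRSocketClient.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; not part of rsocket-java.
final class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or otherwise failed.
            return false;
        }
    }

    // Filters the given ports down to those currently accepting connections.
    static List<Integer> reachablePorts(String host, List<Integer> ports, int timeoutMillis) {
        List<Integer> up = new ArrayList<>();
        for (int port : ports) {
            if (isReachable(host, port, timeoutMillis)) {
                up.add(port);
            }
        }
        return up;
    }

    public static void main(String[] args) {
        // Ports taken from the reproduction case above.
        List<Integer> ports = Arrays.asList(8081, 8082, 8083);
        System.out.println("Reachable: " + reachablePorts("127.0.0.1", ports, 200));
    }
}
```

Run periodically, a probe like this would show 8081 dropping out of the reachable set while the server is stopped and returning once it is restarted, which is the moment the client would be expected to start using it again.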

Your Environment

Spring Boot: 2.4.6
JDK: 8
RSocket version(s) used: 1.1.0

LiuYawei2019 commented 6 months ago

No one seems to care about this. I feel really helpless.