rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

Load Balancer not refreshing newly added nodes. #716

Closed pckeyan closed 4 years ago

pckeyan commented 4 years ago

I am working on a use case in which server nodes are added to a LoadBalancedRSocketMono at runtime. When a server node comes up or goes down, it registers itself in a collection that is streamed to LoadBalancedRSocketMono. However, LoadBalancedRSocketMono only uses the server node that was registered when the create method was called. A sample snippet is below. I had been asking this question on community.netifi.com until last week, but the site is down for now. Can you please help me understand what I am doing wrong, or is this a bug?

package com.rsocket.server;

import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.client.LoadBalancedRSocketMono;
import io.rsocket.client.filter.RSocketSupplier;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class DynamicLBClient {

    public static void main(String[] args) {
        DynamicLBClient dynamicLBClient = new DynamicLBClient();
        dynamicLBClient.callLB();

    }

    private void callLB() {

        int[] server1 = new int[] {9000};
        int[] server2 = new int[] {9001};

        Set<Set<RSocketSupplier>> suppliers = new HashSet<>();

        suppliers.add(Arrays.stream(server1)
                .mapToObj(port -> new RSocketSupplier(() -> Mono.just(RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create("localhost", port))
                        .start().doOnSubscribe(s -> System.out.println("RSocket connection established." + s))
                        .block())))
                .collect(
                        Collectors.toSet()));

        LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono
                .create(Flux.fromStream(suppliers.stream()));
        Mono<Payload> monoPayload = balancer.block().requestResponse(DefaultPayload.create("test-request"));
        Payload server1Payload = monoPayload.block();
        System.out.println("Response Received Server 1---->" + server1Payload.getDataUtf8());

        suppliers.add(Arrays.stream(server2)
                .mapToObj(port -> new RSocketSupplier(() -> Mono.just(RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create("localhost", port))
                        .start().doOnSubscribe(s -> System.out.println("RSocket connection established." + s))
                        .block())))
                .collect(
                        Collectors.toSet()));

        Mono<Payload> server2Mono = balancer.log().block().requestResponse(DefaultPayload.create("test-request-second-server"));
        Payload server2Payload = server2Mono.block();
        System.out.println("Response Received- Server 2 --->" + server2Payload.getDataUtf8());

    }
}

The lines below have to be called again for the new server node to serve calls:

       balancer = LoadBalancedRSocketMono
                .create(Flux.fromStream(suppliers.stream()));
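
For readers following along, here is a minimal plain-Java sketch of why a one-shot source does not pick up later additions. It assumes that Flux.fromStream(suppliers.stream()) walks the stream once and then completes, so a later suppliers.add(...) is never emitted. The names SnapshotVsLiveSketch, consumeOnce, and LiveRegistry are hypothetical and not part of rsocket-java; ports stand in for RSocketSupplier instances.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these types exist in rsocket-java.
final class SnapshotVsLiveSketch {

    /** One-shot source: traverses the set once, then is finished. */
    static List<Integer> consumeOnce(Set<Integer> ports) {
        return ports.stream().collect(Collectors.toList());
    }

    /** Live source: pushes a fresh snapshot to every listener on each change. */
    static final class LiveRegistry {
        private final Set<Integer> ports = new LinkedHashSet<>();
        private final List<Consumer<List<Integer>>> listeners = new ArrayList<>();

        void subscribe(Consumer<List<Integer>> listener) {
            listeners.add(listener);
            listener.accept(List.copyOf(ports)); // current state on subscribe
        }

        void add(int port) {
            if (ports.add(port)) publish();
        }

        void remove(int port) {
            if (ports.remove(port)) publish();
        }

        private void publish() {
            List<Integer> snapshot = List.copyOf(ports); // immutable copy
            listeners.forEach(listener -> listener.accept(snapshot));
        }
    }

    public static void main(String[] args) {
        Set<Integer> ports = new LinkedHashSet<>();
        ports.add(9000);
        List<Integer> seen = consumeOnce(ports);
        ports.add(9001); // added after the traversal, so never observed
        System.out.println("one-shot saw: " + seen); // one-shot saw: [9000]

        LiveRegistry registry = new LiveRegistry();
        List<List<Integer>> updates = new ArrayList<>();
        registry.subscribe(updates::add);
        registry.add(9000);
        registry.add(9001);
        System.out.println("live saw: " + updates); // live saw: [[], [9000], [9000, 9001]]
    }
}
```

The point of the sketch is only the contrast: a source that is consumed once cannot report later membership changes, while a source that republishes on every change can.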

pckeyan commented 4 years ago

@OlegDokuka Please add support for adding new endpoints at runtime to the load balancer.

OlegDokuka commented 4 years ago

I don't quite understand what the expected behavior is. @pckeyan can you please explain it?

pckeyan commented 4 years ago

Sure @OlegDokuka. Once I start a load balancer with suppliers and then, based on load, bring up a new node to participate in it, there is no way to add that node to the load balancer. Even if I add it to the underlying Flux, the balancer does not refresh. So I always have to create a new LoadBalancedRSocketMono in order to use the new node for load balancing.

OlegDokuka commented 4 years ago

Yeah, this is a bug rather than a feature request, so it should be fixed.

OlegDokuka commented 4 years ago

Hey @pckeyan!

We have reworked the load balancer, and you can now choose between load-balancing strategies. If you want fair balancing, you can choose the simple round-robin strategy. The weighted strategy is still available as well. However, please remember that the weighted strategy is unfair: you may never get a call routed to newly connected servers if the first chosen one is in a good state.

To try the new Loadbalance API please see the following example

For more information about Weighted Loadbalancing, please see the following talk

pckeyan commented 4 years ago

@OlegDokuka I am testing this example. I observe that no load is going to Server 1 and that one message in the sequence was lost. Can you please comment?

    Server 3 got fnf test0
    Server 3 got fnf test2
    Server 2 got fnf test3
    Server 3 got fnf test4
    Server 2 got fnf test5
    Server 3 got fnf test6
    Server 2 got fnf test7
    Server 3 got fnf test8
    Server 2 got fnf test9

OlegDokuka commented 4 years ago

@pckeyan is it the same test? Are you using the new loadbalance API? Which strategy do you use? If it is the weighted strategy, then this is fine: the weighted loadbalance strategy does not guarantee fairness. It may use only 2 instances out of 100 if their latency fits within the lower (25%) and higher (75%) quantiles. If you want requests to be distributed fairly, there is RoundRobinLoadbalanceStrategy for that purpose.
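
To make the fairness point concrete, here is a minimal plain-Java sketch of round-robin selection, assuming a fixed list of targets. It is not the code of RoundRobinLoadbalanceStrategy; RoundRobinSketch and the server names are hypothetical and only illustrate the idea that each request goes to the next target in turn, regardless of latency.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin selection; not the rsocket-java implementation.
final class RoundRobinSketch {

    // Monotonic counter shared by all callers; safe to use from several threads.
    private final AtomicInteger next = new AtomicInteger();

    /** Returns the next target in rotation. */
    <T> T select(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalStateException("no targets available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), targets.size());
        return targets.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch strategy = new RoundRobinSketch();
        List<String> servers = List.of("server-1", "server-2", "server-3");
        for (int i = 0; i < 6; i++) {
            System.out.println("test" + i + " -> " + strategy.select(servers));
        }
        // test0 -> server-1, test1 -> server-2, test2 -> server-3, then the cycle repeats
    }
}
```

With a rotation like this every target receives the same share of requests, which is the property a latency-weighted strategy deliberately gives up.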

Also, be aware that a request may land on an inactive RSocket (e.g. the connection was lost, but because of the racy nature of events the socket was not removed from the pool in time). In that case your call may end with an error, so be prepared to use .retry logic to retry it.
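
As a rough illustration of that retry advice, here is a plain-Java sketch of a bounded retry around a call that may fail once because it hit a stale connection. In a Reactor pipeline the retry operator would play this role; RetrySketch and callWithRetry are hypothetical names, and the failing call is simulated.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of bounded retry; not part of rsocket-java or Reactor.
final class RetrySketch {

    /** Invokes the call up to maxAttempts times and rethrows the last failure. */
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated request: the first attempt hits a dead socket, the second succeeds.
        String reply = callWithRetry(() -> {
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("connection lost");
            }
            return "response";
        }, 3);
        System.out.println(reply + " after " + attempts.get() + " attempts");
        // response after 2 attempts
    }
}
```

Note that a retried request may be served by a different node than the one originally selected, since the balancer picks a target again on each attempt.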

pckeyan commented 4 years ago

@OlegDokuka Thank you. I used the same example mentioned above, only changing the count in the for loop from 10000 to 10 to verify the distribution and loss. The first message is always lost and no load is sent to the first server.

pckeyan commented 4 years ago

@OlegDokuka Also, can you please comment on how I can pass the list of LoadbalanceTarget by reference, so that when I add or remove a node the producer is updated automatically? This is with reference to the same example.

pckeyan commented 4 years ago

@OlegDokuka retry fixed the lost message, but it was processed by another server. It seems the first server in the list is unreachable. I will test with my sample and comment here.

OlegDokuka commented 4 years ago

@pckeyan

> @OlegDokuka Thank you. I used the same example mentioned above, only changing the count in the for loop from 10000 to 10 to verify the distribution and loss. The first message is always lost and no load is sent to the first server.

Please make sure you use the latest rsocket version (we have just released 1.1.0 today, which has a few more improvements).

> @OlegDokuka retry fixed the lost message, but it was processed by another server. It seems the first server in the list is unreachable. I will test with my sample and comment here.

Please make sure that the builder uses .roundRobinLoadbalanceStrategy(); the version you checked out may, by accident, use the weighted strategy.

    RSocketClient rSocketClient =
        LoadbalanceRSocketClient.builder(producer).roundRobinLoadbalanceStrategy().build();

> @OlegDokuka Also, can you please comment on how I can pass the list of LoadbalanceTarget by reference, so that when I add or remove a node the producer is updated automatically? This is with reference to the same example.

LoadbalanceTarget has 2 properties: the transport, used to establish a connection to a node, and the key, a unique identifier for a specific endpoint. Every update sent by the Publisher<List<LoadbalanceTarget>> represents the current snapshot of available nodes. For example:

If you send onNext([LoadbalanceTarget("1", tcpTransport), LoadbalanceTarget("2", tcpTransport)]), the pool will have only 2 targets to load-balance between. If the next event is onNext([LoadbalanceTarget("3", tcpTransport)]), it means that the nodes with keys "1" and "2" are no longer available; the pool will update its state accordingly and keep only a single target, node "3".
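
The snapshot semantics described above can be sketched in plain Java as follows. This is an assumption-laden illustration, not the pool used by LoadbalanceRSocketClient: SnapshotPoolSketch is a hypothetical name, targets are reduced to their keys, and a string stands in for an established connection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of snapshot-based pool updates; not the rsocket-java pool.
final class SnapshotPoolSketch {

    // key -> stand-in for an established connection to that endpoint
    private final Map<String, String> connections = new LinkedHashMap<>();

    /** Applies one snapshot: the pool ends up holding exactly the given keys. */
    void onNext(List<String> snapshotKeys) {
        // Drop every endpoint that is missing from the new snapshot.
        connections.keySet().retainAll(snapshotKeys);
        // Add endpoints that are new; existing ones keep their connection.
        for (String key : snapshotKeys) {
            connections.putIfAbsent(key, "connection-" + key);
        }
    }

    List<String> keys() {
        return new ArrayList<>(connections.keySet());
    }

    public static void main(String[] args) {
        SnapshotPoolSketch pool = new SnapshotPoolSketch();
        pool.onNext(List.of("1", "2"));
        System.out.println(pool.keys()); // [1, 2]
        pool.onNext(List.of("3"));
        System.out.println(pool.keys()); // [3]
    }
}
```

The practical consequence is that each update has to carry the complete list of live nodes: to add node "3" while keeping "1" and "2", the producer would send all three keys in the next snapshot.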

OlegDokuka commented 4 years ago

@pckeyan feel free to open a new issue if you spot any other problems related to the new implementation