rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0
2.36k stars 354 forks source link

Reconnect client automatically on server when server stop #1039

Closed leccyril closed 2 years ago

leccyril commented 2 years ago

I have a question and i read a lot of doc or forum but i dont find a concrete example.

I have one need

Client send telemetry datas periodically as an agent or deamon to server, (datas available from server saved in database...), and when server crash, client have to reconnect automatically on server.... (specific thing, server can send command to client to close it)

2 ways to do it,

1 create a periodical send fireAndForget data

2 more interesting (and what i try to do)

Client send setup connection to server, server can accept or refuse client, if accept ask to client for telemetry

with second option when server crash, client is not reconnecting when server restart. I tried to init connection another time when detect onError but work only one time

Expected Behavior

when server crash, client try to reconnect on server

Actual Behavior

client forget server and do nohintg more...

Steps to Reproduce

here the client code

@Slf4j
@Service
public class Client {

    @Value("${application.telemetry.test.uuid}")
    private String test;

    @Value("${application.telemetry.server.host}")
    private String host;

    @Value("${application.telemetry.server.port}")
    private Integer port;

    public static Integer INTERVAL;

    @Value("${application.telemetry.interval}")
    public void setInterval(Integer interval) {
        INTERVAL = interval;
    }

    private RSocketRequester rsocketRequester;
    private RSocketRequester.Builder rsocketRequesterBuilder;
    private RSocketStrategies strategies;

    @Autowired
    public Client(RSocketRequester.Builder builder, @Qualifier("rSocketStrategies") RSocketStrategies strategies) {
        this.rsocketRequesterBuilder = builder;
        this.strategies = strategies;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void initClient() {

        final String client;
        // (1)
        if ("uuid".equals(test)) {
            client = UUID.randomUUID().toString();
        } else {
            client = TelemetryUtils.getClientHostName();
        }
        log.info("Connecting using client ID:" + client);

        SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());

        this.rsocketRequester = rsocketRequesterBuilder.setupRoute("telemetry.identification").setupData(client)
                .rsocketStrategies(strategies).rsocketConnector(connector -> connector.acceptor(responder).reconnect(Retry.indefinitely()))
                .connectTcp(host, port).block();

        this.rsocketRequester.rsocket().onClose().doOnError(error -> {
            log.warn("Connection CLOSED");
        })
        .doFinally(consumer ->{
            log.info("Client DISCONNECTED");
            rsocketRequester.rsocket().dispose();
            initClient();
        }).subscribe();

        while (true);
    }

client handler

@Slf4j
@Component
public class ClientHandler {

    @MessageMapping("telemetry")
    public Flux<TelemetryDto> statusUpdate(String status) {
        log.info(status);
        log.info("Start send telemetry");        
        return Flux.interval(Duration.ofSeconds(Client.INTERVAL)).map(index ->new TelemetryDto(TelemetryUtils.getProcessCpuLoad(),TelemetryUtils.getMemoryUsage(),TelemetryUtils.getActiveProcesses()))
                .retry();

    }

    /**
     * Used to execute command sent by server (for now only stop agent)
     * @param command
     */
    @MessageMapping("close")
    public void message(String command){
        log.info("new command coming : {}", command);
        if("stop".equals(command)) {
            log.info("stopping agent requiested");
            Runtime.getRuntime().exit(0);
        }else {
            log.error("Command not implemented");
        }
    }
}

And server code (PS i am using spring boot)


@Slf4j
@CrossOrigin("*")
@Controller
public class TelemetryController {

    @Autowired
    TelemetryRepository repository;

    private final Map<String, RSocketRequester> CLIENTS = new HashMap<>();

    @GetMapping("/clients/connected")
    public ResponseEntity<?> getClientsConnected() {
        log.debug("Get connected client list");
        return ResponseEntity.ok(CLIENTS.keySet());
    }

    @GetMapping("/clients")
    public ResponseEntity<?> getAllClients() {
        log.debug("get all data registered");
        return ResponseEntity.ok(repository.findAll());
    }

    @GetMapping("/clients/{id}")
    public ResponseEntity<?> getClientData(@PathVariable(value = "id") String id) {
        log.debug("get all data registered");
        return ResponseEntity.ok(repository.findById(id));
    }

    /**
     * used to close manually agent
     * 
     * @param uuid
     * @return
     */
    @PostMapping("/clients/close/{id}")
    public ResponseEntity<?> closeClient(@PathVariable(value = "id") String id) {
        log.info("closing agent with id {}", id);
        // send command to client requested
        if (CLIENTS.get(id) != null) {
            Mono<Void> call = CLIENTS.get(id).route("close").data("stop").send();
            call.doOnSuccess(consumer -> {
                log.info("Client {} closed",id);
                CLIENTS.remove(id);
            }).subscribe();
            return ResponseEntity.noContent().build();
        }

        return ResponseEntity.notFound().build();

    }

    @PreDestroy
    void shutdown() {
        log.info("Detaching all remaining clients...");
        CLIENTS.values().stream().forEach(requester -> requester.rsocket().dispose());
        log.info("Shutting down.");
    }

    @MessageMapping("foo")
    void call(RSocketRequester requester, @Payload String data) {
        log.info(data);
    }

    @ConnectMapping("telemetry.identification")
    void connectShellClientAndAskForTelemetry(RSocketRequester requester, @Payload String client) {

        requester.rsocket().onClose().doFirst(() -> {
            // Add all new clients to a client list
            log.info("Client: {} CONNECTED.", client);
            CLIENTS.put(client, requester);
        }).doOnError(error -> {
            // Warn when channels are closed by clients
            log.warn("Channel to client {} CLOSED", client);
        }).doFinally(consumer -> {
            // Remove disconnected clients from the client list
            CLIENTS.remove(client);
            log.info("Client {} DISCONNECTED", client);
        }).subscribe();

        // Once connection confirmed, ask to send telemetry update
        requester.route("telemetry").data("OPEN").retrieveFlux(TelemetryDto.class).doOnNext(s -> {
            log.info("Client: {} inserting data", client, s.getCpuUsage());
            // each time data incoming, update database
            repository.save(
                    new Telemetry(client, s.getCpuUsage(), s.getMemoryUsed(), s.getProcesses(), LocalDateTime.now()));
        }).retry().subscribe();

        // manage channel close and disconnection
        Hooks.onErrorDropped(error -> {
            if ("Disposed".equals(error.getCause().getMessage())
                    || error.getMessage().contains("ClosedChannelException"))
                log.trace("client was stopped {}", client);
            else
                log.error("error while getting telemetry datas", error);
        });
    }

}```

## Possible Solution

Concrete example to achieve this case (i think  common case)
## Your Environment

Spring boot rsocket 2.6.3
Spring boot
Java 11 or 15
Windows 10
OlegDokuka commented 2 years ago

hey @leccyril.

looks like your issue is a duplicate of #987