rsocket / rsocket-java

Java implementation of RSocket
http://rsocket.io
Apache License 2.0

Send a message over an already established request-stream connection from the rsocket-server to all subscribed rsocket-clients except one #1096

Open TashaBond opened 1 year ago

TashaBond commented 1 year ago

I have two rsocket-clients and one rsocket-server. Both clients initially subscribe to the broadcast route listenCommand to receive commands from the server. One of the clients then sends a command on the route executeCommand, and the server should forward it to every other RSocketRequester, but not to the sender, over the listenCommand connection that is already established. In other words, I need to send the message to all RSocketRequesters except myself through the existing connection between client and server. Currently I get the error "reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Request-Stream not implemented", and the second client does not receive the command from its listenCommand subscription. I've seen it suggested to add a client-side handler, but is it possible to solve this problem on the server? Maybe I'm missing something important?

Expected Behavior

The client that sends the command will not receive it over the broadcast connection, but the other client will.
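
To make the expected behavior concrete, here is a minimal plain-Java sketch of "deliver to every subscriber except the sender". It is only a model: the class `CommandBroadcaster`, its methods, and the client ids are hypothetical names that do not appear in my project, and the `Consumer<String>` callbacks stand in for the already-open listenCommand streams. No RSocket or Reactor types are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Plain-Java model of "broadcast to every subscriber except the sender". */
public class CommandBroadcaster {

    // Hypothetical registry: client id -> callback that delivers a command
    // over that client's already-open stream.
    private final Map<String, Consumer<String>> subscribers = new ConcurrentHashMap<>();

    /** Registers a client when it opens its listening stream. */
    public void subscribe(String clientId, Consumer<String> onCommand) {
        subscribers.put(clientId, onCommand);
    }

    /** Removes a client when its stream terminates. */
    public void unsubscribe(String clientId) {
        subscribers.remove(clientId);
    }

    /**
     * Delivers the command to every subscriber whose id differs from senderId.
     * Returns the number of subscribers that received it.
     */
    public int broadcastExcept(String senderId, String command) {
        int delivered = 0;
        for (Map.Entry<String, Consumer<String>> entry : subscribers.entrySet()) {
            if (!entry.getKey().equals(senderId)) {
                entry.getValue().accept(command);
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        CommandBroadcaster broadcaster = new CommandBroadcaster();
        List<String> receivedByA = new ArrayList<>();
        List<String> receivedByB = new ArrayList<>();
        broadcaster.subscribe("client-A", receivedByA::add);
        broadcaster.subscribe("client-B", receivedByB::add);

        // client-A executes a command; only client-B should see it.
        broadcaster.broadcastExcept("client-A", "Hello");

        System.out.println("A received: " + receivedByA); // prints "A received: []"
        System.out.println("B received: " + receivedByB); // prints "B received: [Hello]"
    }
}
```

In the Reactor-based controller, the analogous step might be to emit the sender's identity together with the command into executedCommandSink and let each listenCommand subscription filter out its own entries. That mapping is an assumption and is untested here.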

Actual Behavior

I get the error "reactor.core.Exceptions$ErrorCallbackNotImplemented: ApplicationErrorException (0x201): Request-Stream not implemented", and the second client does not receive the command from its listenCommand subscription.

Rsocket-server controller:

private Sinks.Many<String> executedCommandSink = Sinks.many().multicast()
    .directBestEffort();

@MessageMapping("project.{projectId}.command")
public Flux<String> listenCommand(@DestinationVariable String projectId, RSocketRequester requester) {

    return executedCommandSink.asFlux();
}

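@MessageMapping("project.{projectId}.command.execute")
public Mono<String> executeCommand(
        @DestinationVariable String projectId, Mono<String> commandMono, RSocketRequester requester) {

    // All requesters registered for this project, excluding the sender.
    Set<RSocketRequester> requesters = getRequestersForSendGraphCommand(projectId, requester);

    // Forward the command to the other clients, then echo it back to the sender.
    return commandMono.doOnNext(command -> sendCommand(requesters, projectId, command));
}

private void sendCommand(Set<RSocketRequester> requesters, String projectId, String command) {
    // This issues a new request from the server to each client rather than
    // emitting on the stream the client already opened.
    requesters.forEach(requesterToSend -> requesterToSend
        .route("project.{projectId}.command", projectId) // expand the template with the project id
        .data(Flux.just(command))
        .retrieveFlux(String.class)
        .subscribe());
}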

public Set<RSocketRequester> getRequestersForSendGraphCommand(String projectId, RSocketRequester requester) {
    Set<RSocketRequester> sessionsByProject = ConcurrentHashMap.newKeySet();
    allRequesters.forEachEntry(1L, entry -> {
        if (entry.getValue().equals(projectId) && entry.getKey() != requester) {
            sessionsByProject.add(entry.getKey());
        }
    });
    return sessionsByProject;
}

Rsocket-client:

@Autowired
public RSocketManagerClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {

    clientUUID = UUID.randomUUID().toString();
    log.info("Connecting using client ID: {}", clientUUID);

    this.rsocketRequester = rsocketRequesterBuilder
            .setupData(clientUUID)
            .rsocketStrategies(strategies)
            .connectWebSocket(URI.create("ws://127.0.0.1:8307/rsocket"))
            .subscribeOn(Schedulers.parallel())
            .block();

    this.rsocketRequester.rsocket()
            .onClose()
            .doFirst(() -> log.info("Client: {} CONNECTED.", clientUUID))
            .doOnError(error -> log.error("Connection to client {} CLOSED", clientUUID))
            .doFinally(consumer -> log.info("Client {} DISCONNECTED", clientUUID))
            .subscribe();
}

public void executeCommand() {
    log.info("\nClient with id-{} subscribe on execute command", clientUUID);

    String block = this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command.execute")
            .data("Hello")
            .retrieveMono(String.class)
            .doOnNext(System.out::println)
            .block();
    log.info("Response: {}", block);
}

public void subscribeOnCommand() {
    log.info("\nClient with id-{} subscribe on command", clientUUID);

    this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command")
            .retrieveFlux(String.class)
            .doOnNext(System.out::println)
            .subscribe();
}