jakartaee / websocket

Jakarta WebSocket
https://projects.eclipse.org/projects/ee4j.websocket

Functional Programming and Flow/Reactive Streams Support #445

Open hantsy opened 1 month ago

hantsy commented 1 month ago

Add a new functional programming API, based on JDK 9 Flow/Reactive Streams, to replace the current annotation-based declarations.

For reference, see the reactive WebSocket implementation in Spring WebFlux.

The server side:

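import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

public class EchoHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Echo every inbound message back to the sender with a "received:" prefix.
        return session.send(session.receive()
                .doOnNext(WebSocketMessage::retain) // Use retain() for Reactor Netty
                .map(m -> session.textMessage("received:" + m.getPayloadAsText()))
        );
    }
}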

And the client side:

var socketUri = URI.create("ws://localhost:" + port + "/echo");

WebSocketHandler handler = session -> {

    var receiveMono = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .log("client receiving::")
            .doOnNext(replayList::add)
            .then();

    var sendMono = session
            .send(
                    Mono.delay(Duration.ofMillis(100)).thenMany(
                            Flux.just("message one", "message two").map(session::textMessage)
                    )
            )
            .doOnSubscribe(subscription -> log.debug("session is open"))
            .doOnTerminate(() -> log.debug("session is closing"))
            .log("client sending::");

    return sendMono.then(receiveMono);
};

this.client.execute(socketUri, handler)
        .doOnTerminate(latch::countDown)
        .subscribe();

joakime commented 1 month ago

What you are proposing can be built on top of the Jakarta WebSocket API. Some projects have already done so.
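
As a rough illustration of the layering described above, here is a minimal sketch that uses only JDK types (`java.util.concurrent.Flow` and `SubmissionPublisher`). The names `FlowHandler`, `MapProcessor`, and `Bridge` are hypothetical and are not part of Jakarta WebSocket or any existing project. The sketch assumes text messages only and requests one item at a time. In a real endpoint, `Bridge.onMessage` would presumably be called from a handler registered with `Session.addMessageHandler(String.class, ...)`, and the outbound sink would forward to something like `session.getAsyncRemote().sendText(...)`; here both are replaced by plain callbacks so the example runs without a container.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative sketch only: none of these types exist in Jakarta WebSocket.
final class FlowBridgeSketch {

    /** Hypothetical functional handler: turns the inbound stream into an outbound one. */
    @FunctionalInterface
    interface FlowHandler {
        Flow.Publisher<String> handle(Flow.Publisher<String> inbound);
    }

    /** Minimal map operator, since java.util.concurrent.Flow ships no operators. */
    static final class MapProcessor extends SubmissionPublisher<String>
            implements Flow.Processor<String, String> {
        private final Function<String, String> mapper;
        private Flow.Subscription upstream;

        MapProcessor(Function<String, String> mapper) { this.mapper = mapper; }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
        @Override public void onNext(String item) { submit(mapper.apply(item)); upstream.request(1); }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    /** Adapts callback-style delivery (onMessage/close) to a FlowHandler. */
    static final class Bridge {
        private final SubmissionPublisher<String> inbound = new SubmissionPublisher<>();
        private final CountDownLatch done = new CountDownLatch(1);

        Bridge(FlowHandler handler, Consumer<String> outboundSink) {
            // Subscribe to the handler's outbound stream and push each item to the sink.
            handler.handle(inbound).subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;
                @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
                @Override public void onNext(String item) { outboundSink.accept(item); subscription.request(1); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
        }

        // Assumption: a real endpoint would call this from its registered message handler.
        void onMessage(String text) { inbound.submit(text); }

        // Assumption: a real endpoint would call this when the session closes.
        void close() { inbound.close(); }

        boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }
    }

    /** Echo handler in the functional style, mirroring the "received:" prefix used above. */
    static FlowHandler echo() {
        return inbound -> {
            MapProcessor mapped = new MapProcessor(m -> "received:" + m);
            inbound.subscribe(mapped);
            return mapped;
        };
    }

    /** Feeds messages through the echo handler and collects what would be sent back. */
    static List<String> runEcho(List<String> messages) {
        List<String> sent = new CopyOnWriteArrayList<>();
        Bridge bridge = new Bridge(echo(), sent::add);
        messages.forEach(bridge::onMessage);
        bridge.close();
        try {
            if (!bridge.awaitDone(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(runEcho(List.of("message one", "message two")));
    }
}
```

The handler itself never touches a session or an annotation; it only maps one publisher to another, which is the shape the issue asks for. A production bridge would also need to handle binary and partial messages, error propagation to the peer, and backpressure toward the network, none of which this sketch attempts.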