quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Add async callback support for Quarkus WebSockets Next #41320

Open andreiyusupau opened 5 months ago

andreiyusupau commented 5 months ago

Description

When working with Quarkus WebSockets Next it seems impossible to execute async callbacks. Use cases are:

import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import jakarta.inject.Inject;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@WebSocket(path = "/test-next")
public class WebsocketNext {

    @Inject
    WebSocketConnection connection;

    private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

    @OnTextMessage
    public void onMessage(String message) {
        executorService.execute(() -> {
            try {
                // emulate slow operation
                TimeUnit.SECONDS.sleep(2L);
                connection.sendText(message);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

}

Running it leads to the following exception:
```
Exception in thread "" jakarta.enterprise.context.ContextNotActiveException: SessionScoped context was not active when trying to obtain a bean instance for a client proxy of SYNTHETIC bean [class=io.quarkus.websockets.next.WebSocketConnection, id=2wpUB3H8VV_VmvxBreyF9vW6OLI]
    at io.quarkus.arc.impl.ClientProxies.notActive(ClientProxies.java:70)
    at io.quarkus.arc.impl.ClientProxies.getSingleContextDelegate(ClientProxies.java:30)
    at io.quarkus.websockets.next.WebSocketConnection_2wpUB3H8VV_VmvxBreyF9vW6OLI_Synthetic_ClientProxy.arc$delegate(Unknown Source)
    at io.quarkus.websockets.next.WebSocketConnection_2wpUB3H8VV_VmvxBreyF9vW6OLI_Synthetic_ClientProxy.sendText(Unknown Source)
    at com.andreiyusupau.WebsocketNext.lambda$onMessage$0(WebsocketNext.java:25)
    at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
    at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
```

The same example works fine with the Jakarta WebSockets implementation in Quarkus:

package com.andreiyusupau;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.websocket.OnMessage;
import jakarta.websocket.Session;
import jakarta.websocket.server.ServerEndpoint;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@ServerEndpoint("/test")
@ApplicationScoped
public class Websocket {

    private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

    @OnMessage
    public void onMessage(String message, Session session) {
        executorService.execute(() -> {
            try {
                // emulate slow operation
                TimeUnit.SECONDS.sleep(2L);
                session.getBasicRemote().sendText(message);
            } catch (InterruptedException | IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

}

Implementation ideas

No response

PedroBatista commented 5 months ago

Wouldn't parallelStream() solve this?

@Inject
 OpenConnections connections;

...

  connections.listAll().parallelStream().forEach(session -> {
                    session.sendText([big slow text send]);
            });

Sorry if that's not relevant. I was just passing by, pulling my hair out over why some WebSocket clients don't connect to WebSockets Next but do connect with Jakarta WebSockets, and I just saw this issue.

andreiyusupau commented 5 months ago

@PedroBatista it doesn't seem to be a solution for all cases. The general problem is that WebSockets Next can't execute connection.sendText(message); in another thread, because connection is SessionScoped and the session context is not active in another thread. Direct injection of the connection as a method parameter (similar to Session in a Jakarta WS @OnMessage method) would be great.
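
To make the failure mode concrete, here is a minimal plain-Java sketch of the behaviour described above. It is an analogy only and does not reproduce how Quarkus ArC implements contexts: it assumes the "session context" is tracked per thread, and every name in it (`ScopeDemo`, `SessionContext`, `ConnectionProxy`, `RealConnection`) is hypothetical. The proxy resolves its delegate on each call, so it fails on a worker thread, while a reference resolved up front (as a method parameter would be) keeps working.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// All types here are hypothetical stand-ins, not the Quarkus API.
class ScopeDemo {

    interface Connection {
        String sendText(String message);
    }

    // Stand-in for the real per-session object.
    static final class RealConnection implements Connection {
        public String sendText(String message) {
            return "sent: " + message;
        }
    }

    // Assumption: the active "session" is bound to the current thread.
    static final class SessionContext {
        private static final ThreadLocal<Connection> CURRENT = new ThreadLocal<>();

        static void activate(Connection connection) { CURRENT.set(connection); }

        static void deactivate() { CURRENT.remove(); }

        static Connection current() {
            Connection connection = CURRENT.get();
            if (connection == null) {
                throw new IllegalStateException("session context not active on this thread");
            }
            return connection;
        }
    }

    // Plays the role of a client proxy: looks up the delegate on every call.
    static final class ConnectionProxy implements Connection {
        public String sendText(String message) {
            return SessionContext.current().sendText(message);
        }
    }

    // Runs the task on a separate worker thread; returns its result or the failure message.
    static String runOnWorker(Callable<String> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return executor.submit(task).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Mirrors the injected-field style: the proxy is invoked on the worker thread.
    static String viaProxy(String message) {
        Connection proxy = new ConnectionProxy();
        SessionContext.activate(new RealConnection());
        try {
            return runOnWorker(() -> proxy.sendText(message));
        } finally {
            SessionContext.deactivate();
        }
    }

    // Mirrors the method-parameter style: the real object is resolved on the calling thread.
    static String viaDirectReference(String message) {
        SessionContext.activate(new RealConnection());
        try {
            Connection direct = SessionContext.current();
            return runOnWorker(() -> direct.sendText(message));
        } finally {
            SessionContext.deactivate();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaProxy("hello"));
        System.out.println(viaDirectReference("hello"));
    }
}
```

In this sketch the first call reports that the context is not active on the worker thread, and the second call succeeds because the lambda captured the real object instead of the proxy.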

mkouba commented 5 months ago

> @PedroBatista it doesn't seem to be a solution for all cases. The general problem is that WebSockets Next can't execute connection.sendText(message); in another thread, because connection is SessionScoped and the session context is not active in another thread. Direct injection of the connection as a method parameter (similar to Session in a Jakarta WS @OnMessage method) would be great.

@andreiyusupau well, this has been supported for a long time: https://quarkus.io/version/main/guides/websockets-next-reference#method-parameters

> When working with Quarkus WebSockets Next it seems impossible to execute async callbacks.

@PedroBatista is right in the sense that the session context will not be active when a custom ExecutorService is used.

Also, if you use a WebSocketConnection method parameter or @Inject OpenConnections, then you don't work with a client proxy (@SessionScoped) and connection.sendText(message) should just work:

@WebSocket(path = "/test-next")
public class WebsocketNext {

    private final ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

    @OnTextMessage
    public void onMessage(String message, WebSocketConnection connection) {
        // connection is passed as a method parameter, so it is not a @SessionScoped client proxy
        executorService.execute(() -> {
            try {
                // emulate slow operation
                TimeUnit.SECONDS.sleep(2L);
                connection.sendText(message);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }
}

mkouba commented 5 months ago

@andreiyusupau By the way, virtual threads are supported by means of @RunOnVirtualThread, i.e. a method with a blocking signature that is annotated with @RunOnVirtualThread is executed on a new virtual thread. See also the documentation.

andreiyusupau commented 5 months ago

> @andreiyusupau well, this has been supported for a long time: https://quarkus.io/version/main/guides/websockets-next-reference#method-parameters

@mkouba Great that it's already supported! But why do all the examples (https://quarkus.io/version/main/guides/websockets-next-reference, https://quarkus.io/guides/websockets-next-tutorial) use @Inject WebSocketConnection connection instead of just passing the connection as a method parameter (which seems far more straightforward to me)?

mkouba commented 5 months ago

> But why do all the examples (https://quarkus.io/version/main/guides/websockets-next-reference, https://quarkus.io/guides/websockets-next-tutorial) use @Inject WebSocketConnection connection instead of just passing the connection as a method parameter (which seems far more straightforward to me)?

It's a matter of taste I guess ;-). Also, @Inject WebSocketConnection was supported first, so that's probably the reason :shrug:.