quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.58k stars 2.63k forks source link

Add support for @ObservesAsync runs with @RunOnVirtualThread #36218

Closed Willian199 closed 9 months ago

Willian199 commented 11 months ago

Description

Make it possible to use @RunOnVirtualThread in a method that uses @ObservesAsync Ex:

    @RunOnVirtualThread
    void onEvent(@ObservesAsync MyTask task) {
        Log.infov("Thread: {0}", Thread.currentThread().getName());

    }

Implementation ideas

No response

quarkus-bot[bot] commented 11 months ago

/cc @Ladicek (arc), @manovotn (arc), @mkouba (arc)

geoand commented 11 months ago

My I ask what use case you have in mind for this?

Willian199 commented 11 months ago

I have 2 cases in mind. First: Certain features may notify users of their status. This will make a select on database and trigger an email or push notification. Second: When the server throws an unexpected exception, this will make a HTTP request sending the stacktrace to another server for further analysis.

geoand commented 11 months ago

Understood, thanks

Willian199 commented 11 months ago

Something that may can be look together and just documented. It's about the behavior from a HTTP rest service annotated with @RunOnVirtualThread, which fires sync event and waits for it to complete. This will cause the Virtual Thread to be pinned, right?

Ladicek commented 11 months ago

Asynchronous events are notified on worker threads, so notifying on [new] virtual threads shouldn't be that hard, but also I don't really see why that's necessary. You can already block in async observers safely, if that's what you're after.

It's about the behavior from a HTTP rest service annotated with @RunOnVirtualThread, which fires sync event and waits for it to complete. This will cause the Virtual Thread to be pinned, right?

No. Synchronous events are delivered sequentially in the original thread, so there's no waiting, it's just a loop that calls all the methods.

mkouba commented 11 months ago

So one of the problems I can see is that NotificationOptions can be used to configure the observer notification (see Event#fireAsync(U, NotificationOptions)), i.e. the NotificationOptions.getExecutor() should be used to execute asynchronous events.

Also note that currently all asynchronous observers are notified serially in a single worker thread.

Ladicek commented 11 months ago

Ah that's interesting, I thought we submitted them all concurrently. Also good point about NotificationOptions.getExecutor(). That seems to clarify the intended design: it is the producer of the event that controls how observers are notified. If the producer wants to notify the observers on a virtual thread, they should be able to use

event.fireAsync(payload, NotificationOptions.ofExecutor(Executors.newVirtualThreadPerTaskExecutor()));
mkouba commented 11 months ago

If the producer wants to notify the observers on a virtual thread, they should be able to use...

Except that in this case, the Vert.x duplicated context would not used. I'm not sure if that's a big problem though. Also Executors.newVirtualThreadPerTaskExecutor() does not make use of the Quarkus-customized executor.

The pattern we use in other extensions looks like:

Context context = VertxContext.getOrCreateDuplicatedContext(vertx);
VertxContextSafetyToggle.setContextSafe(context, true);
context.runOnContext(new Handler<Void>() {
   @Override
   public void handle(Void event) {
      VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
          @Override
          public void run() {
             // do something...
          }
      });
   }
});

CC @cescoffier

cescoffier commented 11 months ago

You MUST use the Quarkus virtual executor, or you will have lots of trouble. Typically, it smoothly falls back to worker threads, it handles the duplicated context (and we have more and more parts of Quarkus relying on this), and it assigns a name to the virtual thread (because unnamed threads are great to debug)...

cescoffier commented 11 months ago

So, yes you will need to use the pattern you found.

mkouba commented 11 months ago

So, yes you will need to use the pattern you found.

:+1: Thanks Clement.

Ladicek commented 11 months ago

Do we have an injectable VirtualThreadExecutor (name subject to bikeshedding of course) that people can inject and use and it will do the right thing? It is sad that Executors.newVirtualThreadPerTaskExecutor() is not correct (and the way how we rely on the duplicated context for context propagation is rather worrying), but it would be absurd to expect users to do something like ^^^.

mkouba commented 11 months ago

Do we have an injectable VirtualThreadExecutor (name subject to bikeshedding of course) that people can inject and use and it will do the right thing? It is sad that Executors.newVirtualThreadPerTaskExecutor() is not correct (and the way how we rely on the duplicated context for context propagation is rather worrying), but it would be absurd to expect users to do something like ^^^.

It seems that we don't and I agree with Ladislav that we should make it easier for users. Be it an injectable Executor or a util method similar to VertxContextSupport.subscribeAndAwait() and friends.

cescoffier commented 11 months ago

Executors.newVirtualThreadPerTaskExecutor() cannot be modified, and you will have the same behavior one start creating a thread pool.

At the moment, we didn't make the virtual thread executor injectable. It's something we can do.

geoand commented 11 months ago

it would be absurd to expect users to do something like ^^^

I completely agree with this - most of this code make very very little sense to users, so it would be very useful to provide an injectable reference.

Not to mention that users should never ever have any reference to recorders (VirtualThreadsRecorder in this case) in their code.

ozangunalp commented 11 months ago

Not to mention that users should never ever have any reference to recorders (VirtualThreadsRecorder in this case) in their code.

We sure do a bad job at trying to hide recorders...

We can include a producer for the VT executor. I'd suggest a marker interface for easily injecting the VT executor like

@Inject
VirtualThreadsExecutor executor;

but also an identifier qualifier to inject ExecutorService directly:

@Inject
@Identifier("virtual-threads")
ExecutorService executor;

Note that currently Quarkus allows to inject the worker thread pool through @Inject ExecutorService executor;

WDYT?

Ladicek commented 11 months ago

If we go with ExecutorService, I think I'd prefer a regular qualifier (@VirtualThreads maybe?). But all these options are good IMHO.

ozangunalp commented 11 months ago

Here is a draft https://github.com/quarkusio/quarkus/pull/36248

manovotn commented 11 months ago

I agree that supporting @Inject VirtualThreadsExecutor is a good idea :+1:

However, do we need to have two beans for this? I.e. do we need to support a variant for ExecutorService? It suggests that the use cases for those two are interchangeable which may or may not be true based on the workload you put there (that's what I understood anyway, please do correct me there if I am mistaken). The added qualifier is not something users can guess and at that point you might as well just point them towards VirtualThreadsExecutor.

Ladicek commented 11 months ago

Yeah I agree one option should be enough. There's no difference between the two as far as I can tell?

If VirtualThreadsExecutor extends ExecutorService, then that's just about perfect IMHO.

cescoffier commented 11 months ago

ExecutorService .... shutdown is public right? We do NOT want people to call shutdown.

ozangunalp commented 11 months ago

I am pretty sure that one is enough, I just couldn't make it work with both VirtualThreadsExecutor and ExecutorService + qualifier.

ozangunalp commented 11 months ago

ExecutorService .... shutdown is public right? We do NOT want people to call shutdown.

I agree, but on the other hand people can now do @Inject ExecutorService executor; executor.shutdownNow();

cescoffier commented 11 months ago

I would implement the user interface to throw an exception when one of these methods is called, and basically, just have the "submit" method (which would be an Executor).

I would use an existing type (Executor or ExecutorService) with a custom qualifier. The reason is that it makes it easy to integrate with other things (Mutiny emitOn, for example).

mkouba commented 11 months ago

I tend to prefer @VirtualThreads ExecutorService. The problem with VirtualThreadsExecutor is that we would need to register a bean for this specific type only (which is what https://github.com/quarkusio/quarkus/pull/36248 currently does) so that it does not clash with the bean we register for the default blocking ExecutorService. And that seems a bit counterintuitive to me...

ozangunalp commented 11 months ago

Thanks for the feedback, I've updated the https://github.com/quarkusio/quarkus/pull/36248 accordingly

mkouba commented 9 months ago

I'm going to close this one because it's now possible to inject the virtual thread executor and offload the execution on the virtual thread if really needed.

@Willian199 Feel free to reopen if this does not work for you.