vert-x3 / vertx-amqp-client

An AMQP client for Vert.x
Apache License 2.0

Fix wrong dispatching in the AMQP sender #71

Closed cescoffier closed 2 years ago

cescoffier commented 2 years ago

When a send is issued from vertx.executeBlocking, the trampoline-based dispatch executed the action immediately, on the worker thread, which leads to concurrency issues.

Related to https://github.com/smallrye/smallrye-reactive-messaging/issues/1541.

cescoffier commented 2 years ago

Would need to be backported to main (4.2.2)

gemmellr commented 2 years ago

It feels like it should still run the action immediately when it is already on the event loop context; otherwise that's a significant change in behaviour.

I'm remembering some code from the old bridge where I checked the thread used. I dare say a nicer version could be done, e.g. I see ContextInternal.isRunningOnContext() exists. Old bridge code: https://github.com/vert-x3/vertx-amqp-bridge/blob/3.9/src/main/java/io/vertx/amqpbridge/impl/AmqpBridgeImpl.java#L320-

cescoffier commented 2 years ago

@gemmellr It would still say "yes" while you are on a worker thread. Inside executeBlocking, asking for the current context returns the event loop context, not a worker context.

Another approach is to call toString on the current thread and check whether it is an event loop thread (and that the contexts are equal), but I would not call that "nicer".

cescoffier commented 2 years ago

My bad, nettyEventLoop().inEventLoop() should work. I need to check.

cescoffier commented 2 years ago

I confirm, your approach works:

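Future<Object> future = vertx.executeBlocking(p -> {
  // The current context is still reported as an event loop context
  System.out.println(Vertx.currentContext().isEventLoopContext());
  // Checks the calling thread itself against the context's Netty event loop
  System.out.println(((ContextInternal) Vertx.currentContext()).nettyEventLoop().inEventLoop());
  p.complete();
});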

=> true and false
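
For readers following the thread, the behaviour being discussed (run the action inline only when the caller really is on the event loop thread, otherwise hand it over to that thread) can be sketched with plain JDK executors. This is not the Vert.x implementation and not the code changed in this PR: `DispatchSketch`, `EventLoop`, `dispatch`, and the thread names are hypothetical stand-ins, and a single-thread executor is assumed to play the role of the event loop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {

  // Hypothetical stand-in for an event loop: one dedicated thread running queued tasks.
  static final class EventLoop {
    private volatile Thread thread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "sketch-eventloop");
      thread = t; // remember the loop thread so callers can be compared against it
      return t;
    });

    // True only when the calling thread is the event loop thread itself.
    boolean inEventLoop() {
      return Thread.currentThread() == thread;
    }

    // Run inline when already on the loop thread, otherwise queue the action on it.
    void dispatch(Runnable action) {
      if (inEventLoop()) {
        action.run();
      } else {
        executor.execute(action);
      }
    }

    void shutdown() throws InterruptedException {
      executor.shutdown(); // already queued actions still run
      executor.awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  // Returns a log of where and in which order the dispatched actions ran.
  static List<String> demo() throws Exception {
    EventLoop loop = new EventLoop();
    List<String> log = new CopyOnWriteArrayList<>();
    ExecutorService worker =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));

    // Dispatch from a worker thread (analogous to code running inside executeBlocking):
    // the action must not run on the worker, so it is queued on the loop thread.
    worker.submit(() -> {
      loop.dispatch(() -> log.add("from worker: ran on " + Thread.currentThread().getName()));
    }).get();

    // Dispatch from the loop thread itself: the nested action runs inline,
    // so it is logged before the outer action finishes.
    loop.dispatch(() -> {
      loop.dispatch(() -> log.add("nested: ran on " + Thread.currentThread().getName()));
      log.add("outer finished");
    });

    worker.shutdown();
    loop.shutdown();
    return log;
  }

  public static void main(String[] args) throws Exception {
    demo().forEach(System.out::println);
  }
}
```

In this sketch the check compares thread identity rather than asking which context the call is associated with, which mirrors why the `inEventLoop()` check above returns false inside executeBlocking while the context check still returns true.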