quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.87k stars 2.71k forks source link

Bug using event bus(quarkus-vertx) and quarkus-cache #25763

Closed diogocarleto closed 9 months ago

diogocarleto commented 2 years ago

Describe the bug

I'm trying to upgrade the Quarkus version from 2.5.3 to 2.9.1, and I'm experiencing a problem with quarkus-vertx + quarkus-cache.

Let me explain.

I created my GreetingService with the following code:

@ApplicationScoped
public class GreetingService {

    private static final String HELLO_WITH_CACHE = "hello-with-cache";

    @Inject
    EventBus eventBus;

    public void hello() {
        eventBus.send(HELLO_WITH_CACHE,"testing");
    }

    @ConsumeEvent(HELLO_WITH_CACHE)
    public void consumeHello(String value) {
        System.out.println(value + " incoming");
        String s1 = cachedList().stream()
                .filter(s -> s.equals("i_1000"))
                .findFirst().get();

        System.out.println(s1);
    }

    @CacheResult(cacheName = "myCachedList")
    public List<String> cachedList() {
        return IntStream.range(0, 10000)
                .mapToObj(i -> "i_"+i)
                .collect(Collectors.toList());
    }
}

Basically hello send a message to consumeHello that tries to get some data from another method that has cache, in this case cachedList. Just after I called this, I receive an exception.

Expected behavior

No response

Actual behavior

2022-05-24 15:06:26,089 ERROR [io.qua.ver.cor.run.VertxCoreRecorder] (vert.x-eventloop-thread-0) Uncaught exception received by Vert.x: java.lang.IllegalStateException: The current thread cannot be blocked: vert.x-eventloop-thread-0
    at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:30)
    at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
    at io.smallrye.mutiny.groups.UniAwait.indefinitely(UniAwait.java:46)
    at io.quarkus.cache.runtime.CacheResultInterceptor.intercept(CacheResultInterceptor.java:116)
    at io.quarkus.cache.runtime.CacheResultInterceptor_Bean.intercept(Unknown Source)
    at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
    at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
    at org.acme.GreetingService_Subclass.cachedList(Unknown Source)
    at org.acme.GreetingService.consumeHello(GreetingService.java:28)
    at org.acme.GreetingService_ClientProxy.consumeHello(Unknown Source)
    at org.acme.GreetingService_VertxInvoker_consumeHello_bb8009b16975814a83732605f11255445f5beee2.invokeBean(Unknown Source)
    at io.quarkus.vertx.runtime.EventConsumerInvoker.invoke(EventConsumerInvoker.java:41)
    at io.quarkus.vertx.runtime.VertxRecorder$3$1.handle(VertxRecorder.java:130)
    at io.quarkus.vertx.runtime.VertxRecorder$3$1.handle(VertxRecorder.java:100)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
    at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:168)
    at io.vertx.core.eventbus.impl.MessageConsumerImpl.dispatch(MessageConsumerImpl.java:177)
    at io.vertx.core.eventbus.impl.HandlerRegistration$InboundDeliveryContext.next(HandlerRegistration.java:169)
    at io.vertx.core.eventbus.impl.HandlerRegistration$InboundDeliveryContext.dispatch(HandlerRegistration.java:134)
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:111)
    at io.vertx.core.eventbus.impl.HandlerRegistration.dispatch(HandlerRegistration.java:105)
    at io.vertx.core.eventbus.impl.MessageConsumerImpl.deliver(MessageConsumerImpl.java:183)
    at io.vertx.core.eventbus.impl.MessageConsumerImpl.doReceive(MessageConsumerImpl.java:168)
    at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$receive$0(HandlerRegistration.java:56)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

finish
2022-05-24 15:06:31,127 INFO  [io.quarkus] (main) code-with-quarkus stopped in 0.033s

Process finished with exit code 0

How to Reproduce?

Just download my reproducer, and run the test GreetingServiceTest

Output of uname -a or ver

No response

Output of java -version

No response

GraalVM version (if different from Java)

No response

Quarkus version or git rev

No response

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 2 years ago

/cc @gwenneg

gwenneg commented 2 years ago

Hi @diogocarleto. There are two approaches to solve that issue:

▶️ Approach 1: declare the method that consumes the event as blocking:

This is simpler if you're not familiar with reactive code.

    @ConsumeEvent(value = HELLO_WITH_CACHE, blocking = true) // You can do it with this @ConsumeEvent field
    public void consumeHello(String value) {
        // Same body as before
    }

or

    @ConsumeEvent(HELLO_WITH_CACHE)
    @io.smallrye.common.annotation.Blocking // Or do it with this annotation, the outcome will be identical
    public void consumeHello(String value) {
        // Same body as before
    }

should work.

▶️ Approach 2: make the entire code reactive:

    @ConsumeEvent(HELLO_WITH_CACHE)
    public Uni<Void> consumeHello(String value) {
        System.out.println(value + " incoming");
        // Quick and dirty untested code, the real code would probably not look like that.
        return cachedList()
            .onItem().invoke(list -> {
                String s1 = list.stream()
                        .filter(s -> s.equals("i_1000"))
                        .findFirst().get();
                System.out.println(s1);
            }).replaceWithVoid();
    }

    @CacheResult(cacheName = "myCachedList")
    public Uni<List<String>> cachedList() { // quarkus-cache will run in a non-blocking way because the method returns Uni
        return Uni.createFrom().item(() -> {
            return IntStream.range(0, 10000)
                    .mapToObj(i -> "i_"+i)
                    .collect(Collectors.toList());
        });
    }

This should also work with CompletionStage in place of Uni.

gwenneg commented 2 years ago

I'll try to update the quarkus-cache doc soon and add examples about this, thanks for reporting it!

diogocarleto commented 2 years ago

Thanks for your clarification. I can update the documentation also if you want.

Best,

cescoffier commented 9 months ago

See comment.