quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Imperative Mailer not working with smallrye-graphql #36215

Open tacu90 opened 11 months ago

tacu90 commented 11 months ago

Describe the bug

When trying to send a mail using the imperative (blocking) io.quarkus.mailer.Mailer, the thread is blocked until the time limit is reached and an exception is thrown.

Expected behavior

The email is sent and after completion the mutation is completed.

Actual behavior

Request is loading indefinitely and the following exception is thrown after 60 seconds:

2023-09-29 15:48:05,894 WARN [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-worker-thread-1,5,main] has been blocked for 61715 ms, time limit is 60000 ms: io.vertx.core.VertxException: Thread blocked
    at java.base@17.0.2/jdk.internal.misc.Unsafe.park(Native Method)
    at java.base@17.0.2/java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)
    at java.base@17.0.2/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:715)
    at java.base@17.0.2/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
    at java.base@17.0.2/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:230)
    at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:67)
    at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
    at io.smallrye.mutiny.groups.UniAwait.indefinitely(UniAwait.java:46)
    at io.quarkus.mailer.runtime.BlockingMailerImpl.send(BlockingMailerImpl.java:20)
    at com.example.MailService.sendMailBlocking(MailService.java:24)
    at com.example.MailService_ClientProxy.sendMailBlocking(Unknown Source)
    at com.example.graphql.GraphQLResource.sendMailBlocking(GraphQLResource.java:34)
    at java.base@17.0.2/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base@17.0.2/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base@17.0.2/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base@17.0.2/java.lang.reflect.Method.invoke(Method.java:568)
    at io.smallrye.graphql.execution.datafetcher.helper.ReflectionInvoker.invoke(ReflectionInvoker.java:97)
    at io.quarkus.smallrye.graphql.runtime.spi.datafetcher.QuarkusDefaultDataFetcher.lambda$invokeAndTransformBlocking$0(QuarkusDefaultDataFetcher.java:84)
    at io.quarkus.smallrye.graphql.runtime.spi.datafetcher.QuarkusDefaultDataFetcher$$Lambda$1743/0x0000000801528430.call(Unknown Source)
    at io.smallrye.context.impl.wrappers.SlowContextualCallable.call(SlowContextualCallable.java:21)
    at io.quarkus.smallrye.graphql.runtime.spi.datafetcher.BlockingHelper.lambda$runBlocking$0(BlockingHelper.java:30)
    at io.quarkus.smallrye.graphql.runtime.spi.datafetcher.BlockingHelper$$Lambda$1744/0x0000000801528870.handle(Unknown Source)
    at io.vertx.core.impl.ContextBase.lambda$executeBlocking$1(ContextBase.java:180)
    at io.vertx.core.impl.ContextBase$$Lambda$1314/0x000000080128c220.handle(Unknown Source)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:277)
    at io.vertx.core.impl.ContextBase.lambda$internalExecuteBlocking$2(ContextBase.java:199)
    at io.vertx.core.impl.ContextBase$$Lambda$1315/0x000000080128c638.run(Unknown Source)
    at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
    at io.vertx.core.impl.TaskQueue$$Lambda$1277/0x00000008012148c0.run(Unknown Source)
    at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
    at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
    at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base@17.0.2/java.lang.Thread.run(Thread.java:833)

How to Reproduce?

Reproducer: blocktest.zip

Steps to reproduce:

  1. Create new quarkus project with graphql extension
  2. configure application.properties:
    quarkus.mailer.mock=false
    quarkus.mailer.host=**smtpServer**
    quarkus.mailer.port=**port**
    quarkus.mailer.from=mail@mailer.com
  3. Execute mutation "sendMailBlocking" for example with Graphql DEV-UI
    mutation sendMailBlocking {
    sendMailBlocking
    }

Output of uname -a or ver

Microsoft Windows [Version 10.0.19045.3324]

Output of java -version

openjdk version "17.0.2" 2022-01-18

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.4.1

Build tool (ie. output of mvnw --version or gradlew --version)

maven

Additional information

The ReactiveMailer works (as long as await().indefinitely() is not used).

Using the blocking mailer from a REST endpoint works without problems.

It seems not to be a problem with await in general, since a simple service using await is working.
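
For illustration only: the stack trace above shows the worker thread parked in CountDownLatch.await underneath io.vertx.core.impl.TaskQueue.run. One way such a wait can stall is when the work that would release the latch is queued behind the waiting task on the same serial queue. Whether that is what happens inside the mailer is an assumption, not something established in this report. The JDK-only sketch below reproduces that pattern with a single-thread executor; SerialQueueStall and awaitCompletion are hypothetical names, and the wait is bounded so the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// JDK-only illustration; this is not Quarkus, Vert.x or Mutiny code.
class SerialQueueStall {

    // Returns true if the blocking wait was released before the timeout.
    static boolean awaitCompletion(boolean callbackOnSameQueue, long timeoutMillis) {
        // A single-thread executor stands in for a queue that runs tasks one at a time.
        ExecutorService serialQueue = Executors.newSingleThreadExecutor();
        ExecutorService otherThread = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch done = new CountDownLatch(1);
            Future<Boolean> caller = serialQueue.submit(() -> {
                // Schedule the "completion callback" that releases the latch.
                ExecutorService target = callbackOnSameQueue ? serialQueue : otherThread;
                target.execute(done::countDown);
                // Block the current task, similar in spirit to await().atMost(...).
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            return caller.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            serialQueue.shutdownNow();
            otherThread.shutdownNow();
        }
    }
}
```

Under these assumptions, awaitCompletion(true, 200) times out because the callback can never run while the waiting task occupies the queue, whereas awaitCompletion(false, 2000) completes because another thread releases the latch.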

quarkus-bot[bot] commented 11 months ago

/cc @Ladicek (smallrye), @jmartisk (graphql,smallrye), @phillip-kruger (graphql,smallrye), @radcortez (smallrye)

phillip-kruger commented 11 months ago

@cescoffier can maybe help here, I think what you see is a warning only. Seems like the request is still executed on the worker thread (blocking) vert.x-worker-thread-1,5,main

If you get the mail to work (i.e. the mail is actually sent) or change mock to true, does everything work? I am not sure why the mail is not working.

jmartisk commented 11 months ago

See https://github.com/quarkusio/quarkus/issues/29141 for explanations; I think it's exactly the same issue. Is it a problem for you to switch to the reactive mailer?

tacu90 commented 11 months ago

Thanks for your answers.

@phillip-kruger

If you get the mail to work (i.e. the mail is actually sent) or change mock to true, does everything work? I am not sure why the mail is not working.

With mock set to true, it works without problems (which is why we only discovered this error late in the test stage :( )

When using the reactive mailer with await().atMost(...), I still get the error, but the mail is sent after the timeout.

  public Void sendMailReactiveAwaiting10Seconds() {
    Log.info("Sending mail (blocking)");
    return reactiveMailer
      .send(Mail.withText("quarkus@quarkus.io", "Hello", "A message sent from quarkus"))
      .await().atMost(Duration.ofSeconds(10));
  }

@jmartisk

See #29141 for explanations; I think it's exactly the same issue. Is it a problem for you to switch to the reactive mailer?

You are right, this seems to be the same issue. I don't know why I wasn't able to find it.

When using the reactive mailer with await(), I still get this error (no big surprise, as the imperative mailer does just the same). Our application reads and writes mail information from an Oracle DB (not using reactive), and our first attempts to combine the imperative DB access, the reactive mailer and a non-blocking Quarkus request failed. But maybe this is just due to a lack of knowledge of how to do it.

At the moment we have this as a workaround:

// ... do blocking stuff like DB access
send().subscribe().with(
        success -> Log.info("Successfully sent test run protocol notification"),
        failure -> {
          Log.error("Failed to send test run protocol notification: " + failure.getMessage());
        });
// ... do more blocking stuff and return data

This is working, but the request is completed before the mail is actually sent, so if there are any errors with sending the mail, the user will not know about it.

jmartisk commented 11 months ago

To wait until the mail is sent before returning to the client, you can make the operation return Uni<Void> and then simply do this:

return mailer.send().onItemOrFailure().invoke((result, failure) -> {
            if (failure != null) {
                Log.error("Failed to send test run protocol notification: " + failure.getMessage());
            }
        });

This way the error will appear in the response as well as in the server log
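
The difference between subscribing in place and returning the pipeline can be sketched with the JDK's CompletableFuture as a rough analogy to Uni (an analogy only, not Mutiny code). FireAndForgetVsReturn, failingSend and the "SMTP unavailable" message are hypothetical; the AtomicReference stands in for the server log.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only analogy; CompletableFuture plays the role of Uni here.
class FireAndForgetVsReturn {

    // Hypothetical stand-in for an asynchronous mail send that fails.
    static CompletableFuture<Void> failingSend() {
        return CompletableFuture.failedFuture(new IllegalStateException("SMTP unavailable"));
    }

    // Fire-and-forget: the failure only reaches the log; the caller still sees "ok".
    static String fireAndForget(AtomicReference<String> log) {
        failingSend().whenComplete((ok, failure) -> {
            if (failure != null) {
                log.set("Failed to send: " + failure.getMessage());
            }
        });
        return "ok";
    }

    // Returning the pipeline: the failure is logged and also propagated to the caller.
    static CompletableFuture<Void> sendAndReport(AtomicReference<String> log) {
        return failingSend().whenComplete((ok, failure) -> {
            if (failure != null) {
                log.set("Failed to send: " + failure.getMessage());
            }
        });
    }
}
```

In the first variant the caller has no way to learn about the failure; in the second, whoever receives the returned future (in the thread's case, the GraphQL layer receiving the Uni) observes it as well.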

tacu90 commented 11 months ago

@jmartisk

I can't get this to work in combination with Hibernate ORM Panache. Short example that reflects our use case:

  @Mutation
  @Transactional
  public Uni<MailEntity> sendMailReactiveWithDB() {
    Log.info("Sending mail (reactive with DB)");
    return mailService.sendMailReactiveWithDB();
  }

  // MailService
  public Uni<MailEntity> sendMailReactiveWithDB() {
    Log.info("Sending mail reactive with DB");
    MailEntity mailEntity = MailEntity.find("topic", "test").firstResult();

    return reactiveMailer.send(Mail.withText(mailEntity.receiver, mailEntity.topic, "A message sent from quarkus"))
      .onItemOrFailure()
      .transform((item, failure) -> {
        if (failure != null) {
          Log.error("Error sending mail", failure);
          return null;
        } else {
          mailEntity.lastSent = LocalDateTime.now();
          return mailEntity;
        }
      });
  }

It's causing an exception because it can't open a transaction:

ERROR [io.sma.graphql] (vert.x-eventloop-thread-1) SRGQL012000: Data Fetching Error: io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread.
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:30)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(Unknown Source)
    at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:42)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:30)
    at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:27)
    at com.example.graphql.GraphQLResource_Subclass.sendMailReactiveWithDB(Unknown Source)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at io.smallrye.graphql.execution.datafetcher.helper.ReflectionInvoker.invoke(ReflectionInvoker.java:97)
    at io.quarkus.smallrye.graphql.runtime.spi.datafetcher.QuarkusUniDataFetcher.handleUserMethodCallNonBlocking(QuarkusUniDataFetcher.java:44)
    at io.quarkus.smallrye.graphql.runtime.spi.datafetcher.QuarkusUniDataFetcher.handleUserMethodCall(QuarkusUniDataFetcher.java:26)
    at io.quarkus.smallrye.graphql.runtime.spi.datafetcher.AbstractAsyncDataFetcher.invokeAndTransform(AbstractAsyncDataFetcher.java:42)
    at io.smallrye.graphql.execution.datafetcher.AbstractDataFetcher.get(AbstractDataFetcher.java:68)
    at graphql.execution.instrumentation.dataloader.DataLoaderDispatcherInstrumentation.lambda$instrumentDataFetcher$0(DataLoaderDispatcherInstrumentation.java:90)
    at graphql.execution.ExecutionStrategy.invokeDataFetcher(ExecutionStrategy.java:309)
    at graphql.execution.ExecutionStrategy.fetchField(ExecutionStrategy.java:286)
    at graphql.execution.ExecutionStrategy.resolveFieldWithInfo(ExecutionStrategy.java:212)
    at graphql.execution.ExecutionStrategy.resolveField(ExecutionStrategy.java:184)
    at graphql.execution.AsyncSerialExecutionStrategy.lambda$execute$1(AsyncSerialExecutionStrategy.java:47)
    at graphql.execution.Async.eachSequentiallyImpl(Async.java:167)
    at graphql.execution.Async.eachSequentially(Async.java:156)
    at graphql.execution.AsyncSerialExecutionStrategy.execute(AsyncSerialExecutionStrategy.java:42)
    at graphql.execution.Execution.executeOperation(Execution.java:161)
    at graphql.execution.Execution.execute(Execution.java:104)
    at graphql.GraphQL.execute(GraphQL.java:557)
    at graphql.GraphQL.lambda$parseValidateAndExecute$11(GraphQL.java:476)
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
    at graphql.GraphQL.parseValidateAndExecute(GraphQL.java:471)
    at graphql.GraphQL.executeAsync(GraphQL.java:439)
    at io.smallrye.graphql.execution.ExecutionService.lambda$writeAsync$0(ExecutionService.java:216)
    at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:24)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:51)
    at io.smallrye.mutiny.groups.UniSubscribe.with(UniSubscribe.java:110)
    at io.smallrye.mutiny.groups.UniSubscribe.with(UniSubscribe.java:88)
    at io.smallrye.graphql.execution.ExecutionService.writeAsync(ExecutionService.java:218)
    at io.smallrye.graphql.execution.ExecutionService.execute(ExecutionService.java:185)
    at io.smallrye.graphql.execution.ExecutionService.executeAsync(ExecutionService.java:121)
    at io.quarkus.smallrye.graphql.runtime.SmallRyeGraphQLExecutionHandler.doRequest(SmallRyeGraphQLExecutionHandler.java:365)
    at io.quarkus.smallrye.graphql.runtime.SmallRyeGraphQLExecutionHandler.handlePost(SmallRyeGraphQLExecutionHandler.java:124)
    at io.quarkus.smallrye.graphql.runtime.SmallRyeGraphQLExecutionHandler.doHandle(SmallRyeGraphQLExecutionHandler.java:85)
    at io.quarkus.smallrye.graphql.runtime.SmallRyeGraphQLAbstractHandler.handleWithIdentity(SmallRyeGraphQLAbstractHandler.java:95)
    at io.quarkus.smallrye.graphql.runtime.SmallRyeGraphQLAbstractHandler.handle(SmallRyeGraphQLAbstractHandler.java:76)
    at io.quarkus.smallrye.graphql.runtime.SmallRyeGraphQLAbstractHandler.handle(SmallRyeGraphQLAbstractHandler.java:30)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1286)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:140)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:144)
    at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.doEnd(BodyHandlerImpl.java:359)
    at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.end(BodyHandlerImpl.java:336)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:277)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:259)
    at io.vertx.core.http.impl.HttpEventHandler.handleEnd(HttpEventHandler.java:76)
    at io.vertx.core.http.impl.Http1xServerRequest.onEnd(Http1xServerRequest.java:595)
    at io.vertx.core.http.impl.Http1xServerRequest.lambda$pendingQueue$1(Http1xServerRequest.java:133)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
    at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:242)
    at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:295)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:277)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:259)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

I've updated the reproducer:

blocktest.zip

jmartisk commented 11 months ago

I think the @Transactional annotation is for blocking operations only, try using io.quarkus.hibernate.reactive.panache.common.WithTransaction instead

tacu90 commented 11 months ago

I think the @Transactional annotation is for blocking operations only, try using io.quarkus.hibernate.reactive.panache.common.WithTransaction instead

To use @WithTransaction, I would have to use "quarkus-hibernate-reactive-panache", which, to my knowledge, does not work in conjunction with "quarkus-hibernate-orm-panache". So, I would have to restructure our entire database implementation to be reactive. We've had issues with this in the past, which is why we chose the "classical" approach. It's not an option to rebuild this aspect of our application just for sending emails.

We will probably stick with our workaround for now, or possibly use REST for this function.

However, I would appreciate a fix, as we enjoy working with GraphQL and the mailer has been performing quite well otherwise. If a fix isn't immediately possible, it would be helpful to have a warning for this combination that also appears when using "quarkus.mailer.mock=true". That way, we could identify the issue early on.

gsmet commented 11 months ago

I concur that we need a way to have a fully working blocking stack with GraphQL, mailer and Hibernate ORM. We shouldn't force users to switch to reactive.

/cc @cescoffier

cescoffier commented 11 months ago

Mailer is particularly hard to get working in this case, because of how the SMTP connection is handled. I will need to see if there is a way to work around the issue. Unfortunately, I cannot tell you when (the task has been on my to-do list for a very long time).

treo commented 4 months ago

I've struggled for a few hours to resolve this issue today, too. (Quarkus 3.9.5)

The following is a sketch of what worked for me:

@Mutation
@Description("Example Endpoint with some blocking action before sending a mail")
public Uni<Void> exampleMailer() {
  return Uni.createFrom().item(0)
    // subscribe on the worker pool so the blocking code below runs off the event loop
    .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
    .onItem()
    .transform(ignored -> { /* blocking code here; must return the blocking result */ })
    .flatMap(blockingResult -> {
        /* use blocking result to create mail */
        return reactiveMailer.send(/* ... */);
    });
}

I've tried the idea of moving the blocking part away from the executor thread in a few permutations, in particular Uni.createFrom().emitter(...) but I always ran into BlockingOperationNotAllowedException, and when I added the @Blocking annotation to the method, it would get stuck.

The only thing that has worked in the end is to start with a uni created from an item (which can be null or any other thing that can be created before entering this flow), and then immediately move it to the worker pool to do the blocking work as well as the mailing.
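
The shape of that flow (blocking work on a worker pool, then an asynchronous send chained onto its result) can be sketched with the JDK alone. This is an analogy using CompletableFuture rather than Mutiny; WorkerThenAsyncSend, loadRecipientBlocking and sendAsync are hypothetical stand-ins for the database access and the reactive mailer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy of "blocking step on a worker pool, then async send".
class WorkerThenAsyncSend {

    // Hypothetical blocking lookup, standing in for the database access.
    static String loadRecipientBlocking() {
        return "quarkus@quarkus.io";
    }

    // Hypothetical asynchronous send, standing in for the reactive mailer.
    static CompletableFuture<String> sendAsync(String recipient) {
        return CompletableFuture.completedFuture("sent to " + recipient);
    }

    // Run the blocking part on the given pool, then chain the async send without blocking.
    static CompletableFuture<String> exampleMailer(ExecutorService workerPool) {
        return CompletableFuture
            .supplyAsync(WorkerThenAsyncSend::loadRecipientBlocking, workerPool)
            .thenCompose(WorkerThenAsyncSend::sendAsync);
    }

    // Small driver that owns the pool so the example shuts down cleanly.
    static String runOnce() {
        ExecutorService workerPool = Executors.newFixedThreadPool(2);
        try {
            return exampleMailer(workerPool).join();
        } finally {
            workerPool.shutdown();
        }
    }
}
```

The point of the design is that nothing waits on the send from inside the worker task: the blocking step finishes first, and the send is only composed onto its result.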