quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.58k stars 2.63k forks source link

Resteasy Reactive with reactive hangs when using with azure blob store extension #37299

Open mswiderski opened 10 months ago

mswiderski commented 10 months ago

Describe the bug

When combining rest easy reactive (using CompletionStage or via reactive messaging) with azure blob store extension (which is actually tiny wrapper on top of azure java client) requests hang and never complete.

This seems to be related to mixing quarkus reactive stack with reactor (that is used within azure client library).

2023-11-24 12:14:49,463 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.red by Quarkus 3.5.3) started in 4.096s. Listening on: http://localhost:8080
2023-11-24 12:14:49,463 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [azure-storage-blob, cdi, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, vertx]
2023-11-24 12:14:57,269 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3257 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
    at java.base@17.0.2/jdk.internal.misc.Unsafe.park(Native Method)
    at java.base@17.0.2/java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)
    at java.base@17.0.2/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:715)
    at java.base@17.0.2/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
    at java.base@17.0.2/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:230)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
    at reactor.core.publisher.Mono.block(Mono.java:1742)
    at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:146)
    at com.azure.storage.blob.BlobContainerClient.createIfNotExistsWithResponse(BlobContainerClient.java:374)
    at com.azure.storage.blob.BlobServiceClient.createBlobContainerIfNotExistsWithResponse(BlobServiceClient.java:210)
    at com.azure.storage.blob.BlobServiceClient.createBlobContainerIfNotExists(BlobServiceClient.java:172)
    at TestResource.lambda$checkExistence$0(TestResource.java:34)
    at TestResource$$Lambda$1308/0x00000008012b1cd8.apply(Unknown Source)
    at java.base@17.0.2/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684)
    at java.base@17.0.2/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
    at java.base@17.0.2/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
    at java.base@17.0.2/java.util.concurrent.CompletableFuture$MinimalStage.thenApply(CompletableFuture.java:2902)
    at TestResource.checkExistence(TestResource.java:32)
    at TestResource$quarkusrestinvoker$checkExistence_97d8ae0b717d419d2c2049ee33b1b138b6c176e1.invoke(Unknown Source)
    at org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29)
    at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141)
    at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
    at org.jboss.resteasy.reactive.server.handlers.RestInitialHandler.beginProcessing(RestInitialHandler.java:48)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:23)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:10)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1286)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:144)
    at io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$1.handle(HttpServerCommonHandlers.java:59)
    at io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$1.handle(HttpServerCommonHandlers.java:37)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1286)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:144)
    at io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveRecorder$13.handle(ResteasyReactiveRecorder.java:339)
    at io.quarkus.resteasy.reactive.server.runtime.ResteasyReactiveRecorder$13.handle(ResteasyReactiveRecorder.java:332)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1286)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:144)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$4.handle(VertxHttpHotReplacementSetup.java:192)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$4.handle(VertxHttpHotReplacementSetup.java:181)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.vertx.core.impl.future.FutureBase$$Lambda$1204/0x0000000801358238.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base@17.0.2/java.lang.Thread.run(Thread.java:833)

Expected behavior

Request is processed without any hanging

Actual behavior

Request is hung and never finishes

How to Reproduce?

Reproducer

https://github.com/mswiderski/quarkus-azure-reactive-issue

  1. start the reproducer in dev mode
  2. send following command curl http://localhost:8080/test/123

curl is blocked and in quarkus logs you will see thread blocked exceptions that never ends, in addition the app cannot be stopped in normal way, only killed

Output of uname -a or ver

MacBook-Pro.local 23.1.0 Darwin Kernel Version 23.1.0

Output of java -version

Java version: 17.0.2, vendor: Oracle Corporation

Quarkus version or git rev

3.5.3

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.8.4

Additional information

No response

quarkus-bot[bot] commented 10 months ago

/cc @FroMage (resteasy-reactive), @geoand (resteasy-reactive), @stuartwdouglas (resteasy-reactive)

mswiderski commented 10 months ago

on top of this, very similar issue happens when using azure client from method annotated with @ServerRequestFilter.

Regardless of what is the rest endpoint method signature, the filter is always executed on an event loop thread.

geoand commented 9 months ago

This seems to be related to mixing quarkus reactive stack with reactor (that is used within azure client library).

@cescoffier would love to hear about this one :)

Jokes aside, I'll have to take a closer look at some point.

mswiderski commented 9 months ago

@geoand believe me I tried hard to debug it but failed miserably :( couldn't figure out why it locks in such a hard way that even stopping the app is problematic. In Kubernetes it was even worse as it used whole grace period to shutdown and then killed it so not fun at all.

Adding the code into the separate thread is current work around but certainly not going to fly for long

Thanks for looking into it (whenever you find some time) and if you need anything from me just ping me here.

geoand commented 9 months ago

Thanks for the update.

Doesn't sound fun at all...

cescoffier commented 9 months ago

At some point, the code is blocking (blockingGet method in the stack trace), which is not allowed, as it blocks the event loop. Looking at https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobContainerClient.java, all the methods are synchronous and blocking. You cannot use that class in a pure reactive application.

I do not know if there is a reactive version of the BlobContainerClient, but you cannot use the blocking variant when running on the event loop. You would need to switch to synchronous methods and use @Blocking

mswiderski commented 9 months ago

the thing is that @Blocking does not help, putting it on the rest method or the listener method does not change a thing.

I will also look into the azure client to see if there is any reactive version of the client.

cescoffier commented 9 months ago

@mswiderski you need:


   @Channel("test") MutinyEmitter<String> emitter;

   @Path("/{id}")
   @GET
   public Response checkExistence(@PathParam("id") String id) {
        emitter.sendAndAwait(id);      
        BlobContainerClient blobContainerClient = blobServiceClient
                            .createBlobContainerIfNotExists("azure-storage-blob");
         BlobClient blobClient = blobContainerClient.getBlobClient(id);
          if (blobClient.exists()) {
              return Response.status(Response.Status.OK).build();
          }
          return Response.status(Response.Status.NOT_FOUND).build();
    }
mswiderski commented 9 months ago

thanks @cescoffier this certainly helps. On the listener side (that uses @incoming) there is also a need to use @Blocking annotation whenever there is a need to use azure client.

With this I would say we have something that works. And if I understood comments right, there is no way to use azure blob store with rest easy reactive as it blocks the event loop. Maybe it would make sense to update the docs in the QuickStart? If so I am more than happy to make a small PR with a note of that.

cescoffier commented 9 months ago

That would be awesome! Thanks!

mswiderski commented 9 months ago

PR with docs in azure-services created. https://github.com/quarkiverse/quarkus-azure-services/pull/179

Is there any place in Quarkus docs that would be a good place to add such a note too?

geoand commented 9 months ago

I can't think of any good place...

krisztiankocsis commented 4 months ago

I have similar problems from a background thread. Is Azure Blob Store even blocking the event loop thread if I used BlobClient from a manually created (Thread.start()) thread?

krisztiankocsis commented 4 months ago

Is there a workaround exist?

krisztiankocsis commented 4 months ago

I'm getting this seemingly unrelated exception:

{"timestamp":"2024-04-24T16:54:52.27784172+02:00","sequence":4598,"loggerName":"io.vertx.core.impl.BlockedThreadChecker","level":"WARN","message":"Thread Thread[vert.x-eventloop-thread-3,5,main] has been blocked for 19030 ms, time limit is 2000 ms","threadName":"vertx-blocked-thread-checker","threadId":33,"mdc":{},"ndc":"","exception":{"refId":1,"exceptionType":"io.vertx.core.VertxException","message":"Thread blocked","frames":[{"class":"io.quarkus.oidc.runtime.OidcAuthenticationMechanism$3","method":"apply","line":82},{"class":"io.quarkus.oidc.runtime.OidcAuthenticationMechanism$3","method":"apply","line":76},{"class":"io.smallrye.context.impl.wrappers.SlowContextualFunction","method":"apply","line":21},{"class":"io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor","method":"onItem","line":36},{"class":"io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor","method":"onItem","line":43},{"class":"io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription","method":"forward","line":38},{"class":"io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem","method":"subscribe","line":23},{"class":"io.smallrye.mutiny.operators.AbstractUni","method":"subscribe","line":36},{"class":"io.smallrye.mutiny.operators.uni.UniOnItemTransform","method":"subscribe","line":22},{"class":"io.smallrye.mutiny.operators.AbstractUni","method":"subscribe","line":36},{"class":"io.smallrye.mutiny.operators.uni.UniOnItemTransform","method":"subscribe","line":22},{"class":"io.smallrye.mutiny.operators.AbstractUni","method":"subscribe","line":36},{"class":"io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni","method":"subscribe","line":25},{"class":"io.smallrye.mutiny.operators.AbstractUni","method":"subscribe","line":36},{"class":"io.smallrye.mutiny.groups.UniSubscribe","method":"withSubscriber","line":51},{"class":"io.smallrye.mutiny.operators.uni.UniMemoizeOp","method":"subscribe","line":59},{"class":"io.smallrye.mutiny.operators.AbstractUni","method":"subscribe","line":36},{"class":"io.smallrye.mutiny.groups.UniSubscribe","method":"withSubscriber","line":51},{"class":"io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder$AbstractAuthenticationHandler","method":"handle","line":276},{"class":"io.quarkus.vertx.http.runtime.security.HttpSecurityRecorder$AbstractAuthenticationHandler","method":"handle","line":224},{"class":"io.vertx.ext.web.impl.RouteState","method":"handleContext","line":1285},{"class":"io.vertx.ext.web.impl.RoutingContextImplBase","method":"iterateNext","line":177},{"class":"io.vertx.ext.web.impl.RoutingContextWrapper","method":"next","line":200},{"class":"io.quarkus.vertx.http.runtime.cors.CORSFilter","method":"handle","line":138},{"class":"io.quarkus.vertx.http.runtime.cors.CORSFilter","method":"handle","line":21},{"class":"io.vertx.ext.web.impl.RouteState","method":"handleContext","line":1285},{"class":"io.vertx.ext.web.impl.RoutingContextImplBase","method":"iterateNext","line":177},{"class":"io.vertx.ext.web.impl.RoutingContextWrapper","method":"next","line":200},{"class":"io.quarkus.vertx.http.runtime.VertxHttpRecorder$8","method":"handle","line":501},{"class":"io.quarkus.vertx.http.runtime.VertxHttpRecorder$8","method":"handle","line":497},{"class":"io.vertx.ext.web.impl.RouteState","method":"handleContext","line":1285},{"class":"io.vertx.ext.web.impl.RoutingContextImplBase","method":"iterateNext","line":177},{"class":"io.vertx.ext.web.impl.RoutingContextWrapper","method":"next","line":200},{"class":"io.vertx.ext.web.impl.RouterImpl","method":"handleContext","line":250},{"class":"io.vertx.ext.web.impl.RouteState","method":"handleContext","line":1285},{"class":"io.vertx.ext.web.impl.RoutingContextImplBase","method":"iterateNext","line":177},{"class":"io.vertx.ext.web.impl.RoutingContextImpl","method":"next","line":137},{"class":"io.vertx.ext.web.impl.RouterImpl","method":"handle","line":68},{"class":"io.vertx.ext.web.impl.RouterImpl","method":"handle","line":37},{"class":"io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$3","method":"handle","line":103},{"class":"io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$3","method":"handle","line":100},{"class":"io.quarkus.vertx.http.runtime.VertxHttpRecorder$7","method":"handle","line":489},{"class":"io.quarkus.vertx.http.runtime.VertxHttpRecorder$7","method":"handle","line":486},{"class":"io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$2","method":"handle","line":86},{"class":"io.quarkus.vertx.http.runtime.options.HttpServerCommonHandlers$2","method":"handle","line":69},{"class":"io.quarkus.vertx.http.runtime.VertxHttpRecorder$1","method":"handle","line":147},{"class":"io.quarkus.vertx.http.runtime.VertxHttpRecorder$1","method":"handle","line":123},{"class":"io.vertx.core.impl.ContextImpl","method":"emit","line":335},{"class":"io.vertx.core.impl.DuplicatedContext","method":"emit","line":176},{"class":"io.vertx.core.http.impl.Http1xServerConnection","method":"handleMessage","line":174},{"class":"io.vertx.core.net.impl.ConnectionBase","method":"read","line":159},{"class":"io.vertx.core.net.impl.VertxHandler","method":"channelRead","line":153},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":442},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.vertx.core.http.impl.Http1xUpgradeToH2CHandler","method":"channelRead","line":124},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":444},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.netty.handler.timeout.IdleStateHandler","method":"channelRead","line":289},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":442},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.netty.handler.codec.MessageToMessageDecoder","method":"channelRead","line":103},{"class":"io.netty.handler.codec.MessageToMessageCodec","method":"channelRead","line":111},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":442},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.netty.handler.codec.MessageToMessageDecoder","method":"channelRead","line":103},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":444},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.netty.handler.codec.ByteToMessageDecoder","method":"fireChannelRead","line":346},{"class":"io.netty.handler.codec.ByteToMessageDecoder","method":"channelRead","line":318},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":444},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.vertx.core.http.impl.Http1xOrH2CHandler","method":"end","line":61},{"class":"io.vertx.core.http.impl.Http1xOrH2CHandler","method":"channelRead","line":38},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":444},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.netty.handler.timeout.IdleStateHandler","method":"channelRead","line":289},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":442},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"fireChannelRead","line":412},{"class":"io.netty.channel.DefaultChannelPipeline$HeadContext","method":"channelRead","line":1410},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":440},{"class":"io.netty.channel.AbstractChannelHandlerContext","method":"invokeChannelRead","line":420},{"class":"io.netty.channel.DefaultChannelPipeline","method":"fireChannelRead","line":919},{"class":"io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe","method":"read","line":166},{"class":"io.netty.channel.nio.NioEventLoop","method":"processSelectedKey","line":788},{"class":"io.netty.channel.nio.NioEventLoop","method":"processSelectedKeysOptimized","line":724},{"class":"io.netty.channel.nio.NioEventLoop","method":"processSelectedKeys","line":650},{"class":"io.netty.channel.nio.NioEventLoop","method":"run","line":562},{"class":"io.netty.util.concurrent.SingleThreadEventExecutor$4","method":"run","line":997},{"class":"io.netty.util.internal.ThreadExecutorMap$2","method":"run","line":74},{"class":"io.netty.util.concurrent.FastThreadLocalRunnable","method":"run","line":30},{"class":"java.lang.Thread","method":"run","line":1583}]}}

but only while I'm reading Azure Store blobs from background thread (not immediatelly, after 1-2 mins).

krisztiankocsis commented 4 months ago

As we are using blob store to store intermediate data during processing, this is vital for us to function.