micronaut-projects / micronaut-core

Micronaut Application Framework
http://micronaut.io
Apache License 2.0
6.01k stars 1.04k forks source link

Thread pinning in Netty-based Micronaut HTTP Client #10828

Open donbeave opened 1 month ago

donbeave commented 1 month ago

Expected Behavior

No thread pinning.

Actual Behaviour

We are using Micronaut as a parent context for Spring applications, we encountered some thread pinning inside a Netty-based client. Here is a stack trace:

VirtualThread[#2658,tomcat-handler-1491]/runnable@ForkJoinPool-1-worker-2 reason:MONITOR
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:199)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.park(VirtualThread.java:596)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2648)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:54)
    java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:219)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:754)
    java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:990)
    java.base/java.util.concurrent.locks.ReentrantLock$Sync.lock(ReentrantLock.java:153)
    java.base/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:322)
    ch.qos.logback.core.OutputStreamAppender.writeBytes(OutputStreamAppender.java:200)
    ch.qos.logback.core.OutputStreamAppender.writeOut(OutputStreamAppender.java:193)
    ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:237)
    ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)
    ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:85)
    ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
    io.opentelemetry.instrumentation.logback.mdc.v1_0.OpenTelemetryAppender.append(OpenTelemetryAppender.java:111)
    io.opentelemetry.instrumentation.logback.mdc.v1_0.OpenTelemetryAppender.append(OpenTelemetryAppender.java:30)
    ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:85)
    ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)
    ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:272)
    ch.qos.logback.classic.Logger.callAppenders(Logger.java:259)
    ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:426)
    ch.qos.logback.classic.Logger.filterAndLog_2(Logger.java:419)
    ch.qos.logback.classic.Logger.trace(Logger.java:438)
    io.micronaut.http.util.HttpHeadersUtil.trace(HttpHeadersUtil.java:103)
    io.micronaut.http.util.HttpHeadersUtil.lambda$trace$1(HttpHeadersUtil.java:67)
    java.base/java.lang.Iterable.forEach(Iterable.java:75)
    io.micronaut.http.util.HttpHeadersUtil.trace(HttpHeadersUtil.java:67)
    io.micronaut.http.client.netty.DefaultHttpClient.traceRequest(DefaultHttpClient.java:1772)
    io.micronaut.http.client.netty.DefaultHttpClient.sendRequestThroughChannel(DefaultHttpClient.java:1533)
    io.micronaut.http.client.netty.DefaultHttpClient.lambda$exchangeImpl$26(DefaultHttpClient.java:1124)
    reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
    reactor.core.publisher.Flux.subscribe(Flux.java:8840)
    reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:196)
    reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(MonoContextWriteRestoringThreadLocals.java:110)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onNext(FluxContextWriteRestoringThreadLocals.java:118)
    io.micronaut.http.client.netty.CancellableMonoSink.tryForward(CancellableMonoSink.java:69)
    io.micronaut.http.client.netty.CancellableMonoSink.request(CancellableMonoSink.java:146) <== monitors:1
    reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.request(FluxContextWriteRestoringThreadLocals.java:163)
    reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
    reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.request(MonoContextWriteRestoringThreadLocals.java:156)
    reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribe(MonoFlatMapMany.java:141)
    reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(MonoContextWriteRestoringThreadLocals.java:95)
    reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
    reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onSubscribe(FluxContextWriteRestoringThreadLocals.java:104)
    io.micronaut.http.client.netty.CancellableMonoSink.subscribe(CancellableMonoSink.java:62) <== monitors:1
    reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:64)
    reactor.core.publisher.MonoContextWriteRestoringThreadLocals.subscribe(MonoContextWriteRestoringThreadLocals.java:44)
    reactor.core.publisher.Flux.subscribe(Flux.java:8840)
    reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
    reactor.core.publisher.FluxSwitchMapNoPrefetch.subscribeOrReturn(FluxSwitchMapNoPrefetch.java:61)
    reactor.core.publisher.Flux.subscribe(Flux.java:8825)
    reactor.core.publisher.Flux.blockFirst(Flux.java:2765)
    io.micronaut.http.client.netty.DefaultHttpClient$1.exchange(DefaultHttpClient.java:571)
    io.micronaut.http.client.BlockingHttpClient.exchange(BlockingHttpClient.java:77)
    io.micronaut.http.client.BlockingHttpClient.exchange(BlockingHttpClient.java:106)
    ***

It's because subscribe method uses synchronized keyword (Here is a source code in the 4.5.x branch: https://github.com/micronaut-projects/micronaut-core/blob/1a8fc61de6ec5f78464415d642dd0227f21bc83d/http-client/src/main/java/io/micronaut/http/client/netty/CancellableMonoSink.java#L57). At this stage of Loom projects synchronized methods can lead to thread pinning, one of the suggestions to replace this keyword with ReentrantLock.

For now, we replaced the Netty-based implementation with the client based on Java HTTP Client (micronaut-http-client-jdk).

Steps To Reproduce

var config = new DefaultHttpClientConfiguration();
config.setFollowRedirects(false);

var httpClient = new DefaultHttpClient((URI) null, config);

var request = HttpRequest.POST(authUrl, body);

var response = httpClient.toBlocking().exchange(request, String.class); // thread will be pinned here

Environment Information

No response

Example Application

No response

Version

4.4.2

graemerocher commented 1 month ago

@yawkat you added some tests to check we are not thread pinning, can you check this case?