micronaut-projects / micronaut-core

Micronaut Application Framework
http://micronaut.io
Apache License 2.0

Connection closed before response was received on concurrent requests #10882

Closed: BharathMC closed this issue 4 weeks ago

BharathMC commented 4 weeks ago

Expected Behavior

Concurrent requests to a Micronaut service should succeed without errors.

Actual Behaviour

I am facing an issue with concurrent requests hitting a Micronaut service.

To simplify, I built a small application that exposes a GET REST endpoint, which calls an external service's REST API and returns HTTP 200.

When I send the requests sequentially, it works fine. But when I send them concurrently, with just 2 threads, it fails with a timeout exception after about 200 requests.

Note: the external endpoint (the health API) is available and has no issues.

Steps To Reproduce

Resource class

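import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.uri.UriBuilder;
import io.micronaut.runtime.http.scope.RequestScope;
import jakarta.inject.Inject;

import java.net.URI;
import java.util.UUID;

@Controller("/test")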
@RequestScope
public class TestResource {

    @Inject
    @Client("client1")
    HttpClient myLocalhttpClient;

    @Get(uri = "/http", produces = MediaType.TEXT_PLAIN)
    public HttpResponse<?> testHttpClient() {
        String requestId = String.valueOf(UUID.randomUUID());
        System.out.println("received request "+requestId);
        String baseUri = "http://localhost:8082/health";

        URI builder = UriBuilder.of(baseUri).build();

        HttpRequest<?> request = HttpRequest.GET(baseUri)
                .accept(MediaType.APPLICATION_JSON_TYPE)
                .contentType(MediaType.APPLICATION_JSON_TYPE)
                .uri(builder);

        HttpResponse<String> response = myLocalhttpClient.toBlocking().exchange(request, String.class);
        String resBody = response.getBody().isPresent() ? response.getBody().get() : null;
        HttpStatus status = response.status();

        System.out.println("Response status= " + status);
        System.out.println("Response Body= " + resBody);
        System.out.println("received completed "+requestId);
        return HttpResponse.ok();
    }
}

Filter class

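import io.micronaut.http.MutableHttpRequest;
import io.micronaut.http.annotation.ClientFilter;
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.annotation.RequestFilter;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;

@ClientFilter(value = Filter.MATCH_ALL_PATTERN, serviceId = "client1")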
public class MyClientFilter {

    public MyClientFilter() {

    }

    @RequestFilter
    @ExecuteOn(TaskExecutors.BLOCKING)
    public void filterRequest(MutableHttpRequest<?> request) {
        System.out.println("MicroTxLraClientInterceptor: Inside MyClientFilter interceptor "+ request.getPath());
    }
}

application.properties

micronaut.http.services.client1.url=http://localhost
micronaut.http.services.client1.log-level=DEBUG
micronaut.http.services.client1.read-timeout=180s
micronaut.http.services.client1.read-idle-timeout=120s
micronaut.http.services.client1.connect-timeout=120s
micronaut.http.services.client1.num-of-threads=25

micronaut.http.services.client1.pool.enabled=true
micronaut.http.services.client1.pool.max-pending-acquires=128
micronaut.http.services.client1.pool.acquire-timeout=3m
micronaut.http.services.client1.pool.max-concurrent-http1-connections=512
micronaut.http.services.client1.pool.max-pending-connections=64
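
The report does not include the load driver itself. A minimal sketch of one, using only the JDK, might look like the following. The class name `ConcurrentLoad`, the request count of 500, and the target `http://localhost:8080/test/http` (port 8080 being Micronaut's default server port) are assumptions for illustration, not taken from the report; only the use of 2 threads comes from the description above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentLoad {

    /** Sends `total` GET requests to `target` from `threads` workers and returns the number of HTTP 200 responses. */
    static int run(URI target, int threads, int total) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(target).GET().build();
        // One task = one request; returns the HTTP status code.
        Callable<Integer> call = () ->
                client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode();

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < total; i++) {
                results.add(pool.submit(call));
            }
            int ok = 0;
            for (Future<Integer> result : results) {
                try {
                    if (result.get() == 200) {
                        ok++;
                    }
                } catch (ExecutionException e) {
                    // The request itself failed (connection error, I/O error, ...).
                    System.err.println("request failed: " + e.getCause());
                }
            }
            return ok;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed default target; pass a different URL as the first argument if needed.
        URI target = URI.create(args.length > 0 ? args[0] : "http://localhost:8080/test/http");
        int total = 500;
        System.out.println("200 responses: " + run(target, 2, total) + " of " + total);
    }
}
```

Calling `run(target, 1, total)` instead gives the sequential case for comparison.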

Exception after some requests (about 200 requests):

2024-06-04 16:52:29 [io-executor-thread-2] ERROR i.m.http.server.RouteExecutor - Unexpected error occurred: Connection closed before response was received
io.micronaut.http.client.exceptions.ResponseClosedException: Connection closed before response was received
    at io.micronaut.http.client.netty.DefaultHttpClient$BaseHttpResponseHandler.channelInactive(DefaultHttpClient.java:2073)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.codec.MessageAggregator.channelInactive(MessageAggregator.java:441)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
    at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:328)
    at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.micronaut.http.client.netty.ConnectionManager$Pool$ConnectionHolder$3.channelInactive(ConnectionManager.java:1139)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
        at reactor.core.publisher.Flux.blockFirst(Flux.java:2704)
        at io.micronaut.http.client.netty.DefaultHttpClient$1.exchange(DefaultHttpClient.java:570)
        at io.micronaut.http.client.BlockingHttpClient.exchange(BlockingHttpClient.java:77)
        at io.micronaut.http.client.BlockingHttpClient.exchange(BlockingHttpClient.java:106)

The exception is thrown at this line:

HttpResponse<String> response = myLocalhttpClient.toBlocking().exchange(request, String.class);

It is followed by a read timeout:

2024-06-04 16:55:29 [io-executor-thread-3] ERROR i.m.http.server.RouteExecutor - Unexpected error occurred: Read Timeout
io.micronaut.http.client.exceptions.ReadTimeoutException: Read Timeout
    at io.micronaut.http.client.exceptions.ReadTimeoutException.<clinit>(ReadTimeoutException.java:26)
    at io.micronaut.http.client.netty.DefaultHttpClient.lambda$exchangeImpl$28(DefaultHttpClient.java:1155)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:295)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:280)
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:419)
    at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:271)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:286)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
...
2024-06-04 16:55:50 [default-nioEventLoopGroup-1-11] WARN  i.n.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.timeout.ReadTimeoutException: null

After the incoming requests are stopped, the following log lines keep being printed:

2024-06-04 16:55:50 [default-nioEventLoopGroup-1-11] WARN  i.n.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.timeout.ReadTimeoutException: null

Thanks

Environment Information

JDK 17

Example Application

No response

Version

4.2.1

graemerocher commented 4 weeks ago

You are blocking the event loop. In the latest version of Micronaut, what you are doing fails earlier with an exception. If your intention is to do primarily blocking operations, you need to configure the client and server to handle that case. Here is a recommended configuration:

# We assume primarily blocking user code, so configure the server & client to handle blocking I/O
micronaut.server.thread-selection=blocking
micronaut.netty.event-loops.http-client-group.executor=blocking
micronaut.netty.event-loops.http-client-group.prefer-native-transport=true
micronaut.http.client.event-loop-group=http-client-group
micronaut.http.client.pool.enabled=true
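
To illustrate why blocking the event loop can end in timeouts, here is a plain-JDK analogy. It is not Micronaut's or Netty's actual implementation: a single-threaded executor stands in for the event loop, and the class name `EventLoopBlockingDemo`, the `"200 OK"` value, and the 500 ms timeout are all hypothetical. A handler that blocks on the event loop thread while waiting for work that the same thread must perform can never be served, whereas the same handler on a separate pool completes normally.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class EventLoopBlockingDemo {

    /** Simulates a blocking client call whose response must be delivered by the event loop. */
    static String blockingCall(ExecutorService eventLoop) throws Exception {
        Future<String> response = eventLoop.submit(() -> "200 OK");
        try {
            // Block the calling thread until the event loop delivers the response.
            return response.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            response.cancel(false);
            return "timeout";
        }
    }

    /** Runs the handler on `handlerExecutor` and returns what the handler observed. */
    static String handle(ExecutorService handlerExecutor, ExecutorService eventLoop) throws Exception {
        Callable<String> handler = () -> blockingCall(eventLoop);
        return handlerExecutor.submit(handler).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(); // stand-in for the event loop
        ExecutorService blockingPool = Executors.newFixedThreadPool(2);  // stand-in for a blocking executor
        try {
            // Handler occupies the only event loop thread, so the response task never runs in time.
            System.out.println("handler on event loop:    " + handle(eventLoop, eventLoop));
            // Handler blocks a pool thread instead; the event loop stays free to deliver the response.
            System.out.println("handler on blocking pool: " + handle(blockingPool, eventLoop));
        } finally {
            eventLoop.shutdownNow();
            blockingPool.shutdownNow();
        }
    }
}
```

The first call reports `timeout` and the second reports `200 OK`. In this analogy, the configuration above plays the role of the second case: it keeps blocking user code on threads that are meant to block.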