micronaut-projects / micronaut-core

Micronaut Application Framework
http://micronaut.io
Apache License 2.0

JdkBlockingHttpClient throws java.net.ConnectException: Address already in use: getsockopt with concurrency 50 #10829

Open pm3 opened 4 months ago

pm3 commented 4 months ago

Expected Behavior

I expect JdkBlockingHttpClient to use the blocking call httpClient.send rather than the asynchronous call httpClient.sendAsync. The entire request would then execute on the calling (blocked) thread instead of being handed off to the HttpClient's default executor.

The change is simple: in AbstractJdkHttpClient, extract the sendAsync call into a separate method, then override that method in JdkBlockingHttpClient with a synchronous call.

In AbstractJdkHttpClient:

    protected CompletableFuture<java.net.http.HttpResponse<byte[]>> netClientSendAsync(HttpRequest httpRequest) {
        return client.sendAsync(httpRequest, java.net.http.HttpResponse.BodyHandlers.ofByteArray());
    }

And in JdkBlockingHttpClient:

    @Override
    protected CompletableFuture<java.net.http.HttpResponse<byte[]>> netClientSendAsync(HttpRequest httpRequest) {
        // Call the blocking send on the current thread and hand back an already-completed future.
        CompletableFuture<java.net.http.HttpResponse<byte[]>> future = new CompletableFuture<>();
        try {
            future.complete(client.send(httpRequest, java.net.http.HttpResponse.BodyHandlers.ofByteArray()));
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

After this change, JdkBlockingHttpClient works even with a large number of requests at high concurrency, and the response time of individual calls improves as well.
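The pattern behind the proposed override can be tried outside Micronaut. The following is a minimal sketch, not the framework's code: the class name `BlockingSendSketch` and both method names are made up for illustration. It runs a blocking call on the calling thread and wraps the outcome in an already-completed `CompletableFuture`. Unlike the snippet above, it also restores the interrupt flag, which is an addition of this sketch.

```java
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper class; none of these names exist in Micronaut.
class BlockingSendSketch {

    /** Runs a blocking call on the current thread and returns an already-completed future. */
    static <T> CompletableFuture<T> completeOnCallerThread(Callable<T> blockingCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            future.complete(blockingCall.call());
        } catch (InterruptedException e) {
            // Assumption of this sketch: keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            future.completeExceptionally(e);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    /** Same shape as the proposed override: HttpClient.send instead of sendAsync. */
    static CompletableFuture<HttpResponse<byte[]>> sendBlocking(HttpClient client, HttpRequest request) {
        return completeOnCallerThread(() -> client.send(request, HttpResponse.BodyHandlers.ofByteArray()));
    }

    public static void main(String[] args) {
        // The callable runs on the thread that invoked the helper.
        CompletableFuture<String> ok = completeOnCallerThread(() -> Thread.currentThread().getName());
        System.out.println("completed on: " + ok.join());

        // A checked exception ends up inside the future instead of being thrown.
        CompletableFuture<String> failed = completeOnCallerThread(() -> {
            throw new java.io.IOException("boom");
        });
        System.out.println("completed exceptionally: " + failed.isCompletedExceptionally());
    }
}
```

Because the future is already complete when it is returned, any downstream code that subscribes to it continues on the caller's thread rather than on a pool thread.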

Actual Behaviour

I often write controller methods that call a backend service. We use Java 21 with virtual threads, so we can write blocking code. At high concurrency, the JDK HttpClient throws the following exception:

    10:49:21.497 [virtual-executor13205] ERROR i.m.http.server.RouteExecutor - Unexpected error occurred: Error sending request: Address already in use: getsockopt
    io.micronaut.http.client.exceptions.HttpClientException: Error sending request: Address already in use: getsockopt
        at io.micronaut.http.client.jdk.AbstractJdkHttpClient.lambda$responsePublisher$11(AbstractJdkHttpClient.java:403)
        at reactor.core.publisher.Flux.lambda$onErrorMap$28(Flux.java:7236)
        at reactor.core.publisher.Flux.lambda$onErrorResume$29(Flux.java:7289)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:843)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:609)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:589)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:864)
        at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:991)
        at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:119)
        at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply(MonoCompletionStage.java:71)
        at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
        at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:844)
        at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
        Suppressed: java.lang.Exception: #block terminated with an error
            at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
            at reactor.core.publisher.Flux.blockFirst(Flux.java:2704)
            at io.micronaut.http.client.jdk.JdkBlockingHttpClient.exchange(JdkBlockingHttpClient.java:93)
            at io.micronaut.http.client.BlockingHttpClient.retrieve(BlockingHttpClient.java:140)
            at io.micronaut.http.client.interceptor.HttpClientIntroductionAdvice.lambda$handleSynchronous$3(HttpClientIntroductionAdvice.java:242)
            at io.micronaut.http.client.interceptor.HttpClientIntroductionAdvice.handleBlockingCall(HttpClientIntroductionAdvice.java:620)
            at io.micronaut.http.client.interceptor.HttpClientIntroductionAdvice.handleSynchronous(HttpClientIntroductionAdvice.java:241)
            at io.micronaut.http.client.interceptor.HttpClientIntroductionAdvice.intercept(HttpClientIntroductionAdvice.java:195)
            at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:138)
            at com.example.MyClient$Intercepted.text2(Unknown Source)
            at com.example.MyController.text(MyController.java:31)
            at com.example.$MyController$Definition$Exec.dispatch(Unknown Source)
            at io.micronaut.context.AbstractExecutableMethodsDefinition$DispatchedExecutableMethod.invokeUnsafe(AbstractExecutableMethodsDefinition.java:461)
            at io.micronaut.context.DefaultBeanContext$BeanContextUnsafeExecutionHandle.invokeUnsafe(DefaultBeanContext.java:4232)
            at io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:230)
            at io.micronaut.http.server.RouteExecutor.executeRouteAndConvertBody(RouteExecutor.java:488)
            at io.micronaut.http.server.RouteExecutor.lambda$callRoute$6(RouteExecutor.java:465)
            at io.micronaut.core.execution.ExecutionFlow.lambda$async$1(ExecutionFlow.java:87)
            at io.micronaut.core.propagation.PropagatedContext.lambda$wrap$3(PropagatedContext.java:211)
            at io.micronaut.core.propagation.PropagatedContext.lambda$wrap$3(PropagatedContext.java:211)
            at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
            at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
    Caused by: java.net.ConnectException: Address already in use: getsockopt
        at java.net.http/jdk.internal.net.http.common.Utils.toConnectException(Utils.java:1028)
        at java.net.http/jdk.internal.net.http.PlainHttpConnection$ConnectEvent.handle(PlainHttpConnection.java:155)
        at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.handleEvent(HttpClientImpl.java:1467)
        at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.lambda$run$3(HttpClientImpl.java:1412)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:1412)
    Caused by: java.net.BindException: Address already in use: getsockopt
        at java.base/sun.nio.ch.Net.pollConnect(Native Method)
        at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682)
        at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:973)
        at java.net.http/jdk.internal.net.http.PlainHttpConnection$ConnectEvent.handle(PlainHttpConnection.java:139)
        ... 4 common frames omitted

Client code:

@Client("my")
public interface MyClient {
    @Get(value = "/text2", produces = MediaType.TEXT_PLAIN)
    public String text2();
}

Controller code:

@Controller
@ExecuteOn(TaskExecutors.BLOCKING)
public class MyController {

    private final MyClient myClient;

    public MyController(MyClient myClient) {
        this.myClient = myClient;
    }

    @Get("/text")
    public String text() {
        return myClient.text2();
    }
    @Get("/text2")
    public String text2() {
        return """
                Algorithms and implementations
                A 1964-developed algorithm[1] is popularly known as the Knuth shuffle or the Fisher–Yates shuffle (based on work they did in 1938). A real-world use for this is sampling water quality in a reservoir.

                In 1999, a new feature was added to the Pentium III: a hardware-based random number generator.[2][3] It has been described as "several oscillators combine their outputs and that odd waveform is sampled asynchronously."[4] These numbers, however, were only 32 bit, at a time when export controls were on 56 bits and higher, so they were not state of the art.[5]

                Common understanding
                In common understanding, "1 2 3 4 5" is not as random as "3 5 2 1 4" and certainly not as random as "47 88 1 32 41" but "we can't say authoritavely that the first sequence is not random ... it could have been generated by chance."[6]

                When a police officer claims to have done a "random .. door-to-door" search, there is a certain expectation that members of a jury will have.[7][8][example needed]

                Real world consequences
                Flaws in randomness have real-world consequences.[9][10]

                A 99.8% randomness was shown by researchers to negatively affect an estimated 27,000 customers of a large service[9] and that the problem was not limited to just that situation.[clarification needed]
                """;
    }
}

Steps To Reproduce

Start the Micronaut server, then run:

ab -n 10000 -c 50 http://localhost:8080/text

Result:

    Concurrency Level:      50
    Time taken for tests:   47.668 seconds
    Complete requests:      10000
    Failed requests:        6001
       (Connect: 0, Receive: 0, Length: 6001, Exceptions: 0)
    Non-2xx responses:      1587
    Total transferred:      12660779 bytes
    HTML transferred:       11417605 bytes
    Requests per second:    209.78 [#/sec] (mean)
    Time per request:       238.342 [ms] (mean)
    Time per request:       4.767 [ms] (mean, across all concurrent requests)
    Transfer rate:          259.38 [Kbytes/sec] received

    Connection Times (ms)
                  min  mean[+/-sd] median   max
    Connect:        0    2  31.0      0    2826
    Processing:    69  233 288.3    149    3039
    Waiting:        0  141 235.6    100    3019
    Total:         69  235 290.9    150    3039

    Percentage of the requests served within a certain time (ms)
      50%    150
      66%    170
      75%    201
      80%    332
      90%    400
      95%    525
      98%    945
      99%   1632
     100%   3039 (longest request)

After the proposed change:

    Concurrency Level:      50
    Time taken for tests:   2.139 seconds
    Complete requests:      10000
    Failed requests:        0
    Non-2xx responses:      10000
    Total transferred:      2590000 bytes
    HTML transferred:       1330000 bytes
    Requests per second:    4674.21 [#/sec] (mean)
    Time per request:       10.697 [ms] (mean)
    Time per request:       0.214 [ms] (mean, across all concurrent requests)
    Transfer rate:          1182.25 [Kbytes/sec] received

    Connection Times (ms)
                  min  mean[+/-sd] median   max
    Connect:        0    0   1.0      0      16
    Processing:     0    8   8.6      8     496
    Waiting:        0    6   8.3      3     487
    Total:          0    8   8.6      8     496

    Percentage of the requests served within a certain time (ms)
      50%      8
      66%     14
      75%     16
      80%     16
      90%     16
      95%     16
      98%     20
      99%     21
     100%    496 (longest request)
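If `ab` is not available, roughly the same load (10000 requests, 50 at a time) could be driven from the JVM. This is only a sketch: `LoadSketch` and `run` are hypothetical names, it uses a fixed pool of platform threads, and it counts only thrown exceptions and non-200 statuses, so its failure count is not directly comparable to ab's "Failed requests" figure (which also counts length mismatches).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical load driver; not part of the example application.
class LoadSketch {

    /** Runs `request` `total` times on `concurrency` threads and returns how many runs threw. */
    static int run(int total, int concurrency, Runnable request) {
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                try {
                    request.run();
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.MINUTES)) {
                pool.shutdownNow();
                throw new IllegalStateException("load run did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        HttpClient client = HttpClient.newHttpClient();
        // Same URL as the ab command in the steps above.
        HttpRequest get = HttpRequest.newBuilder(URI.create("http://localhost:8080/text")).build();
        int failed = run(10_000, 50, () -> {
            try {
                HttpResponse<String> response = client.send(get, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() != 200) {
                    throw new IllegalStateException("status " + response.statusCode());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
        System.out.println("failed requests: " + failed);
    }
}
```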

Environment Information

- Windows
- Corretto 21 SDK

Example Application

No response

Version

4.4.2

pm3 commented 4 months ago

Sorry, I forgot to include a link to the example application:

https://github.com/pm3/demo1