reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0

Connection prematurely closed BEFORE response on half closed connection #1764

Closed mkluzacek closed 3 years ago

mkluzacek commented 3 years ago

When using WebClient to call a REST API on another server (running Tomcat), the client sometimes does not acknowledge the connection close initiated by the server and later tries to reuse the already closed connection. The connection is closed by the Tomcat server after 60 s (the default keep-alive setting). Most of the time the connection is closed correctly on the client side, but occasionally the client sends only [ACK], no [FIN, ACK], and keeps the connection open.

Expected Behavior

The client should close the connection after receiving [FIN] from the server.

Actual Behavior

The connection is not closed by the client and is later reused, resulting in reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response

Full stacktrace:

org.springframework.web.reactive.function.client.WebClientRequestException: Connection prematurely closed BEFORE response; nested exception is reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
    at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Request to GET http://testload700-metadata-service:8080/api/aggregations/aggregation-ids?entityId=RXBnOjQxNTM3OTIy [DefaultWebClient]
Stack trace:
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:70)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)
        at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
        at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:224)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:273)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:413)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
        at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
        at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:189)
        at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
        at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:358)
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:647)
        at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onUncaughtException(DefaultPooledConnectionProvider.java:214)
        at reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onUncaughtException(DefaultPooledConnectionProvider.java:462)
        at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:290)
        at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:74)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:235)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
        at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:311)
        at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response

Steps to Reproduce

Use WebClient to call a REST API on a Tomcat server. We are sending around 700 requests/s (300,000 requests in total) and get one occurrence of the error mentioned above.

There are around 7,000 correctly closed connections for a single incorrect one, so it's really uncommon, but we can reproduce it in 100% of cases.

This is the WebClient, with no special configuration:

public class FooWebClient {

  // The original snippet referenced PATH without declaring it; this value is
  // taken from the request checkpoint in the stack trace above.
  private static final String PATH = "/api/aggregations/aggregation-ids";

  // Declared as a field; the original snippet assigned it without a declaration.
  // Note: naming this class "WebClient" would shadow Spring's WebClient type.
  private final WebClient webClient;

  private final ParameterizedTypeReference<Set<Foo>> reference =
      new ParameterizedTypeReference<>() {
      };

  public FooWebClient(
      WebClient.Builder webClientBuilder,
      String serviceUrl
  ) {
    this.webClient = webClientBuilder.baseUrl(serviceUrl)
        .build();
  }

  private Mono<Set<Foo>> foo(String entityId) {
    UriComponentsBuilder builder = UriComponentsBuilder.fromPath(PATH)
        .query("entityId={entityId}");

    return webClient.get()
        .uri(builder.build()
            .toUriString(), entityId)
        .retrieve()
        .bodyToMono(reference);
  }
}

Possible Solution

Your Environment

Spring Boot Starter WebFlux: 2.4.5
Spring Boot Starter Reactor Netty: 2.4.5
Reactor Core: 3.4.5

Java version:
openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.6+10, mixed mode)

Running in docker: Linux 27e260f5943f 4.19.0-0.bpo.8-amd64 #1 SMP Debian 4.19.98-1~bpo9+1 (2020-03-09) x86_64 GNU/Linux

Here are the relevant logs: logs_closed_before.csv

And here is the TCP dump: tcp_dump.csv. Client: 172.18.0.18, server: 172.18.0.13.

violetagg commented 3 years ago

@mkluzacek You have to configure a proper max idle timeout for the connections in the connection pool. Please take a look at this documentation: https://projectreactor.io/docs/netty/release/reference/index.html#connection-pool-timeout

mkluzacek commented 3 years ago

Thanks for the response. I forgot to mention that I already tried setting reactor.netty.pool.maxLifeTime via JAVA_OPTS (-Dreactor.netty.pool.maxLifeTime=45000), but I still got the error. I think it should be equivalent to the settings mentioned in the docs?

violetagg commented 3 years ago

@mkluzacek You need to set maxIdleTime; it applies to idle connections and is the counterpart of what you have on the Tomcat side (keepAliveTimeout: "The number of milliseconds this Connector will wait for another HTTP request before closing the connection. The default value is to use the value that has been set for the connectionTimeout attribute. Use a value of -1 to indicate no (i.e. infinite) timeout."), which represents the time that the connection is idle.
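A minimal sketch of that configuration, assuming Tomcat's default 60 s keep-alive timeout on the server side (the 45 s value and the pool name are only examples; any value safely below the server-side timeout works):

```java
import java.time.Duration;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class PoolConfig {

  public static WebClient buildWebClient(String baseUrl) {
    // Evict pooled connections that have been idle longer than 45 s,
    // i.e. before Tomcat's 60 s keep-alive timeout closes them server-side.
    ConnectionProvider provider = ConnectionProvider.builder("tuned-pool")
        .maxIdleTime(Duration.ofSeconds(45))
        .build();

    HttpClient httpClient = HttpClient.create(provider);

    return WebClient.builder()
        .baseUrl(baseUrl)
        .clientConnector(new ReactorClientHttpConnector(httpClient))
        .build();
  }
}
```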

mkluzacek commented 3 years ago

OK, I see now. I'll try setting maxIdleTime. But shouldn't maxLifeTime work as well? It is a bit excessive, as it could close a connection that is completely fine, but it should solve this problem nonetheless, or am I missing something?

mkluzacek commented 3 years ago

Still got the error even with maxIdleTime set to 45 seconds. I double-checked that the configuration is applied correctly via JAVA_OPTS, and the value is set as expected.

violetagg commented 3 years ago

@mkluzacek Can you prepare an example project that we can run in order to reproduce the issue?

mkluzacek commented 3 years ago

Unfortunately, I am not able to reproduce it with a simple example project. It must be some combination of dependencies and other factors that is causing it. Still, thanks for the help.

ryanrupp commented 2 years ago

For anyone who stumbles upon this: I ran into this issue specifically with Azure load balancers, because the default load balancer settings do not send a close notification when the idle timeout is hit. See more information here; ultimately I resolved it by configuring the maxIdleTime mentioned here. I wanted to use TCP keep-alive instead, but it is not supported by Java on Windows.
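On platforms where the JDK does support it, TCP keep-alive can be enabled on the Reactor Netty client through a standard channel option. A sketch (whether the OS actually honors the probes, and their interval, is platform-dependent):

```java
import io.netty.channel.ChannelOption;
import reactor.netty.http.client.HttpClient;

public class KeepAliveConfig {

  public static HttpClient keepAliveClient() {
    // Ask the OS to probe idle connections, so a silent drop by a
    // load balancer is detected instead of surfacing later as a
    // "Connection prematurely closed" error on reuse.
    return HttpClient.create()
        .option(ChannelOption.SO_KEEPALIVE, true);
  }
}
```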

KarteekGodavarthi commented 2 years ago

Hi @mkluzacek / @violetagg, I have come across the same issue, which happens intermittently.

We have a WebFlux app that makes an HTTP call, using WebClient, to a backend server hosted on-prem. It works most of the time, but in a few scenarios we see the error "Connection prematurely closed BEFORE response" intermittently.

However, we are unable to narrow down the specific use case in which this happens. So we put a timer on the API execution time and noticed that the time taken for all the premature failures is around 40-60 milliseconds. I assume this 40-60 ms is the network time from my app to the load balancer.

@mkluzacek Were you able to resolve the issue mentioned above? If yes, could you share how?

violetagg commented 2 years ago

@KarteekGodavarthi Did you check this FAQ page? https://projectreactor.io/docs/netty/release/reference/index.html#faq.connection-closed

kkajdd commented 2 years ago

Hey @violetagg, we ran into the same issue, where it says PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 45000ms. We are using a WebClient POST call without any timeout configuration. Looking at the solution above, it seems we need to set maxIdleTime, which we are trying to do like this:


HttpClient client = HttpClient.create(ConnectionProvider.builder("custom")
    .maxIdleTime(Duration.ofSeconds(120))
    .build());

WebClient webClient = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(client))
    .build();

We tried the debugging possibilities mentioned in https://projectreactor.io/docs/netty/release/reference/index.html#faq.connection-closed but no luck.
Looking at the Reactor documentation, `pendingAcquireTimeout` is also mentioned, and I'm a bit confused about whether I need to set this as well. Since we are not able to reproduce it locally, we would have to test it in prod; can someone suggest whether this is the right approach?
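For reference, pendingAcquireTimeout is a separate knob on the same ConnectionProvider builder: it bounds how long a caller waits for a connection when the pool is exhausted, which is what the PoolAcquireTimeoutException above refers to. A sketch combining both settings (the values and pool name are only examples, not recommendations):

```java
import java.time.Duration;

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class AcquireConfig {

  public static HttpClient tunedClient() {
    ConnectionProvider provider = ConnectionProvider.builder("custom")
        // Evict connections that sit idle in the pool longer than 120 s.
        .maxIdleTime(Duration.ofSeconds(120))
        // Fail an acquisition attempt that waits longer than 60 s
        // for a pooled connection to become available.
        .pendingAcquireTimeout(Duration.ofSeconds(60))
        .build();

    return HttpClient.create(provider);
  }
}
```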

violetagg commented 2 years ago

@kkajdd I think there is some misunderstanding here. As described in the documentation:

maxIdleTime - The maximum time (resolution: ms) that this connection stays idle in the connection pool.

So this applies to connections that are idle in the connection pool.

If you want to specify timeout for the request/response then you need

HttpClient.responseTimeout(...);
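
A sketch of that, assuming the goal is to cap the time spent waiting for the server's response (the 5 s value is only an example):

```java
import java.time.Duration;

import reactor.netty.http.client.HttpClient;

public class ResponseTimeoutConfig {

  public static HttpClient withResponseTimeout() {
    // Abort the exchange if no response arrives within 5 seconds
    // of the request being sent.
    return HttpClient.create()
        .responseTimeout(Duration.ofSeconds(5));
  }
}
```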

Please ask your questions on Gitter or Stack Overflow, as recommended in the README.

unoexperto commented 1 year ago

I have the same problem with the latest reactor-netty. Setting maxIdleTime to a large value (2 minutes) doesn't help; I get this exception sooner than that. Has anyone figured out what it is?

UPD: It looks like it's happening because SOCKS5 closes the connection in the middle of the HTTP request: https://github.com/net-byte/socks5-server/issues/8

DariusX commented 8 months ago

@violetagg On maxIdleTime: would setting it to 0 (zero) essentially mean that the connection is closed immediately? In effect, one would not be using the pool if one sets it to zero?

violetagg commented 8 months ago

> On maxIdleTime: would setting it to 0 (zero) essentially mean that the connection is closed immediately? In effect, one would not be using the pool if one sets it to zero?

Yes.