reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0

NotYetConnectedException #1013

Closed msoub closed 4 years ago

msoub commented 4 years ago

Might be related to #982.

We randomly get a java.nio.channels.NotYetConnectedException while performing requests in parallel. We have tried several connection provider strategies (fixed, elastic, new connection), configured through:

@Bean
fun resourceFactory(): ReactorResourceFactory {
    val factory = ReactorResourceFactory()
    factory.isUseGlobalResources = false
    factory.connectionProvider = ConnectionProvider.newConnection()
    factory.afterPropertiesSet()
    return factory
}

However, none of them gets rid of the problem, although the exception seems to be raised less often when we use ConnectionProvider.newConnection().

Here is the stack trace:

java.nio.channels.NotYetConnectedException: null
    at io.netty.channel.unix.Errors.ioResult(Errors.java:171)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLift] :
    reactor.core.publisher.Mono.create(Mono.java:185)
    reactor.netty.resources.NewConnectionProvider.acquire(NewConnectionProvider.java:53)
Error has been observed at the following site(s):
    |_        Mono.create ⇢ at reactor.netty.resources.NewConnectionProvider.acquire(NewConnectionProvider.java:53)
    |_        Mono.create ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:284)
    |_         Mono.retry ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:321)
    |_   Mono.flatMapMany ⇢ at reactor.netty.http.client.HttpClientFinalizer.responseConnection(HttpClientFinalizer.java:85)
    |_          Flux.next ⇢ at org.springframework.http.client.reactive.ReactorClientHttpConnector.connect(ReactorClientHttpConnector.java:112)
    |_   Mono.doOnRequest ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:105)
    |_    Mono.doOnCancel ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:106)
    |_           Mono.map ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:107)
    |_      Mono.doOnEach ⇢ at org.springframework.boot.actuate.metrics.web.reactive.client.MetricsWebClientFilterFunction.filter(MetricsWebClientFilterFunction.java:88)
    |_         checkpoint ⇢ Request to POST http://recco.service.consul/positions [DefaultWebClient]
    |_ Mono.switchIfEmpty ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchange$0(DefaultWebClient.java:340)
    |_         Mono.defer ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:338)
    |_       Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.bodyToMono(DefaultWebClient.java:474)

violetagg commented 4 years ago

@msoub Can you provide some reproducible example?

msoub commented 4 years ago

Here you go. Run it with ./gradlew check

It looks like all the operators are necessary to reproduce the two bugs...

violetagg commented 4 years ago

@msoub I cannot reproduce it on either Mac OS or Ubuntu. Could you provide the whole stack trace, not only part of it?

on Mac OS

expectation "expectError(Class)" failed (expected: onError(NotYetConnectedException); actual: onNext(<200 OK OK,[Content-Type:"application/json;charset=UTF-8", Content-Length:"2"]>))
java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(NotYetConnectedException); actual: onNext(<200 OK OK,[Content-Type:"application/json;charset=UTF-8", Content-Length:"2"]>))
    at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
    at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
    at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
    at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)

on Ubuntu

java.lang.AssertionError: expectation "expectError(Class)" failed (expected error of type: NotYetConnectedException; actual type: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response)
        at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
        at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
        at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
msoub commented 4 years ago

The complete stack trace:

java.nio.channels.NotYetConnectedException: null
    at io.netty.channel.unix.Errors.ioResult(Errors.java:171) ~[netty-transport-native-unix-common-4.1.45.Final.jar:4.1.45.Final]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCreate] :
    reactor.core.publisher.Mono.create(Mono.java:185)
    reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:137)
Error has been observed at the following site(s):
    |_        Mono.create ⇢ at reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:137)
    |_        Mono.create ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:284)
    |_         Mono.retry ⇢ at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:321)
    |_   Mono.flatMapMany ⇢ at reactor.netty.http.client.HttpClientFinalizer.responseConnection(HttpClientFinalizer.java:85)
    |_          Flux.next ⇢ at org.springframework.http.client.reactive.ReactorClientHttpConnector.connect(ReactorClientHttpConnector.java:112)
    |_   Mono.doOnRequest ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:105)
    |_    Mono.doOnCancel ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:106)
    |_           Mono.map ⇢ at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:107)
    |_         checkpoint ⇢ Request to POST http://localhost:45175/1/ [DefaultWebClient]
    |_ Mono.switchIfEmpty ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.lambda$exchange$0(DefaultWebClient.java:340)
    |_         Mono.defer ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:338)
    |_       Mono.flatMap ⇢ at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.toBodilessEntity(DefaultWebClient.java:575)
    |_       Mono.flatMap ⇢ at com.edgelab.reactornetty.Client.optional(ReactorNettyApplication.kt:92)
    |_           Mono.zip ⇢ at com.edgelab.reactornetty.Client.retrieve(ReactorNettyApplication.kt:68)
    |_       Flux.flatMap ⇢ at com.edgelab.reactornetty.Client.notYetConnectedException(ReactorNettyApplication.kt:62)
    |_           Flux.map ⇢ at com.edgelab.reactornetty.Client.notYetConnectedException(ReactorNettyApplication.kt:63)
    |_   Flux.collectList ⇢ at com.edgelab.reactornetty.Client.bulk(ReactorNettyApplication.kt:99)
    |_       Mono.flatMap ⇢ at com.edgelab.reactornetty.Client.bulk(ReactorNettyApplication.kt:100)
    |_       Flux.flatMap ⇢ at com.edgelab.reactornetty.Client.notYetConnectedException(ReactorNettyApplication.kt:64)
Stack trace:
        at io.netty.channel.unix.Errors.ioResult(Errors.java:171) ~[netty-transport-native-unix-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.unix.FileDescriptor.readAddress(FileDescriptor.java:143) ~[netty-transport-native-unix-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.channel.epoll.AbstractEpollChannel.doReadBytes(AbstractEpollChannel.java:348) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:778) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) [netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]

FYI here is the output of ./gradlew -v

------------------------------------------------------------
Gradle 6.0.1
------------------------------------------------------------

Build time:   2019-11-18 20:25:01 UTC
Revision:     fad121066a68c4701acd362daf4287a7c309a0f5

Kotlin:       1.3.50
Groovy:       2.5.8
Ant:          Apache Ant(TM) version 1.10.7 compiled on September 1 2019
JVM:          1.8.0_232 (Oracle Corporation 25.232-b09)
OS:           Linux 4.15.0-43-generic amd64
msoub commented 4 years ago

The logs also show ByteBuf leaks:

io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
    io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
    io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:777)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:748)
msoub commented 4 years ago

@violetagg I have updated the tests. Could you please try again?

msoub commented 4 years ago

After the tests have run, many sockets remain open in the CLOSE_WAIT state, which is a significant resource leak. They can be listed with: lsof -i | grep java | grep CLOSE_WAIT

msoub commented 4 years ago

@violetagg were you able to reproduce the problem?

violetagg commented 4 years ago

@msoub I believe the issue is in the sample itself.

You are using Mono.zip here

https://github.com/msoub/reactor-netty-1013/blob/master/src/main/kotlin/com/edgelab/reactornetty/ReactorNettyApplication.kt#L74

and here you return Mono.empty() from time to time

https://github.com/msoub/reactor-netty-1013/blob/master/src/main/kotlin/com/edgelab/reactornetty/ReactorNettyApplication.kt#L99

According to the javadoc of Mono.zip

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#zip-reactor.core.publisher.Mono-reactor.core.publisher.Mono-

"An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively."

On the client side, however, if you cancel while the incoming data is still being read, Reactor Netty has no choice but to close the connection. The connection cannot be returned to the pool because more data might still arrive on it.

So in your case you are constantly closing the connections from the pool. This means that you are exhausting the system resources, and that is why you see this exception.
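
The cancellation described above can be observed without any HTTP traffic. What follows is a minimal sketch, assuming only reactor-core on the classpath. The names `slow`, `empty` and `zipCancelsSlowSource` are illustrative and do not come from the sample project: `slow` stands in for a response that is still being read, and `empty` for the branch that sometimes returns Mono.empty().

```kotlin
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean

// Returns true if Mono.zip cancelled the slow source once the other source completed empty.
fun zipCancelsSlowSource(): Boolean {
    val cancelled = AtomicBoolean(false)

    // Hypothetical stand-in for an in-flight HTTP response.
    val slow: Mono<String> = Mono.delay(Duration.ofSeconds(1))
        .map { "response" }
        .doOnCancel { cancelled.set(true) }

    // Hypothetical stand-in for the branch that returns Mono.empty() from time to time.
    val empty: Mono<String> = Mono.empty()

    // zip completes empty as soon as `empty` completes, so block() returns null here.
    val result = Mono.zip(slow, empty).block()
    check(result == null) { "zip was expected to complete without a value" }

    return cancelled.get()
}

fun main() {
    println("slow source cancelled: ${zipCancelsSlowSource()}")
}
```

In this sketch the cancel signal only flips a flag. With a real Reactor Netty response in place of `slow`, the same signal is what makes the client close the connection instead of returning it to the pool.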

msoub commented 4 years ago

Thanks for the detailed answer! Indeed, it makes sense from Netty's perspective. However, I'm a bit concerned that a Mono completing without a value can have such consequences for the application.

violetagg commented 4 years ago

@msoub Why don't you try Mono.zipDelayError? https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#zipDelayError-reactor.core.publisher.Mono-reactor.core.publisher.Mono-

Other than that, you have to be careful with operators that cancel while data is still incoming: if you do not consume the full incoming data, the connection will be closed and not returned to the pool.
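
As a rough illustration of the two ways around the cancellation, here is a sketch that assumes only reactor-core on the classpath. All function names are hypothetical and not taken from the sample project. The first option uses Mono.zipDelayError, which lets the other sources run to completion when one source completes empty. The second option is a common alternative that is not suggested in this thread: wrap the possibly empty branch in an Optional so that plain Mono.zip always receives a value.

```kotlin
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a response that takes a while to be fully read.
fun slowResponse(cancelled: AtomicBoolean): Mono<String> =
    Mono.delay(Duration.ofMillis(200))
        .map { "response" }
        .doOnCancel { cancelled.set(true) }

// Option 1: zipDelayError waits for every source before completing,
// so the slow source should not see a cancel signal.
fun zipDelayErrorCancelsSlowSource(): Boolean {
    val cancelled = AtomicBoolean(false)
    Mono.zipDelayError(slowResponse(cancelled), Mono.empty<String>()).block()
    return cancelled.get()
}

// Option 2: turn "no value" into Optional.empty() so that zip never sees an empty source.
fun zipWithOptional(): Pair<String, Optional<String>>? {
    val maybeEmpty: Mono<String> = Mono.empty()
    val tuple = Mono.zip(
        slowResponse(AtomicBoolean(false)),
        maybeEmpty.map { Optional.of(it) }.defaultIfEmpty(Optional.empty())
    ).block()
    return tuple?.let { it.t1 to it.t2 }
}

fun main() {
    println("zipDelayError cancelled the slow source: ${zipDelayErrorCancelsSlowSource()}")
    println("zip with Optional: ${zipWithOptional()}")
}
```

With zipDelayError the combined Mono still completes empty, so downstream code has to cope with a missing result. The Optional variant keeps the value of the slow source and makes the absence of the other one explicit.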