Closed spring-projects-issues closed 7 years ago
Rossen Stoyanchev commented
I think the key here is the Reactor and Reactor Netty version. Can you try Bismuth snapshots? There is a recent fix post Bismuth 4 (from today) that's worth checking.
Arnaud Cogoluègnes commented
Not more luck with the latest Bismuth snapshots.
Rossen Stoyanchev commented
Right, so the Reactor Netty issue #138 was about emitting an error signal when the server closes the connection prematurely. Now you're getting the IOException (as expected). The question is why the server does that.
Can you see anything in the RabbitMQ logs that would indicate whether it is proactively closing/rejecting, or does it hit some exception and drop it? The server version is also of interest and narrowing down to a specific milestone where it stopped could help to figure out some change that causes this. Do you have full request details available? We could consider opening a ticket with RabbitMQ as well to ask why this happens.
Arnaud Cogoluègnes commented
There's nothing in the RabbitMQ logs. I'll try to lower the log level. I'll try to also to narrow down the issue (maybe using only Reactor Netty) and to find when the problem shows up in RabbitMQ (between the 3.7.0 milestones and 3.6.x). There may have been upgrades of the HTTP part (Cowboy and Ranch).
Note the HOP blocking client works (it uses RestTemplate
and Commons HttpClient).
Rossen Stoyanchev commented
Differences in the headers between the blocking and the WebClient might provide some clues too.
Rossen Stoyanchev commented
it looks like Wireshark doesn't recognize WebClient PUT requests as HTTP requests
Also does the request look complete -- headers and entire body content, matching to the content-length (if present)?
Arnaud Cogoluègnes commented
Here are 2 runs of the same code, one failing, the second one successful (against RabbitMQ 3.7.0 M17):
[DEBUG] (main) Using Console logging
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New http client pool for /127.0.0.1:15672
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@25fb8912(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-http-nio-4) Created [id: 0xdd37c7c0], now 1 active connections
[DEBUG] (reactor-http-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpDecoder = io.netty.handler.codec.http.HttpResponseDecoder), (reactor.left.httpEncoder = io.netty.handler.codec.http.HttpRequestEncoder), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-http-nio-4) Acquired active channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) [HttpClient] [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F/hop.test, method=PUT, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@6d05c13c}
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
PUT /api/exchanges/%2F/hop.test HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
transfer-encoding: chunked
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object FluxMapFuseable
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Pending write size = 85
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Response Write Completed]
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object EmptyLastHttpContent
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Received response (auto-read:false) : [server=Cowboy, date=Thu, 21 Sep 2017 13:21:37 GMT, content-length=0, content-type=application/json, vary=accept, accept-encoding, origin]
[DEBUG] (reactor-http-nio-4) onNext(ReactorClientHttpResponse{request=[PUT /api/exchanges/%2F/hop.test],status=204})
[DEBUG] (reactor-http-nio-4) onComplete()
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Received last HTTP packet
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@3100fc45
[DEBUG] (reactor-http-nio-4) Releasing channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) Released [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672], now 0 active connections
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@6392827e(incomplete) SimpleChannelPool{activeConnections=0}
[DEBUG] (reactor-http-nio-4) Acquired [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672], now 1 active connections
[DEBUG] (reactor-http-nio-4) Acquired active channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) [HttpClient] [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F, method=GET, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@1d657532}
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 - R:localhost/127.0.0.1:15672] Writing object DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET /api/exchanges/%2F HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
content-length: 0
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[ERROR] (reactor-http-nio-4) onError(java.io.IOException: Connection closed prematurely)
[ERROR] (reactor-http-nio-4) - java.io.IOException: Connection closed prematurely
java.io.IOException: Connection closed prematurely
at reactor.ipc.netty.http.client.HttpClientOperations.onInboundComplete(HttpClientOperations.java:262)
at reactor.ipc.netty.channel.ChannelOperations.onHandlerTerminate(ChannelOperations.java:417)
at reactor.ipc.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1354)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:917)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:763)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:748)
[TRACE] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-4) [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@37d97617
[DEBUG] (reactor-http-nio-4) Releasing channel: [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) Released [id: 0xdd37c7c0, L:/127.0.0.1:44684 ! R:localhost/127.0.0.1:15672], now 0 active connections
Exception in thread "main" reactor.core.Exceptions$ReactiveException: java.io.IOException: Connection closed prematurely
at reactor.core.Exceptions.propagate(Exceptions.java:294)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
at reactor.core.publisher.Flux.blockFirst(Flux.java:1909)
at io.pivotal.App.main(App.java:19)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:88)
... 2 more
Caused by: java.io.IOException: Connection closed prematurely
at reactor.ipc.netty.http.client.HttpClientOperations.onInboundComplete(HttpClientOperations.java:262)
at reactor.ipc.netty.channel.ChannelOperations.onHandlerTerminate(ChannelOperations.java:417)
at reactor.ipc.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:377)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:342)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:224)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1354)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:245)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:231)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:917)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:763)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 1
/usr/local/java8/bin/java -javaagent:/usr/local/idea-IC-172.3317.76/lib/idea_rt.jar=43563:/usr/local/idea-IC-172.3317.76/bin -Dfile.encoding=UTF-8 -classpath /usr/local/java8/jre/lib/charsets.jar:/usr/local/java8/jre/lib/deploy.jar:/usr/local/java8/jre/lib/ext/cldrdata.jar:/usr/local/java8/jre/lib/ext/dnsns.jar:/usr/local/java8/jre/lib/ext/jaccess.jar:/usr/local/java8/jre/lib/ext/jfxrt.jar:/usr/local/java8/jre/lib/ext/localedata.jar:/usr/local/java8/jre/lib/ext/nashorn.jar:/usr/local/java8/jre/lib/ext/sunec.jar:/usr/local/java8/jre/lib/ext/sunjce_provider.jar:/usr/local/java8/jre/lib/ext/sunpkcs11.jar:/usr/local/java8/jre/lib/ext/zipfs.jar:/usr/local/java8/jre/lib/javaws.jar:/usr/local/java8/jre/lib/jce.jar:/usr/local/java8/jre/lib/jfr.jar:/usr/local/java8/jre/lib/jfxswt.jar:/usr/local/java8/jre/lib/jsse.jar:/usr/local/java8/jre/lib/management-agent.jar:/usr/local/java8/jre/lib/plugin.jar:/usr/local/java8/jre/lib/resources.jar:/usr/local/java8/jre/lib/rt.jar:/home/acogoluegnes/temp/rabbitmq/hopwebflux/target/classes:/home/acogoluegnes/.m2/repository/org/springframework/spring-webflux/5.0.0.RC4/spring-webflux-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/io/projectreactor/reactor-core/3.1.0.RC1/reactor-core-3.1.0.RC1.jar:/home/acogoluegnes/.m2/repository/org/reactivestreams/reactive-streams/1.0.1/reactive-streams-1.0.1.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-beans/5.0.0.RC4/spring-beans-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-core/5.0.0.RC4/spring-core-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-jcl/5.0.0.RC4/spring-jcl-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/org/springframework/spring-web/5.0.0.RC4/spring-web-5.0.0.RC4.jar:/home/acogoluegnes/.m2/repository/io/projectreactor/ipc/reactor-netty/0.7.0.M2/reactor-netty-0.7.0.M2.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-codec-http/4.1.15.Final/netty-codec-http-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-codec/4.1.15.Final/netty-codec-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-handler/4.1.15.Final/netty-handler-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-buffer/4.1.15.Final/netty-buffer-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-transport/4.1.15.Final/netty-transport-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-resolver/4.1.15.Final/netty-resolver-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-handler-proxy/4.1.15.Final/netty-handler-proxy-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-codec-socks/4.1.15.Final/netty-codec-socks-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-transport-native-epoll/4.1.15.Final/netty-transport-native-epoll-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-common/4.1.15.Final/netty-common-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/io/netty/netty-transport-native-unix-common/4.1.15.Final/netty-transport-native-unix-common-4.1.15.Final.jar:/home/acogoluegnes/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.9.0.pr4/jackson-databind-2.9.0.pr4.jar:/home/acogoluegnes/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.9.0.pr4/jackson-annotations-2.9.0.pr4.jar:/home/acogoluegnes/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.9.0.pr4/jackson-core-2.9.0.pr4.jar io.pivotal.App
[DEBUG] (main) Using Console logging
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New http client pool for /127.0.0.1:15672
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@25fb8912(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-http-nio-4) Created [id: 0xf526ab83], now 1 active connections
[DEBUG] (reactor-http-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpDecoder = io.netty.handler.codec.http.HttpResponseDecoder), (reactor.left.httpEncoder = io.netty.handler.codec.http.HttpRequestEncoder), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-http-nio-4) Acquired active channel: [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) [HttpClient] [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F/hop.test, method=PUT, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@6ff0dd74}
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Writing object DefaultHttpRequest(decodeResult: success, version: HTTP/1.1)
PUT /api/exchanges/%2F/hop.test HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
transfer-encoding: chunked
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Writing object FluxMapFuseable
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Pending write size = 85
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Response Write Completed]
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Writing object EmptyLastHttpContent
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Received response (auto-read:false) : [server=Cowboy, date=Thu, 21 Sep 2017 13:22:36 GMT, content-length=0, content-type=application/json, vary=accept, accept-encoding, origin]
[DEBUG] (reactor-http-nio-4) onNext(ReactorClientHttpResponse{request=[PUT /api/exchanges/%2F/hop.test],status=204})
[DEBUG] (reactor-http-nio-4) onComplete()
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Received last HTTP packet
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[DEBUG] (main) onSubscribe(FluxMap.MapSubscriber)
[DEBUG] (main) request(unbounded)
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@e15b7e8(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-http-nio-2) Created [id: 0x7edd660a], now 2 active connections
[DEBUG] (reactor-http-nio-2) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.left.httpDecoder = io.netty.handler.codec.http.HttpResponseDecoder), (reactor.left.httpEncoder = io.netty.handler.codec.http.HttpRequestEncoder), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-http-nio-2) Acquired active channel: [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-2) [HttpClient] [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] handler is being applied: HttpClientHandler{startURI=http://127.0.0.1:15672/api/exchanges/%2F, method=GET, handler=org.springframework.http.client.reactive.ReactorClientHttpConnector$$Lambda$70/169663597@127c7454}
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] Writing object DefaultFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 0))
GET /api/exchanges/%2F HTTP/1.1
user-agent: ReactorNetty/0.7.0.M2
host: 127.0.0.1:15672
accept: */*
Authorization: Basic Z3Vlc3Q6Z3Vlc3Q=
Content-Type: application/json
content-length: 0
[TRACE] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-4) [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@6eff0ee7
[DEBUG] (reactor-http-nio-4) Releasing channel: [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-4) Released [id: 0xf526ab83, L:/127.0.0.1:44740 - R:localhost/127.0.0.1:15672], now 1 active connections
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] Received response (auto-read:false) : [server=Cowboy, date=Thu, 21 Sep 2017 13:22:37 GMT, content-length=1433, content-type=application/json, vary=accept, accept-encoding, origin, Cache-Control=no-cache]
[DEBUG] (reactor-http-nio-2) onNext(ReactorClientHttpResponse{request=[GET /api/exchanges/%2F],status=200})
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 - R:localhost/127.0.0.1:15672] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[DEBUG] (reactor-http-nio-2) onComplete()
[DEBUG] (reactor-http-nio-2) cancel()
[TRACE] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672] Disposing ChannelOperation from a channel
[TRACE] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-http-nio-2) [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@4c0c5584
[DEBUG] (reactor-http-nio-2) Releasing channel: [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672]
[DEBUG] (reactor-http-nio-2) Released [id: 0x7edd660a, L:/127.0.0.1:44742 ! R:localhost/127.0.0.1:15672], now 0 active connections
Process finished with exit code 0
Still investigating.
Rossen Stoyanchev commented
You can set reactor.ipc.netty
to TRACE level. That will show content being written as well.
Rossen Stoyanchev commented
I am now up and running with the test and able to reproduce the failure.
Arnaud Cogoluègnes commented
There have a been an upgrade of RabbitMQ HTTP server (cowboy) between RabbitMQ 3.6.12 and RabbitMQ 3.7.0.M2. I'll investigate more tomorrow.
Rossen Stoyanchev commented
Okay so I see the PUT with the 201 response and Content-Length:0. As a result Reactor Netty does get LastHttpContent
and releases the connection back to the pool. So it gets re-used for the subsequent GET. However in Wireshark I don't see the connection getting closed, and that's what confuses Wireshare -- it mixes the PUT and the GET into one.
Looking at the code you're using the WebClient and for the PUT you have exchange().block()
, i.e. never consuming the body, nor closing (recently added, see #19316). If I add a close()
it works but without an explicit close it doesn't.
client.declareExchange(v, s, new ExchangeInfo("fanout", false, false))
.doOnNext({response -> response.close()})
.block()
Given Content-Length:0 I don't think we should have to close explicitly so I suspect this may end up being a Reactor Netty fix but I'm not sure yet.
As for the Cowboy server, I see in the release notes:
Migration to Cowboy REST
RabbitMQ management plugin as well as its extensions (e.g. those of
Federation and Shovel, rabbitmq-top) now uses Cowboy REST
instead of Webmachine. Cowboy is a state-of-the-art open source Erlang HTTP 1.1 server and REST micro framework
that is also used in the plugins that provide WebSocket support.
The change is largely invisible to management UI and HTTP API
clients but there is one change that can affect test suites: POST and PUT responses now use 201 Created instead of 204 No Content.
The 201-204 on PUT looks relevant but at this time I don't think that's the issue because as far as I can see Netty does the same thing after 204 and 201 with content-length:0.
Arnaud Cogoluègnes commented
Sequences that fail always re-use the same connection from the pool. Sequences that succeed always use a new connection Created [id: 0x7edd660a], now 2 active connections
. I don't know if that matters.
I'm using Reactor Netty HttpClient
but don't manage to reproduce.
Rossen Stoyanchev commented
Yes, the same connection is re-used after not being closed, did you see my last comment above?
What is the code you're using with Reactor Netty HttpClient?
Arnaud Cogoluègnes commented
Something like this (far from perfect I guess, I've just discovered the API):
HttpClient client = HttpClient.create();
client.put("http://localhost:15672/api/exchanges/%2F/hoptest", request -> {
request
.addHeader("Authorization", "Basic Z3Vlc3Q6Z3Vlc3Q=")
.addHeader("Content-Type", "application/json")
.sendString(Mono.just("{\"type\":\"fanout\",\"durable\":false,\"internal\":false,\"arguments\":{},\"auto_delete\":false}")).then().block();
return request;
}).block().responseHeaders();
client.get("http://localhost:15672/api/exchanges/%2F", request -> {
request
.addHeader("Authorization", "Basic Z3Vlc3Q6Z3Vlc3Q=")
.addHeader("Content-Type", "application/json")
.addHeader("content-length", "0");
return request;
}).block().status();
Rossen Stoyanchev commented
And you confirmed there is Content-Length:0 in the response + the same connection is re-used?
Arnaud Cogoluègnes commented
In the 201 response there is content-length: 0
, but another connection is used for the GET request.
Rossen Stoyanchev commented
Right i'm certain you have the same issue, i.e. you'd have to call HttpClientResponse.dispose()
.
I'm working on a fix on the WebClient side to automatically close for empty responses. I will also create an issue in Reactor Netty since it shouldn't return connections to the pool that have not been disposed.
Effectively we try to save you that effort where you've either consumed the body (via bodyToMono for example) or when the body is empty. Overall though you're still responsible to consume the body or call close() after exchange(). Alternatively you can also use retrieve() which simplifies the way to consume the body. And if you don't do any of that, the fix on the Reactor Netty should ensure the connection pool doesn't have unclosed connections.
Arnaud Cogoluègnes commented
Makes sense. I admit the code isn't perfect or representative of the usage of a reactive API (I ported most of the code from the blocking client test as-is). Nevertheless, this kind of usage could happen in the wild, so I guess it's worth the effort.
Thanks for the follow-up and happy to test the fixes.
Rossen Stoyanchev commented
After further investigation, it seems that the problem is on the side of Cowboy, which is also consistent with the fact that it worked in 3.6.x where a different HTTP server was used.
The WebClient uses a connection pool, with persistent connection (HTTP 1.1 default), so it does not need to be closed. For some reason that causes Cowboy to hang on the GET after the PUT (with chunked encoding). We've tested the same interaction -- with the connection re-used, and confirmed it works fine with Tomcat and Reactor Netty server.
If you can't figure out why Cowboy behaves this way and fix it, then you might need to use the workaround I specified above with doOnNext()
and response.close()
. However keep in mind the downside is that closing the connection regularly defeats the benefit of using a connection pool.
Rossen Stoyanchev commented
Resolving for now since we have confirmation from testing with other HTTP servers but feel free to comment.
Arnaud Cogoluègnes commented
That is indeed a Cowboy problem. Cowboy 2 should fix it but we already have a fix in RabbitMQ. Thanks for your help on this Rossen.
Rossen Stoyanchev commented
Good to know, thanks!
Arnaud Cogoluègnes opened SPR-15972 and commented
I'm getting the following exception in the HTTP RabbitMQ Client test suite after an upgrade to Spring 5.0.0.RC4:
Instructions to reproduce:
The test suite was affected by a Reactor Netty issue and was working with the connection pool disabled. It also works with Spring 5.0.0.RC4, but only against RabbitMQ 3.6.x. Disabling the connection pool makes the test suite works against Spring 5.0.0.RC4 and RabbitMQ 3.7.0.M20.
I noticed something weird: it looks like Wireshark doesn't recognize
WebClient
PUT requests as HTTP requests (the HTTP details don't show up, see capture).Affects: 5.0 RC4
Attachments: