reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0
2.6k stars 646 forks source link

Sending several message one by one results in AbortedException: Connection has been closed BEFORE send operation #1885

Closed efksusz closed 2 years ago

efksusz commented 2 years ago

Expected Behavior

We have a Spring Boot based micro service that is in this case receiving text messages from Angular app. Message is sent by POST in a body. Then the message is just passed through to another "3rd party backed" by POST in body. When we sent few text messages one by one we receive an error.

Code example

We have just simple RouterFunction that is passing request to handler method:

    public Mono<TextMessage> postMsg(ServerRequest request) {
        String code1 = request.pathVariable(CODE1);
        String code2 = request.pathVariable(CODE2);
        String code3 = request.pathVariable(CODE3);
        String code4 = request.pathVariable(CODE4);

        final Mono<TextMessage> textMsg = request.bodyToMono(TextMessage.class);

        return Mono.deferContextual(contextView -> ok().contentType(APPLICATION_JSON).body(fromPublisher(textMessageService.postMessage(code1,
                        code2, code3, code4, textMsg)
                .contextWrite(c -> c.put(SEC, contextView.get(SEC))), TextMessage.class)));
    }

which is passing that to service metod:

    public Mono<TextMessage> postMessage(String code1, String code2,
                                            String code3, String code4, Mono<TextMessage> msg) {
        return Mono.deferContextual(context -> theWebClient.post().uri(uriBuilder -> uriBuilder.path(URI)
                        .build(code1, code2, code3, code4))
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .header(HttpHeaders.AUTHORIZATION, "Bearer "
                        + authService.createAuth(sec, context.get(AuthenticationFilter.SEC)))
                .body(msg, TextMessage.class)
                .retrieve()
                .onStatus(HttpStatus::isError, responseErrorHandler)
                .bodyToMono(TextMessage.class).doOnError(e ->
                        log.error("Posting failed: {}", e.getMessage())));
    }

The webclient is initiated like below:

    @Bean
    @Qualifier("theWebClient")
    public WebClient getTheWebClient() {
        HttpClient httpClient = HttpClient.create(ConnectionProvider.newConnection())
                .responseTimeout(Duration.ofMillis(30000));
        ClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);

        return WebClient.builder()
                .baseUrl(baseUrl)
                .clientConnector(connector)
                .build();
    }

Exception

2021-09-24 13:51:08 180908 [reactor-http-epoll-3] WARN  r.n.http.client.HttpClientConnect - [id:14ed4a6c-1, L:/172.18.0.12:43816 ! R:10.88.0.111/10.88.0.111:49149] The connection observed an error, the request 
cannot be retried as the headers/body were sent
reactor.netty.channel.AbortedException: Connection has been closed BEFORE send operation
        at reactor.netty.channel.AbortedException.beforeSend(AbortedException.java:59)
        at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:283)
        at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
        at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:326)
        at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
2021-09-24 13:51:08 180929 [reactor-http-epoll-3] ERROR c.e.n.backend.exm.TextMessageService - Posting failed: Connection has been closed BEFORE send operation; nested exception is reactor.netty.channel.AbortedException: Connection has been closed BEFORE send operation
2021-09-24 13:51:08 180939 [reactor-http-epoll-3] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [788650b1]  500 Server Error for HTTP POST "/textMsg/1/2/3/4"
org.springframework.web.reactive.function.client.WebClientRequestException: Connection has been closed BEFORE send operation; nested exception is reactor.netty.channel.AbortedException: Connection has been closed BEFORE send operation
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
        |_ checkpoint ⇢ Request to POST http://10.88.0.111:49149/ext/resources/smile/textMsg/1/2/3/4 [DefaultWebClient]
        |_ checkpoint ⇢ Handler org.springframework.web.reactive.function.server.HandlerFilterFunction$$Lambda$1007/0x000000084063dc40@16ffac9f [DispatcherHandler]
        |_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
        |_ checkpoint ⇢ HTTP POST "/api/new//textMsg/1/2/3/4" [ExceptionHandlingWebHandler]
Stack trace:
                at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141)
                at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55)
                at reactor.core.publisher.Mono.subscribe(Mono.java:4338)
                at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
                at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93)
                at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204)
                at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:414)
                at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:251)
                at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
                at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
                at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
                at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
                at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:190)
                at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)
                at reactor.netty.http.client.HttpClientConnect$HttpObserver.onUncaughtException(HttpClientConnect.java:384)
                at reactor.netty.ReactorNetty$CompositeConnectionObserver.onUncaughtException(ReactorNetty.java:647)
                at reactor.netty.resources.NewConnectionProvider$NewConnectionObserver.onUncaughtException(NewConnectionProvider.java:195)
                at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:283)
                at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
                at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
                at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
                at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
                at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:326)
                at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
                at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
                at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
                at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
                at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
                at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
                at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
                at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
                at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
                at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
                at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: reactor.netty.channel.AbortedException: Connection has been closed BEFORE send operation
        at reactor.netty.channel.AbortedException.beforeSend(AbortedException.java:59)
        at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:283)
        at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:418)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
        at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
        at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:326)
        at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:221)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:831)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

Environment

violetagg commented 2 years ago

@efksusz Please provide either reproducible example or tcp dump.

violetagg commented 2 years ago

@efksusz I'm closing this one. We can reopen it when the requested information is available.