spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

DataBufferUtils "doesn't respect backpressure" #25093

Closed evgenyvsmirnov closed 8 months ago

evgenyvsmirnov commented 4 years ago

Please consider the following REST controller method which employs DataBufferUtils to transmit a file:

    @RequestMapping(path = "/file/{id}", method = GET, produces = APPLICATION_OCTET_STREAM_VALUE)
    @ResponseBody
    public Mono<Void> downloadFile(ServerWebExchange exchange, @PathVariable("id") String id) {
        return Mono
                .fromRunnable(() -> {
                    HttpHeaders headers = exchange.getResponse().getHeaders();
                    headers.add(HttpHeaders.CONTENT_LENGTH, Long.toString(getFileLength(id)));
                    headers.add(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_VALUE);
                })
                .subscribeOn(scheduler)
                .then(exchange.getResponse().writeWith(
                        DataBufferUtils.readInputStream(
                                () -> getFileInputStream(id),
                                exchange.getResponse().bufferFactory(),
                                8192
                        ).subscribeOn(scheduler)));
    }

Expected behaviour: a 200, 4xx or 5xx response is sent to the client, or the connection is closed mid-transfer.
Actual behaviour: the client hangs.

Once in a while I find the following stack traces in the production logs. Unfortunately I can't reproduce them in a test: it seems reactor-netty disregards the client's TCP window size, which a test could otherwise exploit.

2020/05/15-04:50:59.581-0,JAVA,0,level=WARN,pid=6500,threadId=182,thread=webflux-selector-http-3,logger=GlobalExceptionHandler,message='[d8818abc-1105] Request processing failed: HTTP GET "/file/file5412465168127"',exception=reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
    at reactor.core@3.3.2.RELEASE/reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
    at reactor.netty@0.9.4.RELEASE/reactor.netty.channel.MonoSendMany$SendManyInner.onNext(MonoSendMany.java:190)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at spring.web@5.2.4.RELEASE/org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:173)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.onNext(FluxSubscribeOn.java:151)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onNext(FluxUsing.java:350)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:169)
    at spring.core@5.2.4.RELEASE/org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:644)
    at spring.core@5.2.4.RELEASE/org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:618)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxGenerate.lambda$new$1(FluxGenerate.java:56)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:262)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:204)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.request(FluxUsing.java:317)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.lambda$requestUpstream$0(FluxSubscribeOn.java:135)
    at reactor.core@3.3.2.RELEASE/reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core@3.3.2.RELEASE/reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

2020/05/15-04:50:59.586-0,JAVA,0,level=ERROR,pid=6500,threadId=182,thread=webflux-selector-http-3,logger=org.springframework.web.server.adapter.HttpWebHandlerAdapter,message='[d8818abc-1105] Error [java.lang.UnsupportedOperationException] for HTTP GET "/file/file5412465168127", but ServerHttpResponse already committed (200 OK)'
2020/05/15-04:50:59.586-0,JAVA,0,level=ERROR,pid=6500,threadId=182,thread=webflux-selector-http-3,logger=reactor.netty.http.server.HttpServerOperations,message='[id: 0xd8818abc, L:/192.168.44.20:9090 - R:/192.168.44.23:37330] Error finishing response. Closing connection',exception=java.lang.UnsupportedOperationException
    at spring.web@5.2.4.RELEASE/org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:106)
    at spring.web@5.2.4.RELEASE/org.springframework.http.HttpHeaders.setContentType(HttpHeaders.java:951)
    at webflux@2020.3.0-20200416.024359-2381/ErrorWebExceptionHandler.handle(ErrorWebExceptionHandler.java:82)
    at spring.web@5.2.4.RELEASE/org.springframework.web.server.handler.ExceptionHandlingWebHandler.lambda$handle$0(ExceptionHandlingWebHandler.java:77)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:88)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:390)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators.error(Operators.java:182)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Mono.subscribe(Mono.java:4105)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:390)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:251)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators.error(Operators.java:182)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:134)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Mono.subscribe(Mono.java:4090)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:390)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators.error(Operators.java:182)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:181)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:48)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2199)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:132)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:318)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:145)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Mono.subscribe(Mono.java:4105)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Mono.subscribe(Mono.java:4105)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:227)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1684)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:306)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:251)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators.error(Operators.java:182)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoError.subscribe(MonoError.java:52)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Mono.subscribe(Mono.java:4105)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1684)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:306)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:251)
    at spring.web@5.2.4.RELEASE/org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:414)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:76)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1876)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1684)
    at reactor.core@3.3.2.RELEASE/reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:306)
    at reactor.netty@0.9.4.RELEASE/reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:323)
    at reactor.netty@0.9.4.RELEASE/reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:383)
    at reactor.netty@0.9.4.RELEASE/reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:527)
    at reactor.netty@0.9.4.RELEASE/reactor.netty.channel.MonoSendMany$SendManyInner.trySuccess(MonoSendMany.java:94)
    at io.netty.common@4.1.45.Final/io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
    at io.netty.transport@4.1.45.Final/io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
    at io.netty.transport@4.1.45.Final/io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
    at io.netty.transport@4.1.45.Final/io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
    at io.netty.transport@4.1.45.Final/io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:431)
    at io.netty.transport@4.1.45.Final/io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
    at io.netty.transport@4.1.45.Final/io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361)
    at io.netty.transport@4.1.45.Final/io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:708)
    at io.netty.transport@4.1.45.Final/io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.netty.transport@4.1.45.Final/io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.netty.transport@4.1.45.Final/io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.common@4.1.45.Final/io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.common@4.1.45.Final/io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:834)

My first concern is "Queue is full: Reactive Streams source doesn't respect backpressure". I have no idea how I could "respect backpressure" here; your expertise would be appreciated.
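For context, the message refers to the Reactive Streams demand contract: a source may emit at most as many items as the subscriber has requested. The following is a minimal, single-threaded sketch of that contract using the JDK's `java.util.concurrent.Flow` interfaces rather than Reactor. All names in it (`BackpressureSketch`, `source`, `BoundedSink`, `drain`) are hypothetical, and it is not a description of how DataBufferUtils or reactor-netty are implemented. It only shows how a bounded consumer overflows when the producer ignores demand.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;

/** Single-threaded illustration; not thread-safe and not a complete Reactive Streams implementation. */
final class BackpressureSketch {

    /** Emits 0..total-1. With respectDemand=false it keeps emitting regardless of requests. */
    static Flow.Publisher<Integer> source(int total, boolean respectDemand) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                // A complete implementation would signal onError for n <= 0.
                if (done || n <= 0) return;
                demand += n;
                if (emitting) return; // re-entrant request: the running loop sees the new demand
                emitting = true;
                while (!done && next < total && (demand > 0 || !respectDemand)) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next == total) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Holds at most `capacity` items and flags an overflow if more arrive than it has room for. */
    static final class BoundedSink implements Flow.Subscriber<Integer> {
        final Queue<Integer> queue = new ArrayDeque<>();
        final int capacity;
        int received;
        boolean overflowed;
        boolean completed;
        private Flow.Subscription subscription;

        BoundedSink(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(capacity); // never ask for more than the queue can hold
        }

        @Override
        public void onNext(Integer item) {
            if (queue.size() >= capacity) { // analogous to a "Queue is full" failure
                overflowed = true;
                subscription.cancel();
                return;
            }
            queue.add(item);
            received++;
        }

        @Override
        public void onError(Throwable t) { }

        @Override
        public void onComplete() {
            completed = true;
        }

        /** Simulates the network draining the buffer, then requests as many items as were freed. */
        void drain() {
            int freed = queue.size();
            queue.clear();
            if (freed > 0 && !completed && !overflowed) {
                subscription.request(freed);
            }
        }
    }
}
```

With `source(10, true)` and a sink of capacity 4, repeated calls to `drain()` deliver all ten items without overflow. With `source(10, false)` the sink flags an overflow on the fifth item and cancels.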

The second and more important concern is "Error [java.lang.UnsupportedOperationException] for HTTP GET "/file/file5412465168127", but ServerHttpResponse already committed (200 OK)". The evidence suggests that reactor-netty can't keep up with the transmission and fails, and that the error is then propagated to the application exception handler, which composes a 5xx response. HttpWebHandlerAdapter afterwards detects that at least the status line is already on the wire and rightly rejects the 5xx response (the only thing it can do in such a situation). I came up with the idea of conditionally suppressing the exception:

    .then(...)
    .onErrorResume(e -> exchange.getResponse().isCommitted(), e -> Mono.empty());

hoping to keep my exception handler from processing the error. To my eye it's an odd solution, because such situations are inherent to HTTP servers.

Are there any flaws in my implementation, or is the problem caused by an issue in DataBufferUtils or elsewhere?

Environment:

rstoyanchev commented 8 months ago

There is built-in support for writing a file:

    @RequestMapping(path = "/file/{id}", method = GET, produces = APPLICATION_OCTET_STREAM_VALUE)
    @ResponseBody
    public Resource downloadFile(ServerWebExchange exchange, @PathVariable("id") String id) {
        return new FileSystemResource(getFile());
    }

On servers that support zero-copy like Reactor Netty, this will perform a zero-copy file transfer (see ResourceHttpMessageWriter), or otherwise will fall back on DataBufferUtils.read(resource, ...) which internally uses DataBufferUtils.readAsynchronousFileChannel (see ResourceEncoder).

In short, I don't know the underlying cause of the above, but I would suggest using the built-in support if possible, which relies on the more efficient zero-copy transfer or otherwise on Resource writing via DataBufferUtils.
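To illustrate what "zero-copy" means at the JDK level, here is a minimal sketch built on `FileChannel.transferTo`, which lets the operating system move file bytes to the target channel without staging them in an application buffer when the platform and target support it. This is not the code path of ResourceHttpMessageWriter or Reactor Netty. The class and method names (`ZeroCopySketch`, `transfer`, `roundTrip`) are hypothetical, and the in-memory target used in `roundTrip` is only there to make the example self-contained, so no real zero-copy optimisation takes place in that demo.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopySketch {

    /**
     * Transfers the whole file to the target channel and returns the number of bytes moved.
     * Assumes a blocking target; a non-blocking one could make transferTo return 0 repeatedly.
     */
    static long transfer(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until done.
                position += source.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    /** Demo helper: writes the content to a temporary file and reads it back through transfer(). */
    static byte[] roundTrip(byte[] content) {
        try {
            Path file = Files.createTempFile("zero-copy-sketch", ".bin");
            try {
                Files.write(file, content);
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                try (WritableByteChannel target = Channels.newChannel(out)) {
                    transfer(file, target);
                }
                return out.toByteArray();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real server the target would be a socket channel, which is where the transfer can avoid copying through user space.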

spring-projects-issues commented 8 months ago

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

spring-projects-issues commented 8 months ago

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.