Closed kindlychung closed 3 years ago
When a lot of files are being uploaded concurrently:
for i in {0..300}; do curl --location --request POST 'http://localhost:8080/image/upload' --form 'file=@"/path/Videos/SampleVideo_1280x720_20mb.mp4"' -v & done
Is that initiating concurrent uploads, or are they happening one after the other?
That's concurrent.
@kindlychung Based on what I've read, Files.copy does not close the input stream for you and you have to do it yourself. In this usage https://github.com/gemvision/micronaut_image_test/blob/main/src/main/java/micronaut_image_test/SendImageController.java#L60 the stream is not being closed. Try closing the stream and let me know if the problem persists.
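To illustrate the point about Files.copy, here is a minimal sketch using try-with-resources so the stream is closed even when the copy throws. The class name CopyAndClose and the TrackingStream helper are hypothetical and exist only for this illustration; they are not taken from the linked repository.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CopyAndClose {

    /** Hypothetical helper: an in-memory stream that records whether close() was called. */
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Copies the stream to the target path and closes the stream, even if the copy fails. */
    static long copyAndClose(InputStream source, Path target) throws IOException {
        // Files.copy leaves the stream open, so try-with-resources closes it for us.
        try (InputStream in = source) {
            return Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("upload-", ".bin");
        TrackingStream stream = new TrackingStream(new byte[] {1, 2, 3});
        long copied = copyAndClose(stream, target);
        System.out.println(copied + " bytes copied, closed=" + stream.closed);
        Files.delete(target);
    }
}
```

In a controller, the same pattern would wrap whatever stream the upload exposes; whether that alone resolves the errors in this thread is a separate question.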
My mistake. The command looked like it would likely send the requests one after the other.
Thanks for the feedback.
I cannot trigger the LEAK error any more after closing the file stream in a finally clause. But the OutOfMemoryError can still be easily triggered.
The OutOfMemoryError didn't go away even after setting micronaut.server.multipart.disk to true.
Here is the heap chart from visualvm:
It doesn't seem the max heap size has been exceeded, which is really weird.
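One possible explanation for a healthy-looking heap chart, offered as an assumption rather than a diagnosis: if the OutOfMemoryError concerns direct (off-heap) buffer memory, that memory is not part of the Java heap and would not show up in a heap chart. The sketch below reads the JDK's "direct" buffer pool counter through the standard management API. The class and method names are hypothetical. The counter only covers buffers allocated through java.nio, so depending on configuration it may under-report what Netty holds.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectMemoryProbe {

    /** Returns the bytes currently used by the JDK "direct" buffer pool, or -1 if it is not found. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Allocate 16 MiB off-heap; this does not count against the heap limit.
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024);
        long after = directMemoryUsed();
        System.out.println("direct pool before: " + before + " bytes");
        System.out.println("direct pool after:  " + after + " bytes (buffer capacity " + buffer.capacity() + ")");
        System.out.println("max heap:           " + Runtime.getRuntime().maxMemory() + " bytes");
    }
}
```

The limit for direct buffers is configured separately from -Xmx, through -XX:MaxDirectMemorySize, so the heap can stay well below its maximum while direct memory is exhausted.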
@kindlychung Why are you copying the input stream to a temp file, just to open a stream from the temp file to upload? You should be able to use the stream directly.
Yeah, that's not necessary at all (the repo was prepared by one of my colleagues). But I can confirm the same OutOfMemoryError happens even when using file.getStream() directly.
@kindlychung Can you update the example project to represent the code that is producing the error?
Sure, I have simplified the code base and pushed to the main branch.
@kindlychung After looking at your changes I still am not seeing the input stream being closed https://github.com/gemvision/micronaut_image_test/blob/main/src/main/java/micronaut_image_test/SendImageController.java#L57. Are you certain that is happening through the put request?
That input stream is from the CompletedFileUpload, and I thought it was going to be closed by Micronaut? Anyway, if I close it manually in a finally clause, the upload on S3 will be a 0-byte blob.
@kindlychung You need to close the stream. I don't know when is the time to do that given I'm unfamiliar with the S3 api, however not closing it will result in the problem you're seeing. When you are able to figure out the time to close the stream that allows the file to still be uploaded, and the issue is still happening, please update this issue and the example code.
@jameskleeh I've added the finally clause to close the stream. Never mind the 0-byte blob, the OutOfMemoryError still happens.
I'm also getting this issue while handling a multipart upload with CompletedFileUpload:
server_1 | java.lang.IllegalArgumentException: promise already done: DefaultChannelPromise@77ea70cb(failure: java.lang.OutOfMemoryError: Cannot reserve 16777216 bytes of direct buffer memory (allocated: 503459848, limit: 504823808))
server_1 | at io.netty.channel.AbstractChannelHandlerContext.isNotValidPromise(AbstractChannelHandlerContext.java:852)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:774)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
server_1 | at io.netty.handler.ssl.SslHandler.finishWrap(SslHandler.java:942)
server_1 | at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:928)
server_1 | at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:811)
server_1 | at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:792)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
server_1 | at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531)
server_1 | at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
server_1 | at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
server_1 | at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
server_1 | at io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:265)
server_1 | at io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
server_1 | at io.micronaut.http.server.netty.types.files.NettySystemFileCustomizableResponseType.write(NettySystemFileCustomizableResponseType.java:144)
server_1 | at io.micronaut.http.server.netty.types.files.FileTypeHandler.handle(FileTypeHandler.java:100)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler$NettyCustomizableResponseTypeHandlerInvoker.invoke(RoutingInBoundHandler.java:2044)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.writeFinalNettyResponse(RoutingInBoundHandler.java:1710)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.encodeHttpResponse(RoutingInBoundHandler.java:1570)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.access$900(RoutingInBoundHandler.java:151)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler$3.onComplete(RoutingInBoundHandler.java:1341)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler$3.onComplete(RoutingInBoundHandler.java:1099)
server_1 | at io.micronaut.http.server.netty.async.ContextCompletionAwareSubscriber.doOnComplete(ContextCompletionAwareSubscriber.java:73)
server_1 | at io.micronaut.core.async.subscriber.CompletionAwareSubscriber.onComplete(CompletionAwareSubscriber.java:71)
server_1 | at io.reactivex.internal.util.HalfSerializer.onComplete(HalfSerializer.java:90)
server_1 | at io.reactivex.internal.subscribers.StrictSubscriber.onComplete(StrictSubscriber.java:109)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:73)
server_1 | at io.reactivex.internal.operators.flowable.FlowableSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FlowableSwitchIfEmpty.java:73)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:73)
server_1 | at io.reactivex.internal.subscriptions.ScalarSubscription.request(ScalarSubscription.java:57)
server_1 | at io.reactivex.internal.subscriptions.SubscriptionArbiter.setSubscription(SubscriptionArbiter.java:99)
server_1 | at io.reactivex.internal.operators.flowable.FlowableSwitchIfEmpty$SwitchIfEmptySubscriber.onSubscribe(FlowableSwitchIfEmpty.java:51)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onSubscribe(RxInstrumentedSubscriber.java:52)
server_1 | at io.reactivex.internal.operators.flowable.FlowableJust.subscribeActual(FlowableJust.java:34)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:57)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.reactivex.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:57)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.reactivex.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:57)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.reactivex.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:57)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.reactivex.internal.operators.flowable.FlowableSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FlowableSwitchIfEmpty.java:71)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:73)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:431)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:371)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onComplete(FlowableFlatMap.java:343)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:73)
server_1 | at io.reactivex.internal.operators.maybe.MaybeToFlowable$MaybeToFlowableSubscriber.onComplete(MaybeToFlowable.java:80)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedMaybeObserver.onComplete(RxInstrumentedMaybeObserver.java:72)
server_1 | at io.reactivex.internal.operators.maybe.MaybeDoOnEvent$DoOnEventMaybeObserver.onComplete(MaybeDoOnEvent.java:115)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedMaybeObserver.onComplete(RxInstrumentedMaybeObserver.java:72)
server_1 | at io.reactivex.internal.operators.flowable.FlowableElementAtMaybe$ElementAtSubscriber.onComplete(FlowableElementAtMaybe.java:102)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:73)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:431)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:371)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onComplete(FlowableFlatMap.java:343)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:73)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.slowPath(FlowableFromIterable.java:255)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:124)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onSubscribe(RxInstrumentedSubscriber.java:52)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:69)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:57)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.internal.operators.flowable.FlowableElementAtMaybe.subscribeActual(FlowableElementAtMaybe.java:36)
server_1 | at io.reactivex.Maybe.subscribe(Maybe.java:4290)
server_1 | at io.micronaut.reactive.rxjava2.RxInstrumentedMaybe.subscribeActual(RxInstrumentedMaybe.java:53)
server_1 | at io.reactivex.Maybe.subscribe(Maybe.java:4290)
server_1 | at io.reactivex.internal.operators.maybe.MaybeDoOnEvent.subscribeActual(MaybeDoOnEvent.java:39)
server_1 | at io.reactivex.Maybe.subscribe(Maybe.java:4290)
server_1 | at io.reactivex.internal.operators.maybe.MaybeToFlowable.subscribeActual(MaybeToFlowable.java:45)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.internal.operators.flowable.FlowableSwitchIfEmpty.subscribeActual(FlowableSwitchIfEmpty.java:32)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14882)
server_1 | at io.micronaut.http.server.context.ServerRequestContextFilter.lambda$doFilter$0(ServerRequestContextFilter.java:62)
server_1 | at io.reactivex.internal.operators.flowable.FlowableFromPublisher.subscribeActual(FlowableFromPublisher.java:29)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14935)
server_1 | at io.reactivex.Flowable.subscribe(Flowable.java:14885)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.lambda$buildExecutableRoute$8(RoutingInBoundHandler.java:1099)
server_1 | at io.micronaut.web.router.BasicObjectRouteMatch.decorate(BasicObjectRouteMatch.java:66)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.buildExecutableRoute(RoutingInBoundHandler.java:1084)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.prepareRouteForExecution(RoutingInBoundHandler.java:1061)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.handleRouteMatch(RoutingInBoundHandler.java:722)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.channelRead0(RoutingInBoundHandler.java:581)
server_1 | at io.micronaut.http.server.netty.RoutingInBoundHandler.channelRead0(RoutingInBoundHandler.java:148)
server_1 | at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
server_1 | at io.micronaut.http.server.netty.ssl.HttpRequestCertificateHandler.channelRead(HttpRequestCertificateHandler.java:52)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.micronaut.http.netty.stream.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:199)
server_1 | at io.micronaut.http.netty.stream.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:121)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
server_1 | at io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
server_1 | at io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
server_1 | at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
server_1 | at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
server_1 | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
server_1 | at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
server_1 | at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
server_1 | at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
server_1 | at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
server_1 | at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
server_1 | at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
server_1 | at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
server_1 | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
server_1 | at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
server_1 | at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
server_1 | at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
server_1 | at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
server_1 | at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
server_1 | at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
server_1 | at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
server_1 | at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
server_1 | at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
server_1 | at java.base/java.lang.Thread.run(Thread.java:832)
@Post("/")
@Secured(SecurityRule.IS_AUTHENTICATED)
@Consumes(MediaType.MULTIPART_FORM_DATA)
fun postFile(@Part file: CompletedFileUpload): HttpResponse<String> {
    log.info("Receiving file ${file.filename}")
    return try {
        val filename = file.filename
        val newFile = Files.createFile(Paths.get("$fileStoragePath/$filename"))
        Files.write(newFile, file.bytes)
        HttpResponse.created("Uploaded")
    } catch (e: FileAlreadyExistsException) {
        HttpResponse.badRequest("Upload Failed: ${file.filename} already exists")
    } catch (e: IOException) {
        HttpResponse.serverError("Upload Failed: System error")
    }
}
Over the weekend I did some tests on the server, and the leaks seem to occur when FileAlreadyExistsException is caught. The request is processed correctly, but I'm guessing something does not get closed.
We are experiencing similar behaviour using Java 11 and Micronaut 2.3.2. We've managed to reproduce the behaviour locally with our application running in Docker with limited memory and using the for i in {1...1000}; ... &; done approach to send curl requests concurrently. We simplified the controller to just return some dummy data.
@Post(value = "/image", consumes = MediaType.MULTIPART_FORM_DATA)
public Maybe<Profile> addImage(
        CompletedFileUpload file
) {
    return Maybe.just(Profile.builder()
            .id(UUID.randomUUID())
            .name("name")
            .image("...")
            .build());
}
We set the JVM flag -Dio.netty.leakDetection.level=advanced and after 1-3 runs with the for loop we saw "ERROR LEAK: ByteBuf.release() was not called before it's garbage-collected." in the logs. The full stack trace is attached.
@jaksah can you provide the example application?
@kalgecin If this line, val newFile = Files.createFile(Paths.get("$fileStoragePath/$filename")), is causing the exception, that means you are never reading the file, and thus the buffer is not released.
Isn't the correct behavior to release the resources after the method returns? What if my logic does not need to process the file at certain times? Say, for example, I check the file size against limits, or something else.
@graemerocher Here's an example application: https://github.com/jaksah/micronaut-multipart-memory-leak
@kalgecin The logic that processes the return value of the method has no knowledge of any file arguments, etc. The data is released as soon as you use it, which is the correct approach because the data can be released sooner than at the end of the method. Not to mention that methods returning reactive streams return immediately, before the file is read.
@jaksah In that example application you are also not reading the file, thus the same thing applies
@kalgecin I've created a PR to add a method to allow you to discard the file https://github.com/micronaut-projects/micronaut-core/pull/5052
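The "read it or discard it on every path" idea can be sketched with a self-contained model. Everything here is hypothetical: the Upload interface, FakeUpload, and the method names getBytes and discard are stand-ins invented for illustration, not Micronaut's API and not necessarily what the PR above adds. The only point is that each branch, including the exception branches, releases the data exactly as the happy path does.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReadOrDiscard {

    /** Hypothetical stand-in for an uploaded part whose buffer is freed by reading or discarding. */
    interface Upload {
        String getFilename();
        byte[] getBytes() throws IOException; // reading releases the underlying data
        void discard();                       // releases the data without reading it
    }

    /** Hypothetical in-memory implementation that records whether the data was released. */
    static final class FakeUpload implements Upload {
        private final String filename;
        private final byte[] data;
        boolean released = false;

        FakeUpload(String filename, byte[] data) {
            this.filename = filename;
            this.data = data;
        }

        @Override public String getFilename() { return filename; }
        @Override public byte[] getBytes() { released = true; return data; }
        @Override public void discard() { released = true; }
    }

    /** Stores the upload in dir; every failure path discards the upload instead of leaving it unread. */
    static String store(Upload file, Path dir) {
        try {
            Path target = Files.createFile(dir.resolve(file.getFilename()));
            Files.write(target, file.getBytes());
            return "created";
        } catch (FileAlreadyExistsException e) {
            file.discard(); // the file was never read on this path
            return "already exists";
        } catch (IOException e) {
            file.discard();
            return "system error";
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("uploads-");
        FakeUpload first = new FakeUpload("a.bin", new byte[] {1});
        FakeUpload second = new FakeUpload("a.bin", new byte[] {2});
        System.out.println(store(first, dir) + ", released=" + first.released);
        System.out.println(store(second, dir) + ", released=" + second.released);
    }
}
```

With the second upload reusing the filename, the createFile call fails before the bytes are read, which mirrors the FileAlreadyExistsException case described earlier in the thread.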
Thanks for clarifying, @jameskleeh. I've updated my example application so that the controller reads the bytes of the file, and I also made the route secured. I managed to reproduce the LEAK by sending unauthenticated requests. How do we make sure to release the data in this case?
@jaksah I'll take a look. You shouldn't be responsible for doing so in that case
Is it not possible to try and close all buffers on method return?
16/03/2021 12:24:09.064 ERROR i.n.u.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:338)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:52)
io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.offer(HttpPostRequestDecoder.java:223)
io.micronaut.http.server.netty.FormDataHttpContentProcessor.onData(FormDataHttpContentProcessor.java:115)
io.micronaut.http.server.netty.AbstractHttpContentProcessor.doOnNext(AbstractHttpContentProcessor.java:78)
io.micronaut.http.server.netty.AbstractHttpContentProcessor.doOnNext(AbstractHttpContentProcessor.java:36)
io.micronaut.core.async.subscriber.CompletionAwareSubscriber.onNext(CompletionAwareSubscriber.java:52)
io.micronaut.http.netty.reactive.HandlerPublisher.publishMessage(HandlerPublisher.java:378)
io.micronaut.http.netty.reactive.HandlerPublisher.channelRead(HandlerPublisher.java:334)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.micronaut.http.netty.stream.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:292)
io.micronaut.http.netty.stream.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:257)
io.micronaut.http.netty.stream.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:121)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:832)
16/03/2021 12:24:09.065 ERROR i.n.u.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:338)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:52)
io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.offer(HttpPostRequestDecoder.java:223)
io.micronaut.http.server.netty.FormDataHttpContentProcessor.onData(FormDataHttpContentProcessor.java:115)
io.micronaut.http.server.netty.AbstractHttpContentProcessor.doOnNext(AbstractHttpContentProcessor.java:78)
io.micronaut.http.server.netty.AbstractHttpContentProcessor.doOnNext(AbstractHttpContentProcessor.java:36)
io.micronaut.core.async.subscriber.CompletionAwareSubscriber.onNext(CompletionAwareSubscriber.java:52)
io.micronaut.http.netty.reactive.HandlerPublisher.publishMessage(HandlerPublisher.java:378)
io.micronaut.http.netty.reactive.HandlerPublisher.channelRead(HandlerPublisher.java:334)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.micronaut.http.netty.stream.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:292)
io.micronaut.http.netty.stream.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:257)
io.micronaut.http.netty.stream.HttpStreamsServerHandler.channelRead(HttpStreamsServerHandler.java:121)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
io.netty.handler.codec.MessageToMessageCodec.channelRead(MessageToMessageCodec.java:111)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
io.netty.handler.flow.FlowControlHandler.read(FlowControlHandler.java:139)
io.netty.channel.AbstractChannelHandlerContext.invokeRead(AbstractChannelHandlerContext.java:686)
io.netty.channel.AbstractChannelHandlerContext.read(AbstractChannelHandlerContext.java:671)
io.micronaut.http.netty.reactive.HandlerPublisher.requestDemand(HandlerPublisher.java:163)
io.micronaut.http.netty.stream.HttpStreamsHandler$2.requestDemand(HttpStreamsHandler.java:248)
io.micronaut.http.netty.reactive.HandlerPublisher$ChannelSubscription.receivedDemand(HandlerPublisher.java:547)
io.micronaut.http.netty.reactive.HandlerPublisher$ChannelSubscription.lambda$request$0(HandlerPublisher.java:474)
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:832)
16/03/2021 12:24:09.066 ERROR i.n.u.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:2212)
io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1382)
io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:832)
Unfortunately, even after adding code to read the bytes on method entry, before any exception can be thrown, this log still appears.
We found a workaround that no longer triggers the ByteBuf.release() LEAK error: read the whole body inside the controller instead of relying on argument binding. More or less like this:
@Post(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA)
public Single<HttpResponse> upload(
        @Body MultipartBody body
) {
    return readMultipartBody(body)
            .flatMap(...);
}

Single<RequestDTO> readMultipartBody(final MultipartBody body) {
    return Single.create(emitter -> body.subscribe(new Subscriber<>() {
        private Subscription s;
        private final RequestDTO.Builder requestBuilder = RequestDTO.builder();
        private final List<Exception> errors = new ArrayList<>();

        @Override
        public void onSubscribe(final Subscription s) {
            this.s = s;
            // Request one part at a time
            s.request(1);
        }

        @Override
        public void onNext(final CompletedPart completedPart) {
            try {
                // Always read the data to make sure it's released
                byte[] data = completedPart.getBytes();
                // logic to append information to the builder
            } catch (IOException e) {
                log.warn(
                        "Unable to read part " + completedPart.getName() + " from multipart/form-data request", e);
                errors.add(e);
            }
            // Ask for the next part even if this one failed, so every part gets read
            s.request(1);
        }

        @Override
        public void onError(final Throwable t) {
            emitter.onError(t);
        }

        @Override
        public void onComplete() {
            if (errors.isEmpty()) {
                emitter.onSuccess(requestBuilder.build());
            } else {
                // Report a single 400 and attach the individual read failures
                Throwable error = new HttpStatusException(HttpStatus.BAD_REQUEST, "Bad request");
                for (Exception e : errors) {
                    error.addSuppressed(e);
                }
                emitter.onError(error);
            }
        }
    }));
}
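The LEAK message means that a pooled buffer was garbage-collected while its reference count was still above zero. As a toy model of that contract, assuming nothing beyond plain Java (`CountedBuffer` and `consume` are hypothetical names, not Netty or Micronaut API), the code that touches a buffer last releases it in a `finally` block, so the count drops even when reading fails:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy stand-in for a reference-counted buffer. Hypothetical class, not Netty's ByteBuf.
final class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1); // a new buffer starts at 1
    private final byte[] data;

    CountedBuffer(byte[] data) {
        this.data = data;
    }

    int refCnt() {
        return refCnt.get();
    }

    // Copies the content out; fails if the buffer was already released.
    byte[] copyBytes() {
        if (refCnt.get() <= 0) {
            throw new IllegalStateException("buffer already released");
        }
        return data.clone();
    }

    // Decrements the count and returns true once it reaches zero.
    boolean release() {
        int left = refCnt.decrementAndGet();
        if (left < 0) {
            throw new IllegalStateException("released more often than retained");
        }
        return left == 0;
    }

    // The last consumer reads the bytes and releases the buffer on every path.
    static byte[] consume(CountedBuffer buffer) {
        try {
            return buffer.copyBytes();
        } finally {
            buffer.release();
        }
    }
}
```

In this model a buffer that is never passed to `consume` keeps a count of 1 forever, which corresponds to the situation Netty's leak detector reports.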
@jaksah your solution seems to work for me too. It would seem there's a problem with the way Micronaut handles the multipart body when using @Part args.
Thanks for clarifying, @jameskleeh. I've updated my example application so that the controller reads the bytes of the file, but I also secured the route and managed to reproduce the LEAK by sending unauthenticated requests. How do we make sure the data is released in this case?
This case has been resolved in 2.5
We just had the same problem on Micronaut 2.5.13 and 3.0.0 while stress testing a service with 100-200 workers doing multipart file uploads of ~100KB files. To trigger it, you'll need at least a few dozen parallel requests.
Netty's resource leak diagnostics point to FormDataHttpContentProcessor:119. I tried CompletedFileUpload, StreamingUpload, and byte[] as controller parameters, to no avail.
@jaksah's solution worked; I guess that using @Body MultipartBody bypasses FormDataHttpContentProcessor.
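For anyone who wants to generate comparable load without a shell loop, here is a minimal sketch using the JDK's `java.net.http.HttpClient` (Java 11+). The URL and the `file` field name are taken from the curl command in this issue; the class name, the boundary string, the zero-filled 100 KB payload, and the count of 100 parallel requests are assumptions chosen for illustration:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical load generator; not part of the reproducer projects in this thread.
final class MultipartLoad {

    // Builds a multipart/form-data body containing a single file part.
    static byte[] multipartBody(String boundary, String field, String filename, byte[] content) {
        byte[] head = ("--" + boundary + "\r\n"
                + "Content-Disposition: form-data; name=\"" + field + "\"; filename=\"" + filename + "\"\r\n"
                + "Content-Type: application/octet-stream\r\n\r\n").getBytes(StandardCharsets.UTF_8);
        byte[] tail = ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8);
        byte[] body = new byte[head.length + content.length + tail.length];
        System.arraycopy(head, 0, body, 0, head.length);
        System.arraycopy(content, 0, body, head.length, content.length);
        System.arraycopy(tail, 0, body, head.length + content.length, tail.length);
        return body;
    }

    static HttpRequest uploadRequest(URI target, String boundary, byte[] body) {
        return HttpRequest.newBuilder(target)
                .header("Content-Type", "multipart/form-data; boundary=" + boundary)
                .POST(HttpRequest.BodyPublishers.ofByteArray(body))
                .build();
    }

    public static void main(String[] args) {
        URI target = URI.create(args.length > 0 ? args[0] : "http://localhost:8080/image/upload");
        int parallel = 100;                          // assumed; the thread mentions 100-200 workers
        String boundary = "loadTestBoundary123";     // assumed boundary string
        byte[] body = multipartBody(boundary, "file", "sample.bin", new byte[100 * 1024]);

        // HTTP/1.1 so that concurrent requests use separate connections.
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();

        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (int i = 0; i < parallel; i++) {
            pending.add(client
                    .sendAsync(uploadRequest(target, boundary, body), HttpResponse.BodyHandlers.discarding())
                    .thenApply(HttpResponse::statusCode)
                    .exceptionally(e -> -1));        // -1 marks a failed request
        }
        // Wait for all requests and print one status code per line.
        pending.forEach(f -> System.out.println(f.join()));
    }
}
```

All requests are started before any response is awaited, so they are in flight concurrently, as with the backgrounded curl loop.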
@vnesek do you have a reproducer?
I'll try to post a sample project in the next few days.
Same as @vnesek here, using CompletedFileUpload.
Working on a reproducer...
LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.order(AdvancedLeakAwareByteBuf.java:71)
io.netty.buffer.CompositeByteBuf.newComponent(CompositeByteBuf.java:346)
io.netty.buffer.CompositeByteBuf.addComponent0(CompositeByteBuf.java:287)
io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:265)
io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:222)
io.netty.handler.codec.http.multipart.AbstractMemoryHttpData.addContent(AbstractMemoryHttpData.java:131)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.loadDataMultipartOptimized(HttpPostMultipartRequestDecoder.java:1225)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.getFileUpload(HttpPostMultipartRequestDecoder.java:926)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.decodeMultipart(HttpPostMultipartRequestDecoder.java:572)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBodyMultipart(HttpPostMultipartRequestDecoder.java:463)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.parseBody(HttpPostMultipartRequestDecoder.java:432)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:347)
io.netty.handler.codec.http.multipart.HttpPostMultipartRequestDecoder.offer(HttpPostMultipartRequestDecoder.java:54)
io.micronaut.http.server.netty.FormDataHttpContentProcessor.onData(FormDataHttpContentProcessor.java:119)
...
@graemerocher I created a reproducer here: https://github.com/caiolopes/micronaut-file-upload
There are two endpoints, one using @Body MultipartBody body and another using CompletedFileUpload. Only the one with CompletedFileUpload triggers the LEAK error. You may need to run it a few times to see it, but for me it shows up on every run.
Task List
Steps to Reproduce
for i in {0..300}; do curl --location --request POST 'http://localhost:8080/image/upload' --form 'file=@"/path/Videos/SampleVideo_1280x720_20mb.mp4"' -v & done
Expected Behaviour
All files should be uploaded to S3.
Actual Behaviour
Memory leak. Stacktrace:
This error always happens after an OutOfMemoryError (when a lot of files are being uploaded concurrently):
Environment Information
Example Application
micronaut image test