Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License
2.36k stars 2k forks source link

[BUG] DataLakeFileAsyncClient upload() method has memory leak causing file to be read into memory - causes OOM #10957

Closed roneiv closed 4 years ago

roneiv commented 4 years ago

Describe the bug When using the DataLakeFileAsyncClient on very large files (from 10GB and up), the JVM dies on OutOfMemoryError as there is a memory leak causing the file to be read into memory.

Exception or Stack Trace Error was received while reading the incoming data. The connection will be closed. java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:89) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:32) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:637) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:618) at reactor.core.publisher.FluxGenerate.lambda$new$1(FluxGenerate.java:56) at reactor.core.publisher.FluxGenerate$$Lambda$263/1703559123.apply(Unknown Source) at reactor.core.publisher.FluxGenerate$GenerateSubscription.fastPath(FluxGenerate.java:223) at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:202) at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.request(FluxUsing.java:317) at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:150) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:346) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:184) at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onSubscribe(FluxWindowPredicate.java:180) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:81) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:255) at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:117) at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onSubscribe(FluxUsing.java:344) at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83) at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:102) at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:190) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:240) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1706) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) [reactor-http-epoll-2] WARN io.netty.channel.AbstractChannelHandlerContext - An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception: java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:89) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:32) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:637) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:618) at reactor.core.publisher.FluxGenerate.lambda$new$1(FluxGenerate.java:56) at reactor.core.publisher.FluxGenerate$$Lambda$263/1703559123.apply(Unknown Source) at reactor.core.publisher.FluxGenerate$GenerateSubscription.fastPath(FluxGenerate.java:223) at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:202) at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.request(FluxUsing.java:317) at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:150) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.request(FluxMapFuseable.java:346) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:184) at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onSubscribe(FluxWindowPredicate.java:180) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:81) at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onSubscribe(FluxMapFuseable.java:255) at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:117) at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onSubscribe(FluxUsing.java:344) at reactor.core.publisher.FluxGenerate.subscribe(FluxGenerate.java:83) at reactor.core.publisher.FluxUsing.subscribe(FluxUsing.java:102) at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:190) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:240) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1706) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296)

To Reproduce Attempt to upload any file large enough to fill up the memory on the machine

Code Snippet ` DataLakeFileAsyncClient fileAsyncClient = fileSystemClient.getFileAsyncClient(targetPath); InputStream is = Files.newInputStream(path, READ);

    Flux<DataBuffer> fileBuffer = DataBufferUtils.readInputStream(
            () -> is,
            new DefaultDataBufferFactory(), 1024*1024).doFinally(__ -> {
        try {
            is.close();
        } catch (Exception e) {
            LOG.error("Exception attempting to close stream ", e);
        }
    });
    Flux<ByteBuffer> byteBufferFlux = fileBuffer.map(DataBuffer::asByteBuffer);

    fileAsyncClient.exists()
            .flatMap(b -> {
                if (b) {
                    return fileAsyncClient.delete();
                } else {
                    return Mono.empty();
                }
            })
            .block();

    fileAsyncClient.upload(byteBufferFlux, PARALLEL_TRANSFER_OPTIONS, false)
    .doOnError(Throwable::printStackTrace)
    .block();`

Expected behavior The expected result is that the upload method would do "lazy" buffering, i.e. reading as needed uploading/appending chunks to the object storage.

Setup (please complete the following information):

Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

rickle-msft commented 4 years ago

Hi, @roneiv. Thank you for opening this issue. I suspect this is the same root cause as #10823. To my understanding, we aren't eagerly reading the entire file per se, but the level of parallelism is too aggressive and is trying to issue too many requests and hence read too much of the file at once. I think our plan is to offer an option that will allow for configuring this level of concurrency on uploading files.

roneiv commented 4 years ago

@rickle-msft Hi Rick, Thanks for your quick response. I understand that you do not "explicitly" read the file into memory per se, and of course I don't have a full insight in the internals of the code. But it at least looks like (due to the aggressive parallelism as you say) you read more bytes from the file compared to what you manage to flush to the object storage, hence gradually the process eats up the memory of the machine till it eventually dies. Probably a good fix would be to make a more "balanced" approach between read and write so that the ratio between them is consistent, e.g. lazy reading.

Are there any "timeframe" in when we could hope for a resolution to this issue?

Kind Regards;

Eivind

rickle-msft commented 4 years ago

Yes that is in agreement with my understanding as to why memory is running out.

A bit of nuance. I don't think we are reading too eagerly in the sense that we are reading data before we're ready to put it on the wire. I think we're trying to issue too many requests at once, and each of those is requesting data from the file which eventually ends up being more memory than the system can handle.

In any case, if we give a means of limiting the maximum number of requests at once, that should limit the amount of memory that's being used at any given time and solve the problem.

We do releases at the beginning of each month. There are some new service features we're working on supporting at the moment, and I need to check on the relative priorities there

roneiv commented 4 years ago

Thanks for the information again @rickle-msft! A bit of additional details though, which might put some questions on the table and point in slightly other directions than it being related to only the maximum number of requests at once:

As far as we've seen on the memory consumption, the first 2747269120 bytes are written perfectly fine. No memory "leak". But when reaching 2747269120 bytes, it is like the process "hangs" for 10-20 seconds, and then continues. And from this point and onward, the memory consumption starts to grow. I don't know for sure where the number 2747269120 comes from, but we've hit this "limit" on multiple machines/environments with different memory/heap available. Might be something worth looking into.

Also, while I am at it - because it could somewhat be related; We discovered a "hard constraint" between this version (12.1.0) of the library and the Netty version in use. For a given project we had Netty version 4.1.38.Final brought in by some other dependencies, and attempting to use the async client upload() with that version of Netty caused the upload process to "hang"/freeze after approx 500MB was uploaded. Any file below 500MB was successful, any file above 500MB in size failed/hanged infinitely and the process had to be killed. We reproduced this consistently on several environments with that version of Netty. Then, switching to Netty version 4.1.45.Final or 4.1.5.49.Final, this problem was resolved immediately, and we could upload larger files without any issues (until we were struck by the OOM error described here).

It might be something worth mentioning in documentation at least to avoid others facing the same issue.

rickle-msft commented 4 years ago

Hmm that is interesting. Thanks! So a little over the 2GB mark if I'm reading that correctly, the memory behavior starts to change. Out of curiosity. you also test this with various block sizes to see if that number stays the same independent of the block size?

I'd also be curious if you could try this using the latest released version--12.6.0--and let me know if you guys are still hitting memory problems?

@anuchandy FYI on the Netty dependency issues being mentioned in case we need to update any of our guidance for customers based on that.

roneiv commented 4 years ago

The mark seems to be 2.74726912 in GB, according to google at least :) I will make a couple of tests with different block sizes to see if we have a shift in the number.

Related to the version, we use the azure-storage-file-datalake v.12.1.0, and it is my understanding that it already holds the 12.6.0 version of the azure-storage-blob as a direct dependency. At least my IDE is telling me so :)

I see there is a 12.1.1 version available of the datalake library now, but I assume it will not change anything related to this bug. https://mvnrepository.com/artifact/com.azure/azure-storage-file-datalake

roneiv commented 4 years ago

@rickle-msft Here are some numbers varying on block size:

Block size 10x1024 reads 2682880 bytes [reactor-http-epoll-8] INFO azure.TestDatalakeAsync - Transferred 2682880 [reactor-http-epoll-2] ERROR reactor.netty.channel.ChannelOperationsHandler - [id: 0xfb40b73b, L:/10.100.29.164:54968 - R:dlaeuecpbdpetrotechdigp.dfs.core.windows.net/13.82.152.54:443] Error was received while reading the incoming data. The connection will be closed. java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:89) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:32) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:637) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:618)

Block size 1x1024x1024 reads 273678336 bytes [reactor-http-epoll-5] INFO azure.TestDatalakeAsync - Transferred 273678336 [reactor-http-epoll-2] ERROR reactor.netty.channel.ChannelOperationsHandler - [id: 0xe66ee7c3, L:/10.100.29.164:56024 - R:dlaeuecpbdpetrotechdigp.dfs.core.windows.net/13.82.152.54:443] Error was received while reading the incoming data. The connection will be closed. java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:89) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:32) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:637) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:618)

Block size 10x1024x1024 (as in this ticket) reads 2747269120 bytes: [reactor-http-epoll-3] INFO azure.TestDatalakeAsync - Transferred 2747269120 [reactor-http-epoll-2] ERROR reactor.netty.channel.ChannelOperationsHandler - [id: 0x52ee2ec0, L:/10.100.29.164:56920 - R:dlaeuecpbdpetrotechdigp.dfs.core.windows.net/13.82.152.54:443] Error was received while reading the incoming data. The connection will be closed. java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:89) at org.springframework.core.io.buffer.DefaultDataBufferFactory.allocateBuffer(DefaultDataBufferFactory.java:32) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:637) at org.springframework.core.io.buffer.DataBufferUtils$ReadableByteChannelGenerator.accept(DataBufferUtils.java:618)

So yes, the number of bytes we successfully write before Heap space/OOM kicks in varies with the block size. Respectively we get approx 2,7MB, 273MB and 2,7 GB for the three different block sizes (10KB, 1MB, 10MB)

rickle-msft commented 4 years ago

Ah. Sorry about the versioning comment. I forgot we were discussing datalake and not regular blobs.

Very interesting. I'll have to think about/look around for what might be causing this and circle back. Thank you for trying those additional cases.

roneiv commented 4 years ago

One last thing; I don't know if it matters, but the measurements done with the block size were performed attempting to upload a very large file of 75278709888 bytes (75.278 GB)

asm0dey commented 4 years ago

Possible workaround until fix is released: open outputstream to blob and copt data from inputstream from your file to outputstream to blob.

roneiv commented 4 years ago

@asm0dey Hi Paul,

You mean to use the azure-storage-blob SDK instead?

asm0dey commented 4 years ago

@roneiv possibly yes. It won't add too many dependencies to your application and will temporary fix the problem.

roneiv commented 4 years ago

@asm0dey I was under the impression that the blob client also suffered from the same issue: https://github.com/Azure/azure-sdk-for-java/issues/10823

asm0dey commented 4 years ago

@roneiv I'm the author of those issue :) And that's workaround I use now

roneiv commented 4 years ago

@asm0dey Ah sorry Paul, didn’t notice that :)

roneiv commented 4 years ago

@asm0dey Anyway it is my understanding we need to provide the stream length upfront to use that approach, and we don’t have a local file - we’re streaming through our application so the length is unknown.

asm0dey commented 4 years ago

@roneiv but it changes nothing! You can infinitely copy from input stream to outputstream

roneiv commented 4 years ago

@asm0dey Would you have a code snippet to share with us related to how you implemented your workaround? We assume that you make use of the AppendBlobClient and the getBlobOutputStream(), however when attempting to copy from inputstream to outputstream we get;

com.azure.storage.blob.models.BlobStorageException: Status code 400, "{"error":{"code":"MissingRequiredHeader","message":"An HTTP header that's mandatory for this request is not specified.\nRequestId:94716277-b01f-0008-5176-27dd28000000\nTime:2020-05-11T09:30:39.9206597Z"}}" at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) at com.azure.core.http.rest.RestProxy.instantiateUnexpectedException(RestProxy.java:357) at com.azure.core.http.rest.RestProxy.lambda$ensureExpectedStatus$3(RestProxy.java:398) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)

So it seems we miss a required header? (And the error message doesn't state which one)

asm0dey commented 4 years ago

Our codebase is primarily in Kotlin, so please excuse me for Kotlin snippets.

  1. We have blockBlobClient, not append one (also you shoud be aware that you can't change type of blob — if it's append you wont be able to work with it like its block)

  2. We open output stream to it with

    client.blockBlobClient.getBlobOutputStream(true)

    where client is usual blob client, obtained with BlobContainerClient#getBlobClient

  3. Then we open input stream from local file with

    file.inputStream().buffered()
  4. then we just call slishtly changed version of copyTo method, effectively it is

    
    file.inputStream().buffered().use { input ->
    client.blockBlobClient.getBlobOutputStream(true).buffered().use { output ->
    input.copyTo(output)
    }
    }
asm0dey commented 4 years ago

@roneiv PS: your error looks like separate issue :)

roneiv commented 4 years ago

@asm0dey Thanks so much anyways! Yeah the error with the http header seems like a different issue, but of course impacts us equally as much :)

rickle-msft commented 4 years ago

Thanks, guys, for conferring on these issues :)

@roneiv Can you share the code snippet that was hitting the header error for getting an OutputStream to an Append Blob? I can do a network capture and see what header is missing.

roneiv commented 4 years ago

@rickle-msft Hi think we figured it out, by accident we had used the .dfs endpoint instead of the .blob endpoint when testing with the AppendBlobClient.

We changed to .blob and also switched to BlockBlobClient, and then the copy of InputStream to OutputStream seems to work.

rickle-msft commented 4 years ago

@roneiv FYI #11478

rickle-msft commented 4 years ago

I am going to close this issue as the fix has been merged into master and we have had reports of success when using the new beta version.