Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License

[BUG] Storage blobClient.openSeekableByteChannelRead makes unnecessary request to determine if stream has ended #38070

Open · lmolkova opened this issue 10 months ago

lmolkova commented 10 months ago

Stress-test findings:

The call blobClient.openSeekableByteChannelRead(new BlobSeekableByteChannelReadOptions(), span) results in the following HTTP requests:

[screenshot of the HTTP request sequence]

The first request does not know the content size and asks for a 4 MB chunk. All of the content is returned by that first request, so the second request (which results in a 416) is unnecessary and can be optimized away.
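To make the extra round trip concrete, here is a hypothetical, self-contained simulation (no Azure SDK calls) of the request pattern described above. It assumes the blob does not change while being read, that the simulated service answers 416 when the requested start offset is at or beyond the blob length, and that every successful response carries the total length the way a Content-Range header such as "bytes 0-1023/1024" does. The names RangedReadSimulation, readUntil416, and readUntilTotalLength are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (no Azure SDK calls) of reading a blob through
 * fixed-size ranged GETs. Assumptions: the blob does not change while it is
 * read; the simulated service answers 416 when the start offset is at or
 * beyond the blob length; otherwise it returns min(CHUNK, remaining) bytes
 * plus the total length, as a Content-Range header would.
 */
final class RangedReadSimulation {
    static final long CHUNK = 4L * 1024 * 1024; // 4 MB chunk, as in the issue

    /** Outcome of one simulated ranged GET. */
    static final class Response {
        final int status;
        final long bytes;
        final long totalLength;

        Response(int status, long bytes, long totalLength) {
            this.status = status;
            this.bytes = bytes;
            this.totalLength = totalLength;
        }
    }

    static Response get(long blobLength, long start) {
        if (start >= blobLength) {
            return new Response(416, 0, blobLength); // Range Not Satisfiable
        }
        return new Response(206, Math.min(CHUNK, blobLength - start), blobLength);
    }

    /** Behavior described in the issue: keep requesting until the service answers 416. */
    static List<Integer> readUntil416(long blobLength) {
        List<Integer> statuses = new ArrayList<>();
        long offset = 0;
        while (true) {
            Response r = get(blobLength, offset);
            statuses.add(r.status);
            if (r.status == 416) {
                return statuses;
            }
            offset += r.bytes;
        }
    }

    /** Alternative: stop as soon as the offset reaches the total length from Content-Range. */
    static List<Integer> readUntilTotalLength(long blobLength) {
        List<Integer> statuses = new ArrayList<>();
        long offset = 0;
        while (true) {
            Response r = get(blobLength, offset);
            statuses.add(r.status);
            if (r.status == 416) {
                return statuses; // e.g. an empty blob: nothing to read at all
            }
            offset += r.bytes;
            if (offset >= r.totalLength) {
                return statuses; // Content-Range says everything has arrived
            }
        }
    }

    public static void main(String[] args) {
        long oneKiB = 1024; // fits entirely in the first 4 MB chunk
        System.out.println(readUntil416(oneKiB));         // [206, 416]
        System.out.println(readUntilTotalLength(oneKiB)); // [206]
    }
}
```

In general, under these assumptions the read-until-416 strategy costs exactly one more request than the Content-Range strategy for any non-empty blob, and that request carries no data.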

Based on a conversation with @ibrahimrabab and @alzimmermsft, the SDK could do the following:

  1. If ETag consistency is applied, we don't need to wait for a 416; we can determine whether we received everything from the Content-Range response header of the first chunk.
  2. If ETags are not used, we should keep reading until we get a 416.
  3. Currently, if an error occurs while reading the body of the 416 response, reading from the channel returned by openSeekableByteChannelRead throws even though the content has been fully received. We should consider just logging a warning in this case, since we already know we got all the content and the error happened afterwards.
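Points 1 and 2 could be expressed as a small decision function. The sketch below is a hypothetical, stdlib-only helper (EndOfBlobCheck is not an Azure SDK type): it parses the complete length from a Content-Range value of the form "bytes first-last/complete-length" (where the complete length may be "*" when unknown) and only skips the extra request when reads are pinned to an ETag. The etagPinned flag stands in for whatever mechanism the SDK uses to apply ETag consistency; that mapping is an assumption.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not an Azure SDK API) for points 1 and 2: decide
 * whether another ranged GET is needed after a chunk has been received.
 */
final class EndOfBlobCheck {
    // Content-Range: bytes <first>-<last>/<complete-length>; the complete
    // length may be "*" when the server does not know it.
    private static final Pattern CONTENT_RANGE =
        Pattern.compile("bytes (\\d+)-(\\d+)/(\\d+|\\*)");

    /** Complete length from a Content-Range value, or empty if absent, malformed, or "*". */
    static OptionalLong totalLength(String contentRange) {
        if (contentRange == null) {
            return OptionalLong.empty();
        }
        Matcher m = CONTENT_RANGE.matcher(contentRange.trim());
        if (!m.matches() || "*".equals(m.group(3))) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(Long.parseLong(m.group(3)));
    }

    /**
     * Point 1: if reads are pinned to the ETag of the first response, a change
     * to the blob would make later requests fail rather than silently return
     * different content, so the complete length can be trusted and no 416
     * probe is needed once that many bytes have been read.
     * Point 2: without ETag pinning, keep requesting until the service answers 416.
     */
    static boolean needsAnotherRequest(boolean etagPinned, String contentRange, long bytesReadSoFar) {
        if (!etagPinned) {
            return true; // caller stops only when it sees a 416
        }
        OptionalLong total = totalLength(contentRange);
        return !total.isPresent() || bytesReadSoFar < total.getAsLong();
    }
}
```

A caller would issue the next ranged GET only while needsAnotherRequest(...) returns true, and would still treat a 416 as end-of-stream in either mode. For example, after the first chunk of a 1 KiB blob returns "bytes 0-1023/1024", needsAnotherRequest(true, "bytes 0-1023/1024", 1024) is false, so the 416 request from the issue is never sent.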

Exception or Stack Trace

This exception is thrown when streaming the response body fails while receiving the 416 response (corresponding to point 3 above):

reactor.core.Exceptions$ReactiveException: io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
    at reactor.core.Exceptions.propagate(Exceptions.java:396)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:98)
    at reactor.core.publisher.Mono.block(Mono.java:1742)
    at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:147)
    at com.azure.storage.blob.specialized.BlobClientBase.downloadStreamWithResponse(BlobClientBase.java:964)
    at com.azure.storage.blob.specialized.StorageSeekableByteChannelBlobReadBehavior.read(StorageSeekableByteChannelBlobReadBehavior.java:76)
    at com.azure.storage.common.implementation.StorageSeekableByteChannel.refillReadBuffer(StorageSeekableByteChannel.java:188)
    at com.azure.storage.common.implementation.StorageSeekableByteChannel.read(StorageSeekableByteChannel.java:166)
    at java.base/sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
    at java.base/sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:107)
    at java.base/sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:101)
    at com.azure.storage.stress.CrcInputStream.read(CrcInputStream.java:60)
    at java.base/java.io.InputStream.read(InputStream.java:205)
    at com.azure.storage.blob.stress.OpenSeekableByteChannelRead.runInternal(OpenSeekableByteChannelRead.java:43)
    at com.azure.storage.blob.stress.BlobScenarioBase.run(BlobScenarioBase.java:84)
    at com.azure.perf.test.core.PerfStressTest.runTest(PerfStressTest.java:31)
    at com.azure.perf.test.core.ApiPerfTestBase.runAll(ApiPerfTestBase.java:156)
    at com.azure.perf.test.core.PerfStressProgram.lambda$runTests$12(PerfStressProgram.java:244)
    at java.base/java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1448)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:100)
        ... 22 more
Caused by: io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
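For point 3, one possible shape of the fix is sketched below. It is a hypothetical, stdlib-only illustration, not SDK code: TolerantTailRead and ChunkFetcher are invented names, and ChunkFetcher stands in for "issue the next ranged GET and copy its body". The idea is that once the offset has reached the complete length already learned from Content-Range, any failure on the trailing request is logged as a warning and reported as end-of-stream instead of being propagated. RuntimeException is caught alongside IOException because the trace above shows the Netty I/O error wrapped in Reactor's ReactiveException.

```java
import java.io.IOException;
import java.util.logging.Logger;

/**
 * Hypothetical sketch of point 3 (not SDK code): once every byte announced by
 * Content-Range has been delivered, a failure on the trailing end-of-stream
 * request is logged as a warning and reported as end-of-stream (-1).
 */
final class TolerantTailRead {
    private static final Logger LOGGER = Logger.getLogger(TolerantTailRead.class.getName());

    /** Stand-in for "issue the next ranged GET and copy its body"; returns bytes read or -1. */
    interface ChunkFetcher {
        int fetch(long offset) throws IOException;
    }

    /**
     * @param offset           position of the next byte to read
     * @param knownTotalLength complete length from Content-Range, or -1 if unknown
     */
    static int readNext(ChunkFetcher fetcher, long offset, long knownTotalLength) throws IOException {
        try {
            return fetcher.fetch(offset);
        } catch (IOException | RuntimeException e) {
            // RuntimeException is included because the stack trace in this issue
            // shows the Netty I/O error wrapped in Reactor's ReactiveException.
            if (knownTotalLength >= 0 && offset >= knownTotalLength) {
                // All content was already received, so the error came from the
                // extra request (e.g. a reset while reading the 416 body).
                LOGGER.warning("Ignoring error after all content was received: " + e);
                return -1;
            }
            throw e; // data is genuinely missing: propagate
        }
    }
}
```

The check is deliberately narrow: an error before the known length has been reached still propagates, so a genuine mid-download failure is never masked as end-of-stream.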

To Reproduce

Reproduces with the following stress test:

https://github.com/Azure/azure-sdk-for-java/blob/abadbdfa9a652a119bad4cbb409a939b435d1f72/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/OpenSeekableByteChannelRead.java#L38-L48

seanmcc-msft commented 9 months ago

We should fix this in the future.