Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License
2.35k stars 1.99k forks source link

[BUG] IllegalStateException: block()/blockFirst()/blockLast() while BlobClient.uploadWithResponse #41866

Closed Stefan-R-Acc closed 1 month ago

Stefan-R-Acc commented 1 month ago

Describe the bug We are facing the below exception. It occurs rarely and not reproduceable when using an EncryptedBlobClient and Envelope Encryption for uploading a blob. Doing some retries often comes to a success but is very expensive for large files.

Exception or Stack Trace Result: Failure Exception: IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-54 Stack: java.lang.reflect.InvocationTargetException at jdk.internal.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke(JavaMethodInvokeInfo.java:22) at com.microsoft.azure.functions.worker.broker.JavaMethodExecutor.execute(JavaMethodExecutor.java:22) at com.microsoft.azure.functions.worker.chain.FunctionExecutionMiddleware.invoke(FunctionExecutionMiddleware.java:19) at com.microsoft.azure.functions.worker.chain.InvocationChain.doNext(InvocationChain.java:21) at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invoke(JavaFunctionBroker.java:133) at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod(JavaFunctionBroker.java:124) at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:34) at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:10) at com.microsoft.azure.functions.worker.handler.MessageHandler.handle(MessageHandler.java:44) at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0(JavaWorkerClient.java:94) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840) Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-54 at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) at reactor.core.publisher.Mono.block(Mono.java:1742) at com.azure.storage.blob.specialized.BlobInputStream.dispatchRead(BlobInputStream.java:79) at com.azure.storage.common.StorageInputStream.readInternal(StorageInputStream.java:346) at com.azure.storage.common.StorageInputStream.read(StorageInputStream.java:319) at com.azure.core.util.FluxUtil.lambda$toFluxByteBuffer$6(FluxUtil.java:329) at reactor.core.publisher.FluxGenerate$GenerateSubscription.slowPath(FluxGenerate.java:271) at reactor.core.publisher.FluxGenerate$GenerateSubscription.request(FluxGenerate.java:213) at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298) at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) at reactor.core.publisher.FluxIndex$IndexSubscriber.request(FluxIndex.java:137) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112) at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerComplete(FluxConcatMapNoPrefetch.java:274) at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:887) at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:480) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2160) at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.request(FluxSwitchOnFirst.java:704) at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:471) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.request(FluxMergeSequential.java:298) at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:430) at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.innerComplete(FluxMergeSequential.java:335) at reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.onComplete(FluxMergeSequential.java:591) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152) at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onComplete(FluxHide.java:147) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1840) at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1840) at reactor.core.publisher.FluxCallable.subscribe(FluxCallable.java:49) at reactor.core.publisher.Mono.subscribe(Mono.java:4490) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2666) at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260) at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) at reactor.core.publisher.MonoUsing$MonoUsingSubscriber.onNext(MonoUsing.java:232) at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196) at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070) at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) at reactor.core.publisher.Mono.subscribe(Mono.java:4490) at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:220) at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275) at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128) at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:208) at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129) at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:439) at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:493) at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:776) at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at com.azure.core.http.netty.implementation.AzureSdkHandler.channelRead(AzureSdkHandler.java:224) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1349) at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1389) at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ... 1 more Suppressed: java.lang.Exception: #block terminated with an error at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:141) at reactor.core.publisher.Mono.block(Mono.java:1766) at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:149) at com.azure.storage.blob.BlobClient.uploadWithResponse(BlobClient.java:337)

To Reproduce Upload arbitrary blobs of size up to 64 GB

Code Snippet

BlobClient blobClient = getSomeTargetBlobClient(containerName, fileName, azureStorageEndpoint);
EncryptedBlobClient encryptedBlobClient = new EncryptedBlobClientBuilder(EncryptionVersion.V2).key(akek, KeyWrapAlgorithm.RSA1_5.toString())
                .blobClient(blobClient )
                .buildEncryptedBlobClient();

BlobClient sourceBlobClient = getSomeSourceBlobClient(containerName, fileName, azureStorageEndpoint);
        ParallelTransferOptions transferOptions = new ParallelTransferOptions() //
                .setMaxSingleUploadSizeLong(MAX_SINGLE_UPLOAD_SIZE) // 256MB
                .setBlockSizeLong(UPLOAD_BLOCK_SIZE) // 15
                .setMaxConcurrency(MAX_CONCURRENCY) // 1 
                .setProgressListener(progress -> log.info("Progress {}", progress));

        BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(FluxUtil.toFluxByteBuffer(sourceBlobClient.openInputStream()));
        uploadOptions.setParallelTransferOptions(transferOptions);

Response<BlockBlobItem> response = targetBlobClient.uploadWithResponse(uploadOptions, Duration.ofMinutes(DECRYPTION_TIMEOUT_IN_MINUTES), Context.NONE);

Expected behavior Stable upload without error.

Setup (please complete the following information):

Additional context Not sure if necessary. Our logging settings in host.json are }, "logging": { "logLevel": { "default": "Warning", "Function": "Error", "Host.Results": "Information", "Host.Aggregator": "Information" }, "applicationInsights": { "logLevel": { "default": "Warning" }, "samplingSettings": { "isEnabled": true, "excludedTypes": "Request;Exception" } }

Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

github-actions[bot] commented 1 month ago

@ibrahimrabab @ibrandes @kyleknap @seanmcc-msft

github-actions[bot] commented 1 month ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

Stefan-R-Acc commented 1 month ago

Additional info: Seemed to work with v12.25.1 Error shows up since update to v12.25.3 and v12.27.1

Stefan-R-Acc commented 1 month ago

Another exception which occurs too, after updating azure azure-sdk-bom1.2.22 to 1.2.27 - here on downloadContentWithResponse

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-5 at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) at reactor.core.publisher.Mono.block(Mono.java:1742) at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:146) at com.azure.storage.blob.specialized.BlobClientBase.downloadContentWithResponse(BlobClientBase.java:1066) at com.azure.storage.blob.specialized.BlobInputStream.dispatchRead(BlobInputStream.java:75) at com.azure.storage.common.StorageInputStream.readInternal(StorageInputStream.java:346)

alzimmermsft commented 1 month ago

Thanks for filing this @Stefan-R-Acc!

Would you mind trying a change on this line of code

BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(FluxUtil.toFluxByteBuffer(sourceBlobClient.openInputStream()));

to either

BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(sourceBlobClient.openInputStream());

or

BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(FluxUtil.toFluxByteBuffer(sourceBlobClient.openInputStream()).subscribeOn(Schedulers.boundedElastic()));

This issue is very similar to https://github.com/Azure/azure-sdk-for-java/issues/38991 and could be a case we didn't cover with using Flux.toFluxByteBuffer directly.

Stefan-R-Acc commented 1 month ago

Hi @alzimmermsft ,

we try with option 2 (.subscribeOn(Schedulers.boundedElastic())) It seems to bring improvement but we have to observe the system the next days.

The option 1 (new BlobParallelUploadOptions(sourceBlobClient.openInputStream())) is around 30% slower then option 2 in my local test with a large file against azurite.

alzimmermsft commented 1 month ago

Thank you for the update @Stefan-R-Acc

For the second options (using .subscribeOn(Schedulers.boundedElastic())) have the observations gone well, potentially resolving this issue?

And thank you for sharing the performance characteristics of the first options of just passing the InputStream from BlobClient.openInputStream(). We'll need to take a look into why there is such a performance drop off.

Stefan-R-Acc commented 1 month ago

Hi @alzimmermsft,

there is no new observation of the block()/blockfirst() error after the fix. We will let you know if the error occurs again.

alzimmermsft commented 1 month ago

For now I'm going to mark this issue as resolved

github-actions[bot] commented 1 month ago

Hi @Stefan-R-Acc. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.