Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.
MIT License
2.36k stars 2k forks source link

[BUG] OkHttpAsyncHttpClient throws IOException downloading blob from Azure container #8183

Closed TimOMalley closed 4 years ago

TimOMalley commented 4 years ago

Describe the bug OkHttpAsyncHttpClient throws an IOException: closed when it's reading the InputStream during the BlobClient downloadToFile. Exception or Stack Trace 2020-02-11 15:54:00,928 | INFO | -pubblobsdk_Worker-2 | o.q.c.JobRunShell | 715 - org.quartz-scheduler.quartz - 2.3.0 | Job myGroup.us_gov_dod_af_cce_mm_pubblobsdk_route2 threw a JobExecutionException: org.quartz.JobExecutionException: reactor.core.Exceptions$ReactiveException: java.io.IOException: closed at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:61) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.quartz.core.JobRunShell.run(JobRunShell.java:202) [!/:?] at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [!/:?] Caused by: reactor.core.Exceptions$ReactiveException: java.io.IOException: closed at reactor.core.Exceptions.propagate(Exceptions.java:336) ~[!/:3.3.0.RELEASE] at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91) ~[!/:3.3.0.RELEASE] at reactor.core.publisher.Mono.block(Mono.java:1663) ~[!/:3.3.0.RELEASE] at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:94) ~[!/:?] at com.azure.storage.blob.specialized.BlobClientBase.downloadToFileWithResponse(BlobClientBase.java:481) ~[!/:?] at com.azure.storage.blob.specialized.BlobClientBase.downloadToFile(BlobClientBase.java:442) ~[!/:?] at mil.af.cce2.mm.templates.azure.lib.AzureLibBean.downloadBlobFromContainer(AzureLibBean.java:480) ~[!/:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212] at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:187) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.loadbalancer.QueueLoadBalancer.process(QueueLoadBalancer.java:44) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.loadbalancer.LoadBalancerSupport.process(LoadBalancerSupport.java:97) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:58) ~[!/:2.21.0.fuse-750033-redhat-00001] ... 2 more Suppressed: java.lang.Exception: #block terminated with an error at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93) ~[!/:3.3.0.RELEASE] at reactor.core.publisher.Mono.block(Mono.java:1663) ~[!/:3.3.0.RELEASE] at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:94) ~[!/:?] at com.azure.storage.blob.specialized.BlobClientBase.downloadToFileWithResponse(BlobClientBase.java:481) ~[!/:?] at com.azure.storage.blob.specialized.BlobClientBase.downloadToFile(BlobClientBase.java:442) ~[!/:?] at mil.af.cce2.mm.templates.azure.lib.AzureLibBean.downloadBlobFromContainer(AzureLibBean.java:480) ~[!/:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212] at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:187) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.loadbalancer.QueueLoadBalancer.process(QueueLoadBalancer.java:44) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.processor.loadbalancer.LoadBalancerSupport.process(LoadBalancerSupport.java:97) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:58) ~[!/:2.21.0.fuse-750033-redhat-00001] at org.quartz.core.JobRunShell.run(JobRunShell.java:202) [!/:?] at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [!/:?] Caused by: java.io.IOException: closed at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:434) ~[?:?] at java.io.InputStream.read(InputStream.java:101) ~[?:1.8.0_212] at com.azure.core.http.okhttp.OkHttpAsyncHttpClient$OkHttpResponse.lambda$toFluxByteBuffer$6(OkHttpAsyncHttpClient.java:293) ~[?:?] at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[?:?] at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:79) ~[?:?] at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99) ~[?:?] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1920) ~[?:?] at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) ~[?:?] at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.request(FluxTakeUntil.java:133) ~[?:?] at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:179) ~[?:?] at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) ~[?:?] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:105) ~[?:?] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1920) ~[?:?] at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[?:?] at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:257) ~[?:?] at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:248) ~[?:?] at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) ~[?:1.8.0_212] at sun.nio.ch.SimpleAsynchronousFileChannelImpl$3.run(SimpleAsynchronousFileChannelImpl.java:389) ~[?:1.8.0_212] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_212] To Reproduce Upload file to blob container. Use azure-core-http-okhttp instead of azure-core-http-netty Http Client. BlobClient downloadToFile

Code Snippet BlobServiceClient blobServiceClient = new BlobServiceClientBuilder() .connectionString(getConnectionString()) .httpClient(new OkHttpAsyncHttpClientBuilder().build()) .buildClient(); BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(getContainerName()); for (BlobItem blobItem : containerClient.listBlobs()) { String fileName = blobItem.getName(); BlobClient blobClient2 = containerClient.getBlobClient(fileName); blobClient2.downloadToFile(fileName); blobClient2.delete(); }

Expected behavior File downloaded to directory Screenshots If applicable, add screenshots to help explain your problem.

Setup (please complete the following information):

Additional context Excluded azure-core-http-netty from azure-storage-common in the maven pom.

Information Checklist Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

rickle-msft commented 4 years ago

Thank you @TimOMalley for reporting this. @alzimmermsft can you please take a look at what's going on with the okHttpClient?

cbismuth commented 4 years ago

Hi, same issue here, can't download any blob to file.

Dependencies:

<azure.version>1.31.0</azure.version>
<azure-storage-blob.version>12.4.0</azure-storage-blob.version>
<azure-core-http-okhttp.version>1.1.0</azure-core-http-okhttp.version>
<azure-client-authentication.version>1.7.1</azure-client-authentication.version>

Logs:

16:17:36.071 [pool-4-thread-3] ERROR f.t.d.m.a.AuditFileDecoderService - java.io.IOException: closed
reactor.core.Exceptions$ReactiveException: java.io.IOException: closed
    at reactor.core.Exceptions.propagate(Exceptions.java:326)
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91)
    at reactor.core.publisher.Mono.block(Mono.java:1494)
    at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:99)
    at com.azure.storage.blob.specialized.BlobClientBase.downloadToFileWithResponse(BlobClientBase.java:563)
    at com.azure.storage.blob.specialized.BlobClientBase.downloadToFile(BlobClientBase.java:488)
    at fr.tf1.data.monitoring.azure.AuditFileDecoderService.getColumns(AuditFileDecoderService.java:41)
    at fr.tf1.data.monitoring.azure.AzureBlobItemsFinderService.lambda$null$1(AzureBlobItemsFinderService.java:106)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.SliceOps$1$1.accept(SliceOps.java:204)
    at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1812)
    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at fr.tf1.data.monitoring.azure.AzureBlobItemsFinderService.lambda$null$2(AzureBlobItemsFinderService.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
        ... 22 common frames omitted
Caused by: java.io.IOException: closed
    at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:434)
    at java.io.InputStream.read(InputStream.java:101)
    at com.azure.core.http.okhttp.OkHttpAsyncHttpClient$OkHttpResponse.lambda$toFluxByteBuffer$6(OkHttpAsyncHttpClient.java:293)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:78)
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1842)
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)
    at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.request(FluxTakeUntil.java:133)
    at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:179)
    at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:102)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1842)
    at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138)
    at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:257)
    at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:248)
    at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126)
    at sun.nio.ch.SimpleAsynchronousFileChannelImpl$3.run(SimpleAsynchronousFileChannelImpl.java:389)
    ... 3 common frames omitted
joshfree commented 4 years ago

@anuchandy can you and @sima-zhu please investigate?

cbismuth commented 4 years ago

Thanks, here are our main Java / Spring configuration classes below.

package fr.tf1.data.monitoring.spring;

import com.microsoft.azure.AzureEnvironment;
import com.microsoft.azure.AzureResponseBuilder;
import com.microsoft.azure.credentials.ApplicationTokenCredentials;
import com.microsoft.azure.management.resources.Subscription;
import com.microsoft.azure.management.resources.Subscriptions;
import com.microsoft.azure.management.resources.fluentcore.utils.ProviderRegistrationInterceptor;
import com.microsoft.azure.management.resources.fluentcore.utils.ResourceManagerThrottlingInterceptor;
import com.microsoft.azure.management.resources.implementation.ResourceManager;
import com.microsoft.azure.management.resources.implementation.ResourceManager.Authenticated;
import com.microsoft.azure.management.storage.implementation.StorageManager;
import com.microsoft.azure.serializer.AzureJacksonAdapter;
import com.microsoft.rest.RestClient;
import com.microsoft.rest.retry.ExponentialBackoffRetryStrategy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

@Configuration
@PropertySource("classpath:application.properties")
public class AzureConfig {

    @Value("${monitoring.azure.clientId}")
    private String clientId;
    @Value("${monitoring.azure.tenantId}")
    private String tenantId;
    @Value("${monitoring.azure.secret}")
    private String secret;

    @Bean
    Subscription defaultSubscription(final ApplicationTokenCredentials credentials) {
        final Authenticated authenticated = ResourceManager.authenticate(credentials);

        final Subscriptions subscriptions = authenticated.subscriptions();

        checkSubscriptions(subscriptions);

        return subscriptions.list().get(0);
    }

    private void checkSubscriptions(final Subscriptions subscriptions) {
        final int size = subscriptions.list().size();

        if (size != 1) {
            throw new IllegalStateException(String.format("Expected only one Azure subscription ([%d] found)", size));
        }
    }

    @Bean
    StorageManager storageManager(final RestClient restClient, final Subscription defaultSubscription) {
        return StorageManager.authenticate(restClient, defaultSubscription.subscriptionId());
    }

    @Bean
    RestClient restClient(final ApplicationTokenCredentials credentials) {
        return new RestClient.Builder().withBaseUrl(credentials.environment(), AzureEnvironment.Endpoint.RESOURCE_MANAGER)
                                       .withCredentials(credentials)
                                       .withSerializerAdapter(new AzureJacksonAdapter())
                                       .withResponseBuilderFactory(new AzureResponseBuilder.Factory())
                                       .withInterceptor(new ProviderRegistrationInterceptor(credentials))
                                       .withInterceptor(new ResourceManagerThrottlingInterceptor())
                                       .withRetryStrategy(new ExponentialBackoffRetryStrategy())
                                       .build();
    }

    @Bean
    ApplicationTokenCredentials applicationTokenCredentials() {
        return new ApplicationTokenCredentials(clientId, tenantId, secret, AzureEnvironment.AZURE);
    }
}
package fr.tf1.data.monitoring.azure;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobContainerItem;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.microsoft.azure.management.resources.ResourceGroup;
import com.microsoft.azure.management.resources.Subscription;
import com.microsoft.azure.management.resources.implementation.ResourceManager;
import com.microsoft.azure.management.storage.StorageAccount;
import com.microsoft.azure.management.storage.StorageAccountKey;
import com.microsoft.azure.management.storage.implementation.StorageManager;
import com.microsoft.rest.RestClient;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
class AzureAuditConnector {

    private final RestClient restClient;
    private final Subscription defaultSubscription;
    private final StorageManager storageManager;

    AzureAuditConnector(final RestClient restClient,
                        final Subscription defaultSubscription,
                        final StorageManager storageManager) {
        this.restClient = restClient;
        this.defaultSubscription = defaultSubscription;
        this.storageManager = storageManager;
    }

    List<ResourceGroup> getResourceGroups() {
        return ResourceManager.authenticate(restClient)
                              .withSubscription(defaultSubscription.subscriptionId())
                              .resourceGroups()
                              .list();
    }

    List<StorageAccount> getStorageAccounts() {
        return storageManager.storageAccounts().list();
    }

    BlobServiceClient createBlobServiceClient(final StorageAccount storageAccount) {
        final StorageAccountKey storageAccountKey = storageAccount.getKeys().get(0);
        final StorageSharedKeyCredential storageSharedKeyCredential = new StorageSharedKeyCredential(storageAccount.name(), storageAccountKey.value());

        return new BlobServiceClientBuilder().credential(storageSharedKeyCredential)
                                             .endpoint(storageAccount.endPoints().primary().blob())
                                             .buildClient();
    }

    BlobContainerClient createBlobContainerClient(final StorageAccount storageAccount, final BlobContainerItem blobContainerItem) {
        final StorageAccountKey storageAccountKey = storageAccount.getKeys().get(0);
        final StorageSharedKeyCredential storageSharedKeyCredential = new StorageSharedKeyCredential(storageAccount.name(), storageAccountKey.value());

        return new BlobContainerClientBuilder().endpoint(storageAccount.endPoints().primary().blob())
                                               .credential(storageSharedKeyCredential)
                                               .containerName(blobContainerItem.getName())
                                               .buildClient();
    }

    BlobClient createBlobClient(final StorageAccount storageAccount, final BlobContainerItem blobContainerItem, final String blobName) {
        final StorageAccountKey storageAccountKey = storageAccount.getKeys().get(0);
        final StorageSharedKeyCredential storageSharedKeyCredential = new StorageSharedKeyCredential(storageAccount.name(), storageAccountKey.value());

        return new BlobClientBuilder().addPolicy(ContentTypeRemovalHttpPipelinePolicy.INSTANCE)
                                      .endpoint(storageAccount.endPoints().primary().blob())
                                      .credential(storageSharedKeyCredential)
                                      .containerName(blobContainerItem.getName())
                                      .blobName(blobName)
                                      .buildClient();
    }
}
package fr.tf1.data.monitoring.azure;

import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import reactor.core.publisher.Mono;

/**
 * https://github.com/Azure/azure-sdk-for-java/issues/7234#issuecomment-577905820
 */
final class ContentTypeRemovalHttpPipelinePolicy implements HttpPipelinePolicy {

    public static final ContentTypeRemovalHttpPipelinePolicy INSTANCE = new ContentTypeRemovalHttpPipelinePolicy();

    private ContentTypeRemovalHttpPipelinePolicy() {
        // NOP
    }

    @Override
    public Mono<HttpResponse> process(final HttpPipelineCallContext callContext, final HttpPipelineNextPolicy nextPolicy) {
        final HttpHeaders headers = callContext.getHttpRequest().getHeaders();
        final String contentType = headers.getValue("Content-Type");

        if (contentType == null) {
            callContext.getHttpRequest().setHeader("Content-Type", "");
        }

        return nextPolicy.process();
    }
}
anuchandy commented 4 years ago

@cbismuth, @TimOMalley we're looking into this.

@cbismuth - thank you for sharing the code. If possible could you update the sample with spring controller code that uses the download API?

cbismuth commented 4 years ago

Sure, this code is designed to download files and decode schema, here it is:

package fr.tf1.data.monitoring.azure;

import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.models.BlobContainerItem;
import com.azure.storage.blob.models.BlobItem;
import com.google.common.collect.Sets;
import com.microsoft.azure.management.storage.StorageAccount;
import org.apache.commons.io.FilenameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Service
class AuditFileDecoderService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AuditFileDecoderService.class);

    private final AzureAuditConnector azureAuditConnector;
    private final List<AuditFileDecoder> auditFileDecoders;

    AuditFileDecoderService(final AzureAuditConnector azureAuditConnector, final List<AuditFileDecoder> auditFileDecoders) {
        this.azureAuditConnector = azureAuditConnector;
        this.auditFileDecoders = auditFileDecoders;
    }

Set<String> getColumns(final StorageAccount storageAccount, final BlobContainerItem blobContainerItem, final BlobItem blobItem) {
    try {
        final BlobClient blobClient = azureAuditConnector.createBlobClient(storageAccount, blobContainerItem, blobItem.getName());

        final String absolutePath = createTempFile(blobItem);

        blobClient.downloadToFile(absolutePath, true);

        return auditFileDecoders.stream()
                                .map(decoder -> decoder.getColumns(absolutePath))
                                .flatMap(Collection::stream)
                                .collect(Collectors.toSet());
    } catch (final Exception e) {
        LOGGER.error(e.getMessage(), e);

        return Sets.newHashSet();
    }
}

    private String createTempFile(final BlobItem blobItem) {
        try {
            final Path tempFile = Files.createTempFile("" + System.currentTimeMillis(), FilenameUtils.getName(blobItem.getName()));

            tempFile.toFile().deleteOnExit();

            return tempFile.toString();
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }
}
cbismuth commented 4 years ago

I'm an Azure customer at TF1. If you want we can get into a Teams meeting.

cbismuth commented 4 years ago

To give you a better overview of our implementation, here are all our dependencies:

[INFO] fr.tf1.data:azure-insights-probes:jar:1.0.0-SNAPSHOT
[INFO] +- org.springframework.retry:spring-retry:jar:1.2.5.RELEASE:compile
[INFO] +- org.springframework:spring-jdbc:jar:5.1.8.RELEASE:compile
[INFO] |  +- org.springframework:spring-beans:jar:5.1.8.RELEASE:compile
[INFO] |  +- org.springframework:spring-core:jar:5.1.8.RELEASE:compile
[INFO] |  |  \- org.springframework:spring-jcl:jar:5.1.8.RELEASE:compile
[INFO] |  \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile
[INFO] +- com.zaxxer:HikariCP:jar:3.4.1:compile
[INFO] +- mysql:mysql-connector-java:jar:5.1.48:runtime
[INFO] +- net.snowflake:snowflake-jdbc:jar:3.11.1:compile
[INFO] +- com.microsoft.azure:azure-mgmt-storage:jar:1.31.0:compile
[INFO] |  +- com.microsoft.azure:azure-client-runtime:jar:1.7.0:compile
[INFO] |  |  \- com.microsoft.rest:client-runtime:jar:1.7.0:compile
[INFO] |  |     +- com.squareup.retrofit2:retrofit:jar:2.5.0:compile
[INFO] |  |     +- com.squareup.okhttp3:logging-interceptor:jar:3.12.2:compile
[INFO] |  |     +- com.squareup.okhttp3:okhttp-urlconnection:jar:3.12.2:compile
[INFO] |  |     +- com.squareup.retrofit2:converter-jackson:jar:2.5.0:compile
[INFO] |  |     +- com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.10.0:compile
[INFO] |  |     \- com.squareup.retrofit2:adapter-rxjava:jar:2.6.2:compile
[INFO] |  \- com.microsoft.azure:azure-mgmt-resources:jar:1.31.0:compile
[INFO] |     +- io.reactivex:rxjava:jar:1.3.8:compile
[INFO] |     \- org.apache.httpcomponents:httpcore:jar:4.4.5:compile
[INFO] +- com.azure:azure-storage-blob:jar:12.4.0:compile
[INFO] |  +- com.azure:azure-core:jar:1.2.0:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.0:compile
[INFO] |  |  +- com.fasterxml.jackson.dataformat:jackson-dataformat-xml:jar:2.10.0:compile
[INFO] |  |  |  +- com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.10.0:compile
[INFO] |  |  |  |  +- jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2:compile
[INFO] |  |  |  |  \- jakarta.activation:jakarta.activation-api:jar:1.2.1:compile
[INFO] |  |  |  +- org.codehaus.woodstox:stax2-api:jar:4.2:compile
[INFO] |  |  |  \- com.fasterxml.woodstox:woodstox-core:jar:6.0.1:compile
[INFO] |  |  \- io.netty:netty-tcnative-boringssl-static:jar:2.0.26.Final:compile
[INFO] |  \- com.azure:azure-storage-common:jar:12.4.0:compile
[INFO] +- com.azure:azure-core-http-okhttp:jar:1.1.0:compile
[INFO] |  \- com.squareup.okhttp3:okhttp:jar:4.2.2:compile
[INFO] |     +- com.squareup.okio:okio:jar:2.2.2:compile
[INFO] |     \- org.jetbrains.kotlin:kotlin-stdlib:jar:1.3.50:compile
[INFO] |        +- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.3.50:compile
[INFO] |        \- org.jetbrains:annotations:jar:13.0:compile
[INFO] +- com.microsoft.azure:azure-client-authentication:jar:1.7.1:compile
[INFO] |  +- commons-codec:commons-codec:jar:1.11:compile
[INFO] |  +- com.microsoft.azure:adal4j:jar:1.6.4:compile
[INFO] |  |  \- com.nimbusds:oauth2-oidc-sdk:jar:6.5:compile
[INFO] |  |     +- com.sun.mail:javax.mail:jar:1.6.1:compile
[INFO] |  |     |  \- javax.activation:activation:jar:1.1:compile
[INFO] |  |     +- com.github.stephenc.jcip:jcip-annotations:jar:1.0-1:compile
[INFO] |  |     +- net.minidev:json-smart:jar:2.3:compile (version selected from constraint [1.3.1,2.3])
[INFO] |  |     |  \- net.minidev:accessors-smart:jar:1.2:compile
[INFO] |  |     |     \- org.ow2.asm:asm:jar:5.0.4:compile
[INFO] |  |     +- com.nimbusds:lang-tag:jar:1.4.4:compile (version selected from constraint [1.4.3,))
[INFO] |  |     \- com.nimbusds:nimbus-jose-jwt:jar:8.8:compile (version selected from constraint [6.0.1,))
[INFO] |  \- com.microsoft.azure:azure-annotations:jar:1.10.0:compile
[INFO] +- com.github.ben-manes.caffeine:caffeine:jar:2.8.1:compile
[INFO] |  +- org.checkerframework:checker-qual:jar:3.1.0:compile
[INFO] |  \- com.google.errorprone:error_prone_annotations:jar:2.3.4:compile
[INFO] +- com.opencsv:opencsv:jar:5.1:compile
[INFO] |  +- org.apache.commons:commons-text:jar:1.8:compile
[INFO] |  +- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO] |  |  +- commons-logging:commons-logging:jar:1.2:compile
[INFO] |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] |  \- org.apache.commons:commons-collections4:jar:4.4:compile
[INFO] +- org.apache.avro:avro:jar:1.9.2:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.10.2:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-annotations:jar:2.10.2:compile
[INFO] |  \- org.apache.commons:commons-compress:jar:1.19:compile
[INFO] +- org.zeroturnaround:zt-exec:jar:1.11:compile
[INFO] +- com.microsoft.azure.functions:azure-functions-java-library:jar:1.3.0:compile
[INFO] +- com.microsoft.azure:azure-eventhubs:jar:2.3.2:compile
[INFO] |  +- org.apache.qpid:proton-j:jar:0.31.0:compile
[INFO] |  \- com.microsoft.azure:qpid-proton-j-extensions:jar:1.2.0:compile
[INFO] +- com.microsoft.azure:applicationinsights-web:jar:2.3.1:compile
[INFO] |  \- com.microsoft.azure:applicationinsights-core:jar:2.3.1:compile
[INFO] +- org.springframework:spring-context:jar:5.1.8.RELEASE:compile
[INFO] |  +- org.springframework:spring-aop:jar:5.1.8.RELEASE:compile
[INFO] |  \- org.springframework:spring-expression:jar:5.1.8.RELEASE:compile
[INFO] +- io.micrometer:micrometer-registry-azure-monitor:jar:1.1.4:compile
[INFO] |  \- io.micrometer:micrometer-core:jar:1.1.4:compile
[INFO] |     +- org.hdrhistogram:HdrHistogram:jar:2.1.9:compile
[INFO] |     \- org.latencyutils:LatencyUtils:jar:2.0.3:compile
[INFO] +- org.springframework:spring-test:jar:5.1.8.RELEASE:test
[INFO] +- io.lettuce:lettuce-core:jar:5.1.7.RELEASE:compile
[INFO] |  +- io.netty:netty-common:jar:4.1.36.Final:compile
[INFO] |  +- io.netty:netty-handler:jar:4.1.36.Final:compile
[INFO] |  |  +- io.netty:netty-buffer:jar:4.1.36.Final:compile
[INFO] |  |  \- io.netty:netty-codec:jar:4.1.36.Final:compile
[INFO] |  +- io.netty:netty-transport:jar:4.1.36.Final:compile
[INFO] |  |  \- io.netty:netty-resolver:jar:4.1.36.Final:compile
[INFO] |  \- io.projectreactor:reactor-core:jar:3.2.8.RELEASE:compile
[INFO] |     \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
[INFO] +- org.apache.commons:commons-pool2:jar:2.6.2:compile
[INFO] +- com.google.guava:guava:jar:28.0-jre:compile
[INFO] |  +- com.google.guava:failureaccess:jar:1.0.1:compile
[INFO] |  +- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:3.0.2:compile
[INFO] |  +- com.google.j2objc:j2objc-annotations:jar:1.3:compile
[INFO] |  \- org.codehaus.mojo:animal-sniffer-annotations:jar:1.17:compile
[INFO] +- com.google.code.gson:gson:jar:2.8.5:compile
[INFO] +- com.github.everit-org.json-schema:org.everit.json.schema:jar:1.11.1:compile
[INFO] |  +- org.json:json:jar:20180130:compile
[INFO] |  +- commons-validator:commons-validator:jar:1.6:compile
[INFO] |  |  \- commons-digester:commons-digester:jar:1.8.1:compile
[INFO] |  +- com.damnhandy:handy-uri-templates:jar:2.1.6:compile
[INFO] |  |  \- joda-time:joda-time:jar:2.9.4:compile
[INFO] |  \- com.google.re2j:re2j:jar:1.1:compile
[INFO] +- commons-io:commons-io:jar:2.6:compile
[INFO] +- org.apache.commons:commons-lang3:jar:3.9:compile
[INFO] +- org.slf4j:slf4j-api:jar:1.7.26:compile
[INFO] +- ch.qos.logback:logback-classic:jar:1.2.3:runtime
[INFO] |  \- ch.qos.logback:logback-core:jar:1.2.3:runtime
[INFO] +- junit:junit:jar:4.12:test
[INFO] |  \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] +- org.assertj:assertj-core:jar:3.12.2:test
[INFO] \- org.mockito:mockito-core:jar:2.28.2:test
[INFO]    +- net.bytebuddy:byte-buddy:jar:1.9.10:test
[INFO]    +- net.bytebuddy:byte-buddy-agent:jar:1.9.10:test
[INFO]    \- org.objenesis:objenesis:jar:2.6:test
anuchandy commented 4 years ago

@cbismuth thank you!, that helps.

I guess issue is with Mono.using operator in reactor-core 3.2.8.RELEASE, which we use to close the stream. I'm trying to pin point where exactly this is happening in this version.

Mean time - is it possible to override the reactor-core version in your pom file to 3.3.0.RELEASE and see that fixes the early close?

    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.3.0.RELEASE</version>
    </dependency>
cbismuth commented 4 years ago

No, it doesn't work, same exception with this dependency added as below:

[INFO] fr.tf1.data:azure-insights-probes:jar:1.0.0-SNAPSHOT
[INFO] +- io.projectreactor:reactor-core:jar:3.3.0.RELEASE:compile
[INFO] |  \- org.reactivestreams:reactive-streams:jar:1.0.3:compile
[INFO] +- org.springframework.retry:spring-retry:jar:1.2.5.RELEASE:compile
[INFO] +- org.springframework:spring-jdbc:jar:5.1.8.RELEASE:compile
[INFO] |  +- org.springframework:spring-beans:jar:5.1.8.RELEASE:compile
[INFO] |  +- org.springframework:spring-core:jar:5.1.8.RELEASE:compile
[INFO] |  |  \- org.springframework:spring-jcl:jar:5.1.8.RELEASE:compile
[INFO] |  \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile
[INFO] +- com.zaxxer:HikariCP:jar:3.4.1:compile
[INFO] +- mysql:mysql-connector-java:jar:5.1.48:runtime
[INFO] +- net.snowflake:snowflake-jdbc:jar:3.11.1:compile
[INFO] +- com.microsoft.azure:azure-mgmt-storage:jar:1.31.0:compile
[INFO] |  +- com.microsoft.azure:azure-client-runtime:jar:1.7.0:compile
[INFO] |  |  \- com.microsoft.rest:client-runtime:jar:1.7.0:compile
[INFO] |  |     +- com.squareup.retrofit2:retrofit:jar:2.5.0:compile
[INFO] |  |     +- com.squareup.okhttp3:logging-interceptor:jar:3.12.2:compile
[INFO] |  |     +- com.squareup.okhttp3:okhttp-urlconnection:jar:3.12.2:compile
[INFO] |  |     +- com.squareup.retrofit2:converter-jackson:jar:2.5.0:compile
[INFO] |  |     +- com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.10.0:compile
[INFO] |  |     \- com.squareup.retrofit2:adapter-rxjava:jar:2.6.2:compile
[INFO] |  \- com.microsoft.azure:azure-mgmt-resources:jar:1.31.0:compile
[INFO] |     +- io.reactivex:rxjava:jar:1.3.8:compile
[INFO] |     \- org.apache.httpcomponents:httpcore:jar:4.4.5:compile
[INFO] +- com.azure:azure-storage-blob:jar:12.4.0:compile
[INFO] |  +- com.azure:azure-core:jar:1.2.0:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.10.0:compile
[INFO] |  |  +- com.fasterxml.jackson.dataformat:jackson-dataformat-xml:jar:2.10.0:compile
[INFO] |  |  |  +- com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.10.0:compile
[INFO] |  |  |  |  +- jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.2:compile
[INFO] |  |  |  |  \- jakarta.activation:jakarta.activation-api:jar:1.2.1:compile
[INFO] |  |  |  +- org.codehaus.woodstox:stax2-api:jar:4.2:compile
[INFO] |  |  |  \- com.fasterxml.woodstox:woodstox-core:jar:6.0.1:compile
[INFO] |  |  \- io.netty:netty-tcnative-boringssl-static:jar:2.0.26.Final:compile
[INFO] |  \- com.azure:azure-storage-common:jar:12.4.0:compile
[INFO] +- com.azure:azure-core-http-okhttp:jar:1.1.0:compile
[INFO] |  \- com.squareup.okhttp3:okhttp:jar:4.2.2:compile
[INFO] |     +- com.squareup.okio:okio:jar:2.2.2:compile
[INFO] |     \- org.jetbrains.kotlin:kotlin-stdlib:jar:1.3.50:compile
[INFO] |        +- org.jetbrains.kotlin:kotlin-stdlib-common:jar:1.3.50:compile
[INFO] |        \- org.jetbrains:annotations:jar:13.0:compile
[INFO] +- com.microsoft.azure:azure-client-authentication:jar:1.7.1:compile
[INFO] |  +- commons-codec:commons-codec:jar:1.11:compile
[INFO] |  +- com.microsoft.azure:adal4j:jar:1.6.4:compile
[INFO] |  |  \- com.nimbusds:oauth2-oidc-sdk:jar:6.5:compile
[INFO] |  |     +- com.sun.mail:javax.mail:jar:1.6.1:compile
[INFO] |  |     |  \- javax.activation:activation:jar:1.1:compile
[INFO] |  |     +- com.github.stephenc.jcip:jcip-annotations:jar:1.0-1:compile
[INFO] |  |     +- net.minidev:json-smart:jar:2.3:compile (version selected from constraint [1.3.1,2.3])
[INFO] |  |     |  \- net.minidev:accessors-smart:jar:1.2:compile
[INFO] |  |     |     \- org.ow2.asm:asm:jar:5.0.4:compile
[INFO] |  |     +- com.nimbusds:lang-tag:jar:1.4.4:compile (version selected from constraint [1.4.3,))
[INFO] |  |     \- com.nimbusds:nimbus-jose-jwt:jar:8.8:compile (version selected from constraint [6.0.1,))
[INFO] |  \- com.microsoft.azure:azure-annotations:jar:1.10.0:compile
[INFO] +- com.github.ben-manes.caffeine:caffeine:jar:2.8.1:compile
[INFO] |  +- org.checkerframework:checker-qual:jar:3.1.0:compile
[INFO] |  \- com.google.errorprone:error_prone_annotations:jar:2.3.4:compile
[INFO] +- com.opencsv:opencsv:jar:5.1:compile
[INFO] |  +- org.apache.commons:commons-text:jar:1.8:compile
[INFO] |  +- commons-beanutils:commons-beanutils:jar:1.9.4:compile
[INFO] |  |  +- commons-logging:commons-logging:jar:1.2:compile
[INFO] |  |  \- commons-collections:commons-collections:jar:3.2.2:compile
[INFO] |  \- org.apache.commons:commons-collections4:jar:4.4:compile
[INFO] +- org.apache.avro:avro:jar:1.9.2:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-core:jar:2.10.2:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.10.2:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-annotations:jar:2.10.2:compile
[INFO] |  \- org.apache.commons:commons-compress:jar:1.19:compile
[INFO] +- org.zeroturnaround:zt-exec:jar:1.11:compile
[INFO] +- com.microsoft.azure.functions:azure-functions-java-library:jar:1.3.0:compile
[INFO] +- com.microsoft.azure:azure-eventhubs:jar:2.3.2:compile
[INFO] |  +- org.apache.qpid:proton-j:jar:0.31.0:compile
[INFO] |  \- com.microsoft.azure:qpid-proton-j-extensions:jar:1.2.0:compile
[INFO] +- com.microsoft.azure:applicationinsights-web:jar:2.3.1:compile
[INFO] |  \- com.microsoft.azure:applicationinsights-core:jar:2.3.1:compile
[INFO] +- org.springframework:spring-context:jar:5.1.8.RELEASE:compile
[INFO] |  +- org.springframework:spring-aop:jar:5.1.8.RELEASE:compile
[INFO] |  \- org.springframework:spring-expression:jar:5.1.8.RELEASE:compile
[INFO] +- io.micrometer:micrometer-registry-azure-monitor:jar:1.1.4:compile
[INFO] |  \- io.micrometer:micrometer-core:jar:1.1.4:compile
[INFO] |     +- org.hdrhistogram:HdrHistogram:jar:2.1.9:compile
[INFO] |     \- org.latencyutils:LatencyUtils:jar:2.0.3:compile
[INFO] +- org.springframework:spring-test:jar:5.1.8.RELEASE:test
[INFO] +- io.lettuce:lettuce-core:jar:5.1.7.RELEASE:compile
[INFO] |  +- io.netty:netty-common:jar:4.1.36.Final:compile
[INFO] |  +- io.netty:netty-handler:jar:4.1.36.Final:compile
[INFO] |  |  +- io.netty:netty-buffer:jar:4.1.36.Final:compile
[INFO] |  |  \- io.netty:netty-codec:jar:4.1.36.Final:compile
[INFO] |  \- io.netty:netty-transport:jar:4.1.36.Final:compile
[INFO] |     \- io.netty:netty-resolver:jar:4.1.36.Final:compile
[INFO] +- org.apache.commons:commons-pool2:jar:2.6.2:compile
[INFO] +- com.google.guava:guava:jar:28.0-jre:compile
[INFO] |  +- com.google.guava:failureaccess:jar:1.0.1:compile
[INFO] |  +- com.google.guava:listenablefuture:jar:9999.0-empty-to-avoid-conflict-with-guava:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:3.0.2:compile
[INFO] |  +- com.google.j2objc:j2objc-annotations:jar:1.3:compile
[INFO] |  \- org.codehaus.mojo:animal-sniffer-annotations:jar:1.17:compile
[INFO] +- com.google.code.gson:gson:jar:2.8.5:compile
[INFO] +- com.github.everit-org.json-schema:org.everit.json.schema:jar:1.11.1:compile
[INFO] |  +- org.json:json:jar:20180130:compile
[INFO] |  +- commons-validator:commons-validator:jar:1.6:compile
[INFO] |  |  \- commons-digester:commons-digester:jar:1.8.1:compile
[INFO] |  +- com.damnhandy:handy-uri-templates:jar:2.1.6:compile
[INFO] |  |  \- joda-time:joda-time:jar:2.9.4:compile
[INFO] |  \- com.google.re2j:re2j:jar:1.1:compile
[INFO] +- commons-io:commons-io:jar:2.6:compile
[INFO] +- org.apache.commons:commons-lang3:jar:3.9:compile
[INFO] +- org.slf4j:slf4j-api:jar:1.7.26:compile
[INFO] +- ch.qos.logback:logback-classic:jar:1.2.3:runtime
[INFO] |  \- ch.qos.logback:logback-core:jar:1.2.3:runtime
[INFO] +- junit:junit:jar:4.12:test
[INFO] |  \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] +- org.assertj:assertj-core:jar:3.12.2:test
[INFO] \- org.mockito:mockito-core:jar:2.28.2:test
[INFO]    +- net.bytebuddy:byte-buddy:jar:1.9.10:test
[INFO]    +- net.bytebuddy:byte-buddy-agent:jar:1.9.10:test
[INFO]    \- org.objenesis:objenesis:jar:2.6:test
anuchandy commented 4 years ago

Interesting, locally, reactor-core 3.2.8 broke my console app when downloading, that's why I suggested overriding the version.

Let me do some more investigations and get back.

anuchandy commented 4 years ago

any chance you can share a minimal spring project that repro this which I can run locally? my experience in spring space is low, so learning and creating service app myself seems taking good amount of time.

cbismuth commented 4 years ago

Sure, can you send me an email to my public address I'll send you an archive file with project bundled.

cbismuth commented 4 years ago

No worries, I almost removed all business logic, I'll create a public repo on GitHub in minutes.

anuchandy commented 4 years ago

ah I send an email :) no issue. thanks.

cbismuth commented 4 years ago

The repository is here.

You'll have to fill in the application.properties file with some properties of yours.

There're lot of Maven dependencies, I've kept them all and also copy/paste those from parent POM descriptors.

Just run main the main file and it will fail quickly.

Thanks a lot @anuchandy :+1:

anuchandy commented 4 years ago

@cbismuth perfect. I was able to reproduce this stream destruction using the project. Tomorrow I will debug this to root cause.

anuchandy commented 4 years ago

@cbismuth just want to update: Identified that, the issue is with early closing of response stream. This was due to Mono.using() operator, which even with no-eager will not wait for stream derived from it via flatmapMany to complete. The issue is fixed by using Flux.using. PR opened.

Thanks again for the repro code and helping us to root cause it.

I will update this issue with progress/release.

cbismuth commented 4 years ago

That sounds great, thanks @anuchandy :+1:

alzimmermsft commented 4 years ago

This issue should be resolved in the latest release of Azure Core OkHttp, if there are still on going issues please feel free to re-open the issue with additional content.