aws / aws-sdk-net

The official AWS SDK for .NET. For more information on the AWS SDK for .NET, see our web site:
http://aws.amazon.com/sdkfornet/
Apache License 2.0

AmazonS3Client.PutObjectAsync does not retry with HttpClient HttpCompletionOption.ResponseHeadersRead when using devproxy #3328

Open TomKirbz opened 3 months ago

TomKirbz commented 3 months ago

Describe the bug

I'm currently working on a project that exposes the AmazonS3Config retry properties to make them configurable to the client.

To test AWS's retry functionality I've been using a tool called devproxy which "is a command line tool for simulating APIs for testing apps."

With the following code I am able to successfully test this functionality and simulate the AWS API returning 503 and the API will happily retry with throttling and upload my file after a few simulated failed attempts.

AmazonS3Config amazonS3Config = new();
amazonS3Config.Timeout = TimeSpan.FromSeconds(10);
amazonS3Config.RetryMode = RequestRetryMode.Standard;
amazonS3Config.MaxErrorRetry = 10;
amazonS3Config.ThrottleRetries = true;

AmazonS3Client amazonS3Client = new(
    myKey,
    mySecret,
    myToken,
    amazonS3Config);

Func<Task<Uri>> source = GetDownloadFileUrl();

HttpClient client = _httpClientFactory.CreateClient();
HttpResponseMessage? sourceResponse = await client.GetAsync(await source(), cancellationToken);
using(Stream stream = await sourceResponse.Content.ReadAsStreamAsync(cancellationToken))
{
    PutObjectRequest putRequest = new PutObjectRequest
    {
        BucketName = _bucketName,
        Key = blobName,
        InputStream = stream,
    };

    putRequest.Headers.ContentLength = sourceResponse.Content.Headers.ContentLength ?? -1;
    await _amazonS3Client.PutObjectAsync(putRequest, cancellationToken);
}

However, to limit memory use when copying large files from our source location into the S3 bucket, I pass the following enum value to HttpClient.GetAsync so that the response content is streamed into IAmazonS3.PutObjectAsync rather than buffered in memory:

HttpResponseMessage? sourceResponse = await client.GetAsync(await source(), HttpCompletionOption.ResponseHeadersRead, cancellationToken);

With this addition the client no longer retries: if a 503 is returned, an AmazonS3Exception is thrown immediately. This could be an issue with the devproxy tool I am using; however, if I remove the enum above, the app retries as expected.

Expected Behavior

The API should retry as expected.

Current Behavior

An AmazonS3Exception is thrown immediately if a 503 is returned, and the request is not retried.

Reproduction Steps

Attempt to upload a file with the following stream and the config above, and simulate a 503 response:

HttpClient client = _httpClientFactory.CreateClient();
HttpResponseMessage? sourceResponse = await client.GetAsync(<yourUri>, HttpCompletionOption.ResponseHeadersRead, <yourCancellationToken>);
Stream stream = await sourceResponse.Content.ReadAsStreamAsync(<yourCancellationToken>);

Possible Solution

No response

Additional Information/Context

I don't know if these will be of any use but these are the logs from devproxy.

Log 1: You can see the app attempt to upload my file. A 503 is returned, my app throws an AmazonS3Exception, and it then makes a request to localhost to fail the command. This is with the HttpClient using the HttpCompletionOption.ResponseHeadersRead setting: log1

Log 2: You can see it attempt the upload several times before a successful response is passed through, and my app then makes a request to localhost to pass the command. This is without the HttpCompletionOption.ResponseHeadersRead setting: log2

Full exception:

Amazon.S3.AmazonS3Exception: Error making request with Error Code ServiceUnavailable and Http Status Code ServiceUnavailable. No further error information was returned by the service. Response Body: { "message": "Slow Down", "details": "The server is currently unable to handle the request due to a temporary overload or maintenance. Please try again later." }
 ---> Amazon.Runtime.Internal.HttpErrorResponseException: Exception of type 'Amazon.Runtime.Internal.HttpErrorResponseException' was thrown.
   at Amazon.Runtime.HttpWebRequestMessage.ProcessHttpResponseMessage(HttpResponseMessage responseMessage)
   at Amazon.Runtime.HttpWebRequestMessage.GetResponseAsync(CancellationToken cancellationToken)
   at Amazon.Runtime.Internal.HttpHandler`1.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.RedirectHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.Unmarshaller.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.S3.Internal.AmazonS3ResponseHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)
   --- End of inner exception stack trace ---
   at Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
   at Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionAsync(IExecutionContext executionContext, HttpErrorResponseException exception)
   at Amazon.Runtime.Internal.ExceptionHandler`1.HandleAsync(IExecutionContext executionContext, Exception exception)
   at Amazon.Runtime.Internal.ErrorHandler.ProcessExceptionAsync(IExecutionContext executionContext, Exception exception)
   at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.Signer.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.S3.Internal.S3Express.S3ExpressPreSigner.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CredentialsRetriever.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.S3.Internal.AmazonS3ExceptionHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.ErrorCallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.MetricsHandler.InvokeAsync[T](IExecutionContext executionContext)
   at AwsStorageService.UploadFileAsync(String blobName, Nullable`1 fileSize, Func`1 source, CancellationToken cancellationToken)

AWS .NET SDK and/or Package version used

AWSSDK.S3 3.7.308.7

Targeted .NET Platform

.NET 8

Operating System and version

Windows 10

ashishdhingra commented 3 months ago

@TomKirbz Good morning. I'm unsure what HttpCompletionOption.ResponseHeadersRead behavior devproxy might be causing.

Thanks, Ashish

TomKirbz commented 3 months ago

Hi Ashish, Thank you for your response.

Sadly the article and code you kindly supplied didn't make a lot of difference, and you're right, this could be behaviour from the third-party tool we are using. I am currently setting up tests that target a real AWS service using the code I pasted above.

Do you have any advice on how we might be able to throttle our AWS service for it to return 503 and attempt to retry the upload? My plan is to upload thousands of files in quick succession, but any further guidance would be greatly appreciated.

Thanks Tom

TomKirbz commented 3 months ago

Hi, I've done some testing against an AWS service, and I believe that retry does not work with this Stream:

HttpClient client = _httpClientFactory.CreateClient();
HttpResponseMessage? sourceResponse = await client.GetAsync(<yourUri>, HttpCompletionOption.ResponseHeadersRead, <yourCancellationToken>);
using (Stream stream = await sourceResponse.Content.ReadAsStreamAsync(default))
{

    var putRequest = new PutObjectRequest
    {
        BucketName = bucketName,
        Key = key,
        InputStream = stream,
    };
    putRequest.Headers.ContentLength = sourceResponse.Content.Headers.ContentLength ?? -1;
    PutObjectResponse response = await amazonS3Client.PutObjectAsync(putRequest, default);
}

What I believe is happening is that, with HttpCompletionOption.ResponseHeadersRead, the Stream is of type ContentLengthReadStream, which is not seekable. Please correct me if I'm wrong, but looking through the SDK, the following code presumes the request content is a seekable stream:

// From the Amazon.Runtime.Internal namespace
internal static void PrepareForRetry(IRequestContext requestContext)
{
    if (requestContext.Request.ContentStream != null &&
        requestContext.Request.OriginalStreamPosition >= 0)
    {
        var originalStream = requestContext.Request.ContentStream;
        var seekableStream = originalStream;

        // If the stream is wrapped in a CompressionWrapperStream, reset the CompressionWrapperStream
        var compressionWrapperStream = originalStream as CompressionWrapperStream;
        if (compressionWrapperStream != null)
        {
            compressionWrapperStream.Reset();
            seekableStream = compressionWrapperStream.GetSeekableBaseStream();
        }

        // If the stream is wrapped in a HashStream, reset the HashStream
        var hashStream = originalStream as HashStream;
        if (hashStream != null)
        {
            hashStream.Reset();
            seekableStream = hashStream.GetSeekableBaseStream();
        }
        seekableStream.Position = requestContext.Request.OriginalStreamPosition;
    }
}

Therefore, when seekableStream.Position = requestContext.Request.OriginalStreamPosition; is executed, the method attempts to set the stream's position to OriginalStreamPosition, which is unknown for a non-seekable stream, resulting in an error.

However, only the original "503 SlowDown" exception is thrown, and nothing related to the above surfaces.
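
To make the seekable versus non-seekable distinction concrete, here is a minimal sketch that does not touch the SDK at all. `ForwardOnlyStream` and `TryRewind` are hypothetical names invented for this illustration; the wrapper only imitates a forward-only network stream by reporting `CanSeek == false` and throwing `NotSupportedException` from `Position`, which is how non-seekable .NET streams generally behave. It is not the SDK's retry logic, just the underlying stream behaviour that any rewind-before-retry step has to deal with.

```csharp
using System;
using System.IO;

// Hypothetical stand-in for a forward-only network stream, such as the one
// HttpClient hands back with HttpCompletionOption.ResponseHeadersRead.
public sealed class ForwardOnlyStream : Stream
{
    private readonly Stream _inner;
    public ForwardOnlyStream(Stream inner) => _inner = inner;

    public override bool CanRead => true;
    public override bool CanSeek => false;   // the key difference from MemoryStream
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count) =>
        _inner.Read(buffer, offset, count);
    public override long Seek(long offset, SeekOrigin origin) =>
        throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException();
    public override void Flush() { }
}

public static class RewindDemo
{
    // Returns true only if the stream could be moved back to originalPosition,
    // which is what resending the same request body would need.
    public static bool TryRewind(Stream stream, long originalPosition)
    {
        if (!stream.CanSeek)
        {
            return false;
        }
        stream.Position = originalPosition;
        return true;
    }

    public static void Main()
    {
        using var seekable = new MemoryStream(new byte[] { 1, 2, 3 });
        seekable.ReadByte();
        Console.WriteLine(TryRewind(seekable, 0));      // True

        using var forwardOnly = new ForwardOnlyStream(new MemoryStream(new byte[] { 1, 2, 3 }));
        forwardOnly.ReadByte();
        Console.WriteLine(TryRewind(forwardOnly, 0));   // False
    }
}
```

Once any bytes have been consumed from a forward-only stream, there is no way to replay them, so a second attempt cannot resend the same body without buffering it somewhere first.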

I tested all of this by creating a simple console application that uploads to a real AWS S3 bucket, NOT simulated with devproxy. As soon as an exception occurs, the upload fails immediately and is not retried. When the Stream is a MemoryStream, i.e. seekable, everything works fine.
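
Since a seekable stream behaves as expected, one possible workaround (a sketch, not something confirmed by the SDK team in this thread) is to spool the HTTP response to a temporary file first and hand the resulting FileStream to PutObjectRequest.InputStream. This keeps memory use low at the cost of disk I/O. `SeekableBuffer` and `ToTempFileAsync` are hypothetical names for this illustration; only standard .NET APIs are used.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class SeekableBuffer
{
    // Copies a forward-only stream into a temporary file and returns a seekable
    // FileStream positioned at the start. FileOptions.DeleteOnClose removes the
    // file when the returned stream is disposed.
    public static async Task<FileStream> ToTempFileAsync(
        Stream source, CancellationToken cancellationToken = default)
    {
        var tempFile = new FileStream(
            Path.GetTempFileName(),
            FileMode.Create,
            FileAccess.ReadWrite,
            FileShare.None,
            81920, // buffer size in bytes
            FileOptions.Asynchronous | FileOptions.DeleteOnClose);
        try
        {
            await source.CopyToAsync(tempFile, cancellationToken);
            tempFile.Position = 0; // rewind so the caller reads from the beginning
            return tempFile;
        }
        catch
        {
            // Do not leak the temp file if the copy fails part-way.
            tempFile.Dispose();
            throw;
        }
    }
}
```

In the upload code above, this would mean wrapping the response stream with something like `using FileStream seekable = await SeekableBuffer.ToTempFileAsync(stream, cancellationToken);` and assigning `seekable` to InputStream. I have only observed the retry working with a MemoryStream, so treat the FileStream variant as an assumption to verify.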

bhoradc commented 2 months ago

Hi @TomKirbz,

I am trying to reproduce this issue using the code sample below in a console application.

static async Task Main(string[] args)
{
    Amazon.AWSConfigs.LoggingConfig.LogResponses = Amazon.ResponseLoggingOption.Always;
    Amazon.AWSConfigs.LoggingConfig.LogTo = Amazon.LoggingOptions.Console;
    Amazon.AWSConfigs.AddTraceListener("Amazon", new System.Diagnostics.ConsoleTraceListener());

    AmazonS3Config amazonS3Config = new();
    amazonS3Config.Timeout = TimeSpan.FromSeconds(10);
    amazonS3Config.RetryMode = RequestRetryMode.Standard;
    amazonS3Config.MaxErrorRetry = 10;
    amazonS3Config.ThrottleRetries = true;

    AmazonS3Client amazonS3Client = new(amazonS3Config);

    Func<Task<Uri>> source = async() => await GetDownloadFileUrl();

    var services = new ServiceCollection();
    services.AddHttpClient();
    var serviceProvider = services.BuildServiceProvider();
    var httpClientFactory = serviceProvider.GetService<IHttpClientFactory>();

    HttpClient client = httpClientFactory.CreateClient();
    HttpResponseMessage? sourceResponse = await client.GetAsync(await source(), HttpCompletionOption.ResponseHeadersRead);
    using (Stream stream = await sourceResponse.Content.ReadAsStreamAsync(default))
    {
        PutObjectRequest putRequest = new PutObjectRequest
        {
            BucketName = "<<bucket_name>>",
            Key = "<<object_name>>",
            InputStream = stream,
        };

        putRequest.Headers.ContentLength = sourceResponse.Content.Headers.ContentLength ?? -1;
        await amazonS3Client.PutObjectAsync(putRequest, default);
    }
}

static async Task<Uri> GetDownloadFileUrl()
{
    await Task.Delay(1000);
    string downloadUrl = "https://**/document/txt/example.txt";
    return new Uri(downloadUrl);
}

I get the exception below for the above stream, without simulating the API returning a 503.

Unhandled exception. Amazon.S3.AmazonS3Exception: Could not determine content length
   at Amazon.S3.Internal.AmazonS3PostMarshallHandler.GetStreamWithLength(Stream baseStream, Int64 hintLength)
   at Amazon.S3.Internal.AmazonS3PostMarshallHandler.SetStreamChecksum(PutObjectRequest putObjectRequest, IRequest request)
   at Amazon.S3.Internal.AmazonS3PostMarshallHandler.SetStreamChecksum(AmazonWebServiceRequest originalRequest, IRequest request)
   at Amazon.S3.Internal.AmazonS3PostMarshallHandler.ProcessPreRequestHandlers(IExecutionContext executionContext)
   at Amazon.S3.Internal.AmazonS3PostMarshallHandler.PreInvoke(IExecutionContext executionContext)
   at Amazon.S3.Internal.AmazonS3PostMarshallHandler.InvokeAsync[T](IExecutionContext executionContext)

Can you please provide information on _httpClientFactory, GetDownloadFileUrl(), and how you are simulating the 503 exception in your application?

Regards, Chaitanya

TomKirbz commented 2 months ago

Hi @bhoradc Thank you for your reply.

I don't think there is anything special about the code you've highlighted.

Together they return an HttpResponseMessage with a status code of 200 and a content length in sourceResponse.Content.Headers.ContentLength.

I simulated the 503 exception by hitting the S3 bucket with a number of Tasks in quick succession:

private AmazonS3Client _amazonS3Client;
private readonly IHttpClientFactory _httpClientFactory;

public App(IHttpClientFactory httpClientFactory)
{
    _httpClientFactory = httpClientFactory;
}

internal async Task RunAsync()
{
    _amazonS3Client = GetAWSClient();
    await UploadToAwsMultiTask();
}

private async Task UploadToAwsMultiTask()
{
    int numberOfTasks = 5;
    Task[] tasks = new Task[numberOfTasks];

    for (int i = 0; i < numberOfTasks; i++)
    {
        tasks[i] = Task.Run(UploadToAws);
    }

    await Task.WhenAll(tasks);
}

private async Task UploadToAws()
{
    int? taskId = Task.CurrentId;
    HttpClient client = _httpClientFactory.CreateClient();

    Func<Task<Uri>> source = GetDownloadFileUrl(uniqueId);

    while (true)
    {
        try
        {
            HttpResponseMessage? sourceResponse = await client.GetAsync(await source(), HttpCompletionOption.ResponseHeadersRead, default);
            using (Stream stream = await sourceResponse.Content.ReadAsStreamAsync(default))
            {
                var putRequest = new PutObjectRequest
                {
                    BucketName = "<bucketName>",
                    Key = "<keyName>",
                    InputStream = stream,
                };
                putRequest.Headers.ContentLength = sourceResponse.Content.Headers.ContentLength ?? -1;
                PutObjectResponse response = await _amazonS3Client.PutObjectAsync(putRequest, default);

                Console.WriteLine($"Upload Response: {response.HttpStatusCode}, from Task: {taskId}");
            }
        }
        catch (AmazonS3Exception ex)
        {
            Console.WriteLine($"Task {taskId} threw an exception: {ex.GetType()} {ex.ErrorCode} {ex.Message}");
            throw;
        }
    }
}

private static AmazonS3Client GetAWSClient()
{
    RegionEndpoint region = RegionEndpoint.EUWest1;
    AmazonS3Config amazonS3Config = new()
    {
        Timeout = TimeSpan.FromHours(24),
        RetryMode = RequestRetryMode.Standard,
        MaxErrorRetry = 10,
        ThrottleRetries = true,
        RegionEndpoint = region
    };

    AmazonS3Client amazonS3Client = new(<credentials>, amazonS3Config );
    return amazonS3Client;
}

I hope this helps.