aws / aws-sdk-net

The official AWS SDK for .NET. For more information on the AWS SDK for .NET, see our web site:
http://aws.amazon.com/sdkfornet/
Apache License 2.0

Amazon RDSDataService returns SocketException #1682

Closed drewsb closed 4 years ago

drewsb commented 4 years ago

Description

I am currently creating a lambda function which calls the IAmazonRDSDataService method BatchExecuteStatementAsync(BatchExecuteStatementRequest, CancellationToken) to batch write multiple insert requests into my Aurora Serverless DB Cluster. The request executes for approximately 30 seconds then fails with an error message.

I am able to successfully call ExecuteStatementAsync, so I know I can connect to my database. If I just call await _dataService.BatchExecuteStatementAsync(request).ConfigureAwait(false); without returning a response back to my lambda function, the lambda function completes without throwing an exception but never inserts any items into the database. Whenever something is returned by IAmazonRDSDataService, my guess is that this forces the lambda to block the thread until a response has been created; however, I always receive this exception: Unable to read data from the transport connection: Broken pipe.

Reproduction Steps

Code:

public async Task<HttpStatusCode> BatchInsertItemsAsync(IEnumerable<T> items)
        {
            List<T> itemList = items.ToList();
            Console.WriteLine($"Batch inserting items with total count: {itemList.Count()}.");
            List<List<SqlParameter>> parameterSets = ConstructSqlParameterSets(itemList);
            BatchExecuteStatementRequest request = new BatchExecuteStatementRequest
            {
                Database = _database,
                ResourceArn = _resourceArn,
                ParameterSets = parameterSets,
                SecretArn = _secretArn,
                Schema = _dbSchema,
                Sql = ConstructInsertSqlStatement()
            };
            try
            {
                var response = await _dataService.BatchExecuteStatementAsync(request).ConfigureAwait(false);
                Console.WriteLine($"Response: {JsonConvert.SerializeObject(response)}");
                return response.HttpStatusCode;
            }
            catch (Exception e)
            {
                Console.WriteLine(JsonConvert.SerializeObject(e));
                while (e.InnerException != null)
                {
                    e = e.InnerException; // if the inner exception exists, use that message - otherwise use the exception message
                }
                Console.WriteLine(JsonConvert.SerializeObject(e));
                throw e;
            }
        }

        public static string ConstructInsertSqlStatement()
        {
            string insertQuery = $"insert into {_dbSchema} values (";
            foreach (PropertyInfo p in typeof(T).GetProperties())
            {
                insertQuery += ":" + p.Name + ",";
            }
            insertQuery = insertQuery.TrimEnd(',');
            insertQuery += ")";
            Console.WriteLine($"Insert statement: {insertQuery}");
            return insertQuery;
        }

        public List<List<SqlParameter>> ConstructSqlParameterSets(IList<T> items)
        {
            Console.WriteLine($"Constructing parameter set. Item count: {items.Count()}");
            List<List<SqlParameter>> parameterSets = new List<List<SqlParameter>>();
            foreach (T item in items)
            {
                List<SqlParameter> parameterSet = new List<SqlParameter>();
                foreach (PropertyInfo p in typeof(T).GetProperties())
                {
                    Field f = new Field();
                    if (p.GetValue(item) == null)
                    {
                        f.IsNull = true;
                        parameterSet.Add(new SqlParameter
                        {
                            Name = p.Name,
                            Value = f
                        });
                        continue;
                    }
                    else if (p.PropertyType == typeof(long) || p.PropertyType == typeof(long?))
                    {
                        f.LongValue = Convert.ToInt64(p.GetValue(item));
                    }
                    else if (p.PropertyType == typeof(int) || p.PropertyType == typeof(int?))
                    {
                        f.LongValue = Convert.ToInt64((int)p.GetValue(item));
                    }
                    else if (p.PropertyType == typeof(double) || p.PropertyType == typeof(double?))
                    {
                        f.DoubleValue = Convert.ToDouble((p.GetValue(item)));
                    }
                    else if (p.PropertyType == typeof(decimal) || p.PropertyType == typeof(decimal?))
                    {
                        f.DoubleValue = Decimal.ToDouble((decimal)p.GetValue(item));
                    }
                    else if (p.PropertyType == typeof(string))
                    {
                        f.StringValue = (string)p.GetValue(item);
                    }
                    else if (p.PropertyType == typeof(bool) || p.PropertyType == typeof(bool?))
                    {
                        f.BooleanValue = (bool)p.GetValue(item);
                    }
                    SqlParameter param = new SqlParameter
                    {
                        Name = p.Name,
                        Value = f
                    };
                    parameterSet.Add(param);
                }
                parameterSets.Add(parameterSet);
            }
            Console.WriteLine($"Parameter set count: {parameterSets.Count()}.");
            return parameterSets;
        }
----------------------------------------
namespace BulkInsertCsvDataTask
{
    public class Function
    {
        // Handler signature and local declarations reconstructed here;
        // the original post elided them.
        public async Task FunctionHandler(FileUploadEvent fileUploadEvent)
        {
            IEnumerable<ValuationsUploadRow> records;
            HttpStatusCode batchWriteStatusCode;
            try
            {
                GetObjectRequest request = new GetObjectRequest
                {
                    BucketName = fileUploadEvent.BucketName,
                    Key = fileUploadEvent.Key
                };
                using GetObjectResponse response = await _s3Client.GetObjectAsync(request).ConfigureAwait(false);
                using Stream responseStream = response.ResponseStream;
                using (var reader = new StreamReader(responseStream))
                using (var csv = new CsvReader(reader, CultureInfo.InvariantCulture))
                {
                    csv.Configuration.PrepareHeaderForMatch = (string header, int index) => header.ToLower();
                    csv.Configuration.HeaderValidated = null;
                    csv.Configuration.MissingFieldFound = null;
                    records = csv.GetRecords<ValuationsUploadRow>();
                    batchWriteStatusCode = await _dataRepository.BatchInsertItemsAsync(records).ConfigureAwait(false);
                }
            }
            catch (Exception e)
            {
                while (e.InnerException != null)
                {
                    e = e.InnerException; // if the inner exception exists, use that message - otherwise use the exception message
                }
                LambdaLogger.Log($"File parse/upload process failed with exception: {e.Message}.");
                throw e;
            }
        }
    }
}

Logs

Stack Trace:

{"ClassName":"System.IO.IOException","Message":"Unable to read data from the transport connection: Broken pipe.","Data":null,"InnerException":{"ClassName":"System.Net.Sockets.SocketException","Message":"Broken pipe","Data":null,"InnerException":null,"HelpURL":null,"StackTraceString":null,"RemoteStackTraceString":null,"RemoteStackIndex":0,"ExceptionMethod":null,"HResult":-2147467259,"Source":null,"WatsonBuckets":null,"NativeErrorCode":32},"HelpURL":null,"StackTraceString":"   at Amazon.Runtime.HttpWebRequestMessage.GetResponseAsync(CancellationToken cancellationToken)\n   at Amazon.Runtime.Internal.HttpHandler`1.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.Unmarshaller.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.CredentialsRetriever.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.ErrorCallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Amazon.Runtime.Internal.MetricsHandler.InvokeAsync[T](IExecutionContext executionContext)\n   at Bx.ServerlessWorkflow.DataRepository.RdsDataRepository`1.BatchInsertItemsAsync(IEnumerable`1 items) in 
C:\\git\\serverless\\valuations-file-upload-demo\\src\\Bx.ServerlessWorkflow.DataRepository\\RdsDataRepository.cs:line 51","RemoteStackTraceString":null,"RemoteStackIndex":0,"ExceptionMethod":null,"HResult":-2146232800,"Source":"AWSSDK.Core","WatsonBuckets":null}
{"ClassName":"System.Net.Sockets.SocketException","Message":"Broken pipe","Data":null,"InnerException":null,"HelpURL":null,"StackTraceString":null,"RemoteStackTraceString":null,"RemoteStackIndex":0,"ExceptionMethod":null,"HResult":-2147467259,"Source":null,"WatsonBuckets":null,"NativeErrorCode":32}
File parse/upload process failed with exception: Broken pipe.Broken pipe: SocketException

Environment

Resolution


This is a :bug: bug-report

ashishdhingra commented 4 years ago

Hi @drewsb,

Thanks for posting the issue.

A few things to check:

Thanks, Ashish

drewsb commented 4 years ago

Hi @ashishdhingra, I realized the issue is that I'm either exceeding the maximum number of requests in the BatchExecuteStatementRequest (1,000 per second) or returning too much data (the 1 MB response limit). I am trying to insert over 4,000 items into my Aurora Serverless DB cluster, and it produces a SocketException each time. I'm not sure why it results in a SocketException rather than a more descriptive error; the Aurora Data API should handle these requests better, or at least mention these limits in the AWS SDK documentation. I resolved this by partitioning the requests into groups of 500 and executing the batch writes for each group in parallel. This workaround lets me stay under both the requests-per-second limit and the response data limit. Leaving this here in case anyone else encounters this issue.
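For reference, a minimal sketch of that partitioning workaround. It assumes the BatchInsertItemsAsync method from the repro code above; the chunk size of 500 and the BatchInsertAllAsync/Chunk names are illustrative, and the chunking helper is written out by hand since Enumerable.Chunk only exists from .NET 6 onward:

    // Sketch only: relies on the BatchInsertItemsAsync method shown earlier.
    // Splits the items into groups of 500 and issues one batch write per group,
    // awaiting all of them in parallel.
    private const int MaxBatchSize = 500;

    public async Task<HttpStatusCode[]> BatchInsertAllAsync(IEnumerable<T> items)
    {
        var tasks = Chunk(items.ToList(), MaxBatchSize)
            .Select(chunk => BatchInsertItemsAsync(chunk))
            .ToList();
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }

    // Manual chunking helper (Enumerable.Chunk requires .NET 6+).
    private static IEnumerable<List<T>> Chunk(List<T> source, int size)
    {
        for (int i = 0; i < source.Count; i += size)
        {
            yield return source.GetRange(i, Math.Min(size, source.Count - i));
        }
    }

Note that running all chunks fully in parallel can itself approach the per-second request limit for very large inputs; throttling the degree of parallelism (e.g. with SemaphoreSlim) is a further refinement if that happens.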

ashishdhingra commented 4 years ago

Hi @drewsb,

Thanks for the update. I agree that the Aurora Data API should handle these errors in a more descriptive way. I will try to have a look at the SDK code.

Please advise if you would like to keep this issue open or if we could close it.

Thanks, Ashish

drewsb commented 4 years ago

This is resolved. Appreciate the help @ashishdhingra .