googleapis / google-cloud-dotnet

Google Cloud Client Libraries for .NET
https://cloud.google.com/dotnet/docs/reference/
Apache License 2.0

Multipart Upload with GCS .NET SDK #13021

Open joaopaulopmedeiros opened 1 month ago

joaopaulopmedeiros commented 1 month ago

Hey guys, I recently developed a proof of concept for on-demand CSV file generation. The goal is to retrieve data from a relational database, map it to CSV, and then upload it to a cloud bucket in chunks of a given size (e.g. 5 MB). I've read the docs and tried to use the Resumable Upload feature, but my file gets overwritten.

A "complete" method would be very useful, but I didn't find anything about it. Could you help out here?

Working sample code with the AWS provider:

using System.Buffers;
using System.Globalization;
using System.IO;

using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;

using CsvHelper;

using Report.Generator.Infra.Repositories;

namespace Report.Generator.Infra.Generators
{
    public class CsvAWSReportGenerator
    {
        private readonly IAmazonS3 _s3Client;

        public CsvAWSReportGenerator(IAmazonS3 s3Client)
        {
            _s3Client = s3Client;
        }

        public async Task GenerateAsync(string bucketName, string keyName, int blockSize)
        {
            byte[] buffer = ArrayPool<byte>.Shared.Rent(blockSize);
            int bufferPosition = 0;

            List<UploadPartResponse> uploadResponses = new();
            InitiateMultipartUploadRequest initiateRequest = new()
            {
                BucketName = bucketName,
                Key = keyName
            };

            InitiateMultipartUploadResponse initResponse =
                await _s3Client.InitiateMultipartUploadAsync(initiateRequest);

            try
            {
                using var memoryStream = new MemoryStream();
                using var writer = new StreamWriter(memoryStream);
                using var csvWriter = new CsvWriter(writer, CultureInfo.InvariantCulture);

                int partNumber = 1;

                await foreach (var product in ProductRepository.FetchProductsAsync())
                {
                    memoryStream.SetLength(0);
                    csvWriter.WriteRecord(product);
                    await csvWriter.NextRecordAsync();
                    await writer.FlushAsync();
                    memoryStream.Position = 0;

                    while (memoryStream.Position < memoryStream.Length)
                    {
                        int bytesToRead = Math.Min(blockSize - bufferPosition, (int)(memoryStream.Length - memoryStream.Position));
                        int bytesRead = await memoryStream.ReadAsync(buffer, bufferPosition, bytesToRead);
                        bufferPosition += bytesRead;

                        if (bufferPosition == blockSize)
                        {
                            await UploadPartAsync(buffer, bufferPosition, bucketName, keyName, initResponse.UploadId, partNumber++, uploadResponses);
                            bufferPosition = 0;
                        }
                    }
                }

                if (bufferPosition > 0)
                {
                    await UploadPartAsync(buffer, bufferPosition, bucketName, keyName, initResponse.UploadId, partNumber, uploadResponses);
                }

                ArrayPool<byte>.Shared.Return(buffer);

                CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest
                {
                    BucketName = bucketName,
                    Key = keyName,
                    UploadId = initResponse.UploadId
                };

                completeRequest.AddPartETags(uploadResponses);

                CompleteMultipartUploadResponse completeUploadResponse =
                    await _s3Client.CompleteMultipartUploadAsync(completeRequest);
            }
            catch (Exception exception)
            {
                Console.WriteLine("An AmazonS3Exception was thrown: {0}", exception.Message);

                AbortMultipartUploadRequest abortMPURequest = new AbortMultipartUploadRequest
                {
                    BucketName = bucketName,
                    Key = keyName,
                    UploadId = initResponse.UploadId
                };
                await _s3Client.AbortMultipartUploadAsync(abortMPURequest);
            }
        }

        private async Task UploadPartAsync(byte[] buffer, int bufferLength, string bucketName, string keyName, string uploadId, int partNumber, List<UploadPartResponse> uploadResponses)
        {
            using var partStream = new MemoryStream(buffer, 0, bufferLength);
            UploadPartRequest uploadRequest = new UploadPartRequest
            {
                BucketName = bucketName,
                Key = keyName,
                UploadId = uploadId,
                PartNumber = partNumber,
                PartSize = bufferLength,
                InputStream = partStream
            };

            uploadRequest.StreamTransferProgress += new EventHandler<StreamTransferProgressArgs>(UploadPartProgressEventCallback);
            uploadResponses.Add(await _s3Client.UploadPartAsync(uploadRequest));
        }

        public void UploadPartProgressEventCallback(object sender, StreamTransferProgressArgs e)
        {
            Console.WriteLine("{0}/{1}", e.TransferredBytes, e.TotalBytes);
        }
    }
}

PS: I've seen that GCS does have an XML multipart upload API.
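
For reference, that XML API exposes the same initiate/upload-part/complete flow as S3 and can be driven with plain HTTP. The following is an untested sketch that only illustrates the request shapes: the bucket and object names are placeholders, it assumes application default credentials (the XML API accepts OAuth bearer tokens), and, as with S3, every part except the last is expected to be at least 5 MiB.

using System.Net.Http.Headers;
using System.Text;
using System.Xml.Linq;

using Google.Apis.Auth.OAuth2;

namespace Report.Generator.Infra.Generators
{
    // Untested sketch of the GCS XML API multipart upload flow (initiate, upload parts, complete).
    public class XmlMultipartUploadSketch
    {
        private const string Host = "https://storage.googleapis.com";

        public static async Task UploadAsync(string bucket, string objectName, IReadOnlyList<byte[]> parts)
        {
            // Assumes application default credentials with the read_write storage scope.
            var credential = (await GoogleCredential.GetApplicationDefaultAsync())
                .CreateScoped("https://www.googleapis.com/auth/devstorage.read_write");
            string token = await ((ITokenAccess)credential).GetAccessTokenForRequestAsync();

            using var http = new HttpClient();
            http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);

            // 1. Initiate the upload and read the UploadId from the XML response.
            var initResponse = await http.PostAsync($"{Host}/{bucket}/{objectName}?uploads", null);
            initResponse.EnsureSuccessStatusCode();
            var initXml = XDocument.Parse(await initResponse.Content.ReadAsStringAsync());
            string uploadId = initXml.Root!.Elements().First(e => e.Name.LocalName == "UploadId").Value;

            // 2. Upload each part; the ETag returned for each part is needed to complete the upload.
            var partETags = new List<(int PartNumber, string ETag)>();
            for (int i = 0; i < parts.Count; i++)
            {
                int partNumber = i + 1;
                var partResponse = await http.PutAsync(
                    $"{Host}/{bucket}/{objectName}?partNumber={partNumber}&uploadId={uploadId}",
                    new ByteArrayContent(parts[i]));
                partResponse.EnsureSuccessStatusCode();
                partETags.Add((partNumber, partResponse.Headers.ETag!.Tag));
            }

            // 3. Complete the upload by posting the ordered part list.
            var completeBody = new XElement("CompleteMultipartUpload",
                partETags.Select(p => new XElement("Part",
                    new XElement("PartNumber", p.PartNumber),
                    new XElement("ETag", p.ETag))));
            var completeResponse = await http.PostAsync(
                $"{Host}/{bucket}/{objectName}?uploadId={uploadId}",
                new StringContent(completeBody.ToString(), Encoding.UTF8, "application/xml"));
            completeResponse.EnsureSuccessStatusCode();
        }
    }
}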

jskeet commented 1 month ago

Showing the S3 code doesn't really help us know what your GCS code looks like. Please provide a minimal example using Google.Cloud.Storage.V1.

joaopaulopmedeiros commented 1 month ago

@jskeet Here is an example with the UploadObjectAsync method. I suppose my problem is that I can't specify a part number.

using System.Buffers;
using System.Globalization;

using CsvHelper;

using Google.Cloud.Storage.V1;

using Report.Generator.Infra.Repositories;

namespace Report.Generator.Infra.Generators
{
    public class CsvGCSReportGenerator
    {
        private readonly StorageClient _storageClient;

        public CsvGCSReportGenerator(StorageClient storageClient)
        {
            _storageClient = storageClient;
        }

        public async Task GenerateAsync(string bucketName, string fileName, int blockSize)
        {
            byte[] buffer = ArrayPool<byte>.Shared.Rent(blockSize);
            int bufferPosition = 0;

            using var memoryStream = new MemoryStream();
            using var writer = new StreamWriter(memoryStream);
            using var csvWriter = new CsvWriter(writer, CultureInfo.InvariantCulture);

            int partNumber = 1;

            await foreach (var product in ProductRepository.FetchProductsAsync())
            {
                memoryStream.SetLength(0);
                csvWriter.WriteRecord(product);
                await csvWriter.NextRecordAsync();
                await writer.FlushAsync();
                memoryStream.Position = 0;

                while (memoryStream.Position < memoryStream.Length)
                {
                    int bytesToRead = Math.Min(blockSize - bufferPosition, (int)(memoryStream.Length - memoryStream.Position));
                    int bytesRead = await memoryStream.ReadAsync(buffer, bufferPosition, bytesToRead);
                    bufferPosition += bytesRead;

                    if (bufferPosition == blockSize)
                    {
                        await UploadPartAsync(buffer, bufferPosition, bucketName, fileName, partNumber++);
                        bufferPosition = 0;
                    }
                }
            }

            if (bufferPosition > 0)
            {
                await UploadPartAsync(buffer, bufferPosition, bucketName, fileName, partNumber);
            }

            ArrayPool<byte>.Shared.Return(buffer);
        }

        private async Task UploadPartAsync(byte[] buffer, int bufferLength, string bucketName, string fileName, int partNumber)
        {
            // Each call uploads a complete object under the same name, so every "part"
            // replaces the previous one instead of being appended.
            using var partStream = new MemoryStream(buffer, 0, bufferLength);
            await _storageClient.UploadObjectAsync(bucketName, $"{fileName}.csv", "text/csv", partStream);
            Console.WriteLine($"Uploaded part {partNumber}");
        }
    }
}

Previous attempt with Resumable Upload. PS: It only worked when putting all items into memory, which I'm not supposed to do.

using System.Globalization;
using System.IO;
using System.Net.Mime;

using CsvHelper;

using Google.Apis.Upload;
using Google.Cloud.Storage.V1;

using Report.Generator.Domain.Entities;
using Report.Generator.Infra.Repositories;

namespace Report.Generator;

public class Program
{
    public async static Task Main()
    {
        Console.WriteLine($"Started at {DateTime.Now}");

        using var memoryStream = new MemoryStream();
        using var writer = new StreamWriter(memoryStream);
        using var csvWriter = new CsvWriter(writer, CultureInfo.InvariantCulture);

        csvWriter.WriteHeader<Product>();
        await csvWriter.NextRecordAsync();

        var client = await StorageClient.CreateAsync();

        var options = new UploadObjectOptions
        {
            ChunkSize = UploadObjectOptions.MinimumChunkSize
        };

        var uploadUri = await client.InitiateUploadSessionAsync(Environment.GetEnvironmentVariable("BUCKET_NAME"), "report.csv", "text/csv", contentLength: null, options);

        int batchSize = 100_000;

        await foreach (var product in ProductRepository.FetchUnbufferedProductsAsync(batchSize))
        {
            csvWriter.WriteRecord(product);
            csvWriter.NextRecord();
            Console.WriteLine(product.Title);
        }

        await writer.FlushAsync();
        memoryStream.Position = 0;

        IProgress<IUploadProgress> progress = new Progress<IUploadProgress>(
          p => Console.WriteLine($"bytes: {p.BytesSent}, status: {p.Status}")
        );

        var actualUploader = ResumableUpload.CreateFromUploadUri(uploadUri, memoryStream);

        actualUploader.ChunkSize = UploadObjectOptions.MinimumChunkSize * 2;

        actualUploader.ProgressChanged += progress.Report;

        await actualUploader.UploadAsync();

        Console.WriteLine($"Finished at {DateTime.Now}");
    }
}
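
An alternative that might avoid holding the whole file in memory (an untested sketch; it assumes UploadObjectAsync accepts a non-seekable stream, which the resumable uploader should handle by buffering one chunk at a time) is to connect the CsvWriter to the uploader through a System.IO.Pipelines pipe:

using System.Globalization;
using System.IO.Pipelines;

using CsvHelper;

using Google.Cloud.Storage.V1;

using Report.Generator.Domain.Entities;
using Report.Generator.Infra.Repositories;

namespace Report.Generator;

public class PipeStreamingSketch
{
    public static async Task RunAsync(StorageClient client, string bucketName)
    {
        var pipe = new Pipe();

        // Start the upload reading from one end of the pipe; it consumes bytes as they arrive.
        var uploadTask = client.UploadObjectAsync(
            bucketName, "report.csv", "text/csv", pipe.Reader.AsStream());

        // Write CSV records into the other end; memory stays bounded by the pipe's buffer
        // plus the uploader's chunk size instead of growing with the whole file.
        await using (var pipeStream = pipe.Writer.AsStream())
        using (var writer = new StreamWriter(pipeStream))
        using (var csvWriter = new CsvWriter(writer, CultureInfo.InvariantCulture))
        {
            csvWriter.WriteHeader<Product>();
            await csvWriter.NextRecordAsync();

            await foreach (var product in ProductRepository.FetchUnbufferedProductsAsync(100_000))
            {
                csvWriter.WriteRecord(product);
                await csvWriter.NextRecordAsync();
            }
        }

        await pipe.Writer.CompleteAsync();
        await uploadTask;
    }
}
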
jskeet commented 1 month ago

No, you can't - at least not with our libraries. I'm afraid it's a use-case we just don't support at the moment. Assuming I've understood you correctly, this is basically equivalent to this issue.

I think it's unlikely that we'll support this any time soon. What you could do is upload each part to a separate object, and then use the Compose operation (from StorageService; it's not exposed directly in StorageClient) to create a single object after the fact.
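
Something along these lines might work (an untested sketch; the part object names, bucket, and content type are placeholders), using the Service property that StorageClient exposes:

using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;

public static class ComposeSketch
{
    // Composes previously uploaded "part" objects into a single final object.
    // Note: a single compose call accepts at most 32 source objects.
    public static async Task ComposePartsAsync(
        StorageClient client, string bucket, string finalName, IReadOnlyList<string> partNames)
    {
        var composeRequest = new ComposeRequest
        {
            Destination = new Google.Apis.Storage.v1.Data.Object
            {
                Bucket = bucket,
                Name = finalName,
                ContentType = "text/csv"
            },
            SourceObjects = partNames
                .Select(name => new ComposeRequest.SourceObjectsData { Name = name })
                .ToList()
        };

        // The compose operation lives on the underlying StorageService, reachable via StorageClient.Service.
        await client.Service.Objects.Compose(composeRequest, bucket, finalName).ExecuteAsync();

        // Optionally clean up the intermediate part objects afterwards.
        foreach (var name in partNames)
        {
            await client.DeleteObjectAsync(bucket, name);
        }
    }
}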

I'll reassign this to a member of the Storage team in case you have any further questions.

joaopaulopmedeiros commented 1 month ago

Alright. Thanks a lot for the feedback.

joaopaulopmedeiros commented 1 month ago

@JesseLovelace, could you help with this feature? Maybe we could work together on a solution.