Open sanny-io opened 1 month ago
Hi @sanny-io,
Some quick responses to the various elements of your request:
From what I know, this is mostly embedded in the SDKs that this library integrates with. For example, for S3 the constructor accepts an `uploadConfiguration` option, allowing you to specify things like `queueSize` and `partSize` to influence concurrency.
For this I suspect we need to add the option of passing in an AbortController as an input option for various calls, passing it down to the implementations that support it. I'm not entirely sure what to do about implementations that don't, but I think we can figure out what a sane default is for that.
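A minimal sketch of one possible default for implementations that cannot cancel natively: check the signal before starting, then race the operation against the abort event. `raceAbort` is a hypothetical helper, not an existing API of this library, and with this approach the underlying request keeps running in the background after the caller has been rejected.

```typescript
// Hypothetical helper (not part of any existing API): makes a call abortable
// from the caller's point of view, even when the adapter cannot cancel it.
function raceAbort<T>(operation: Promise<T>, signal?: AbortSignal): Promise<T> {
    if (signal === undefined) {
        return operation; // no signal passed: behave exactly as before
    }
    if (signal.aborted) {
        operation.catch(() => undefined); // avoid an unhandled rejection later on
        return Promise.reject(new Error('Aborted'));
    }
    return new Promise<T>((resolve, reject) => {
        const onAbort = () => reject(new Error('Aborted'));
        signal.addEventListener('abort', onAbort, { once: true });
        operation
            .then(resolve, reject)
            .finally(() => signal.removeEventListener('abort', onAbort));
    });
}
```

Adapters whose SDK accepts an abort signal would forward it directly instead, so the request itself is cancelled.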
This one I 1) totally understand why you're asking about and 2) have never found a reliable way to do without end-to-end support from the underlying implementations. What seems like a reliable path is to monitor stream consumption and (when available) upload progress. The reason I see these as two separate things is that chunks are usually buffered before being sent. Implementations like S3 will buffer a chunk and parallelise the upload. In those cases, stream consumption progress alone is not a meaningful indication of the progress of the entire call.
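To make the two signals concrete, here is a rough sketch that tracks stream consumption and upload progress separately, and prefers the upload figure whenever an implementation reports one. `WriteProgressTracker` and its method names are hypothetical and only illustrate the idea.

```typescript
type WriteProgressSnapshot = {
    bytesConsumed: number,
    bytesUploaded: number | undefined,
    fraction: number,
};

// Hypothetical tracker: keeps "read from the stream" and "confirmed uploaded" apart.
class WriteProgressTracker {
    private readonly totalBytes: number;
    private bytesConsumed = 0;
    private bytesUploaded: number | undefined = undefined;

    constructor(totalBytes: number) {
        this.totalBytes = totalBytes;
    }

    // Call for every chunk read from the source stream.
    recordConsumed(byteLength: number): void {
        this.bytesConsumed += byteLength;
    }

    // Call with the cumulative byte count whenever upload progress is reported.
    recordUploaded(cumulativeBytes: number): void {
        this.bytesUploaded = cumulativeBytes;
    }

    snapshot(): WriteProgressSnapshot {
        // Prefer confirmed upload progress; fall back to consumption when none is reported.
        const reference = this.bytesUploaded ?? this.bytesConsumed;
        return {
            bytesConsumed: this.bytesConsumed,
            bytesUploaded: this.bytesUploaded,
            fraction: this.totalBytes > 0 ? Math.min(reference / this.totalBytes, 1) : 1,
        };
    }
}
```

With buffering, `bytesConsumed` can run well ahead of `bytesUploaded`, which is why the snapshot exposes both rather than a single number.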
@frankdejonge
If we handle most of the process ourselves, we don't need to rely on the underlying implementations for anything other than the actual writing of the bytes and the multipart requests that accompany it. I have actually written implementations for all adapters except Azure (I've never used Azure). The logic does not change much between them, so I think I could get Azure done too.
I had tried monitoring stream consumption and yes I agree it is not reliable. At the bottom of this comment, I will leave the implementation for S3. It is not perfect. It's incomplete, and doesn't have any error handling, but hopefully you can see where my head is at with all this.
We should also keep in mind the limitations of each provider, such as minimum chunk sizes and the maximum number of chunks.
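As a rough sketch of how such limits could be applied, the helper below clamps a requested chunk size so that it respects a provider's minimum chunk size and maximum chunk count. `ProviderLimits` and `resolveChunkSize` are hypothetical names. The S3 figures (parts of at least 5 MiB except the last one, at most 10,000 parts) are the documented multipart limits; other providers would need their own values.

```typescript
type ProviderLimits = {
    minChunkSize: number, // bytes
    maxChunks: number,
};

const MiB = 1024 * 1024;

// S3 multipart upload: parts must be at least 5 MiB (except the last), at most 10,000 parts.
const s3Limits: ProviderLimits = { minChunkSize: 5 * MiB, maxChunks: 10000 };

// Hypothetical helper: picks the smallest chunk size that satisfies the request and the limits.
function resolveChunkSize(totalSize: number, requested: number | undefined, limits: ProviderLimits): number {
    // Chunks smaller than this would exceed the provider's maximum chunk count.
    const minimumForChunkCount = Math.ceil(totalSize / limits.maxChunks);
    return Math.max(requested ?? limits.minChunkSize, limits.minChunkSize, minimumForChunkCount);
}
```

For a 100 MiB file with a requested chunk size of 1 MiB, this resolves to the 5 MiB minimum; for very large files the chunk count limit takes over and raises the chunk size.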
export type WriteOptions = VisibilityOptions & MiscellaneousOptions & {
    mimeType?: string,
    size?: number,
    cacheControl?: string,
};

export type WriteChunkedOptions = WriteOptions & {
    size: number,
    chunkSize?: number,
    signal?: AbortSignal,
};

export type WriteChunkedProgress = {
    bytesWritten: number,
    fraction: number,
};
// Method on the S3 adapter. `on` comes from 'node:events'; the commands come from '@aws-sdk/client-s3'.
async* writeChunked(path: string, contents: Readable, options: WriteChunkedOptions): AsyncGenerator<WriteChunkedProgress> {
    const key = this.prefixer.prefixFilePath(path);
    const createUploadResponse = await this.client.send(new CreateMultipartUploadCommand({
        Bucket: this.options.bucket,
        Key: key,
        ContentType: options.mimeType,
    }));
    let totalBytesWritten = 0;
    let partNumber = 1;
    const parts: { PartNumber: number, ETag?: string }[] = [];

    // Each 'data' event is uploaded as one part, in sequence; options.chunkSize is not applied yet.
    for await (const [chunk] of on(contents, 'data') as AsyncGenerator<Buffer[]>) {
        if (options.signal?.aborted) {
            throw new Error('Aborted');
        }

        const uploadPartResponse = await this.client.send(new UploadPartCommand({
            Bucket: this.options.bucket,
            Key: key,
            PartNumber: partNumber,
            UploadId: createUploadResponse.UploadId,
            Body: chunk,
        }));

        parts.push({
            PartNumber: partNumber,
            ETag: uploadPartResponse.ETag,
        });
        partNumber++;
        totalBytesWritten += chunk.byteLength;

        const fraction = totalBytesWritten / options.size;

        yield {
            bytesWritten: chunk.byteLength,
            fraction,
        };

        // The on() iterator does not end on its own, so stop once all bytes are written.
        if (totalBytesWritten >= options.size) {
            break;
        }
    }

    await this.client.send(new CompleteMultipartUploadCommand({
        Bucket: this.options.bucket,
        Key: key,
        UploadId: createUploadResponse.UploadId,
        MultipartUpload: {
            Parts: parts,
        },
    }));
}
for await (const progress of storage.writeChunked(path, readable, {
    size: file.size,
    signal: request.signal,
    chunkSize,
})) {
    console.log(progress.fraction);
}
Thanks for the example code. Looking over it, I notice that, instead of uploading concurrently, now the parts are uploaded in sequence over multiple requests. Doing so will impact the performance of the operation substantially. In this case, I'd value the performance more than the ability to monitor progress.
Perhaps it's best to focus on the other elements of your initial message and let this one slide.
As I had mentioned, this code is incomplete. Do you have any concerns that would make it impossible to support concurrency and progress monitoring? I think I am not understanding why this is an either-or situation.
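For illustration only, a minimal sketch of how part uploads could run with bounded concurrency while still reporting progress. `uploadPartsConcurrently` and the `uploadPart` callback are hypothetical, the parts are assumed to be split and held in memory already, and there is no error handling, retrying, or abort support, which is a large part of what the SDKs' own upload helpers take care of.

```typescript
// Hypothetical sketch: uploads parts with at most `concurrency` requests in flight
// and reports the cumulative number of bytes written after each finished part.
async function uploadPartsConcurrently(
    parts: Uint8Array[],
    concurrency: number,
    uploadPart: (partNumber: number, body: Uint8Array) => Promise<void>,
    onProgress: (bytesWritten: number) => void,
): Promise<void> {
    let nextIndex = 0;
    let bytesWritten = 0;

    // Each worker repeatedly claims the next unclaimed part until none are left.
    const worker = async (): Promise<void> => {
        while (nextIndex < parts.length) {
            const index = nextIndex++;
            const body = parts[index];
            await uploadPart(index + 1, body); // part numbers start at 1
            bytesWritten += body.byteLength;
            onProgress(bytesWritten);
        }
    };

    const workerCount = Math.min(concurrency, parts.length);
    await Promise.all(Array.from({ length: workerCount }, () => worker()));
}
```

Parts can finish out of order here, so the reported value is the total confirmed so far, not a position in the stream.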
The concerns would be the burden of maintenance, primarily. Since concurrency is already provided by the underlying implementation, re-implementing it exposes me to having to maintain that bit when the underlying implementations change. I would rather build on a higher-level implementation point that encapsulates all of that logic.
Looking into the `Upload` class from the AWS SDK shows there is an `httpUploadProgress` event that we can hook into, sparing us from implementing a large chunk of this logic ourselves. Perhaps we can find equivalents for Azure and GCS; if so, we can look at what a general payload would/could/should look like.
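As a starting point for that discussion, here is a sketch of mapping the S3 progress event onto a provider-neutral payload. The `S3ProgressEvent` type is an assumption about the shape passed to `httpUploadProgress` listeners (optional `loaded`, `total`, and `part` fields), and `WriteProgress` and `fromS3Progress` are hypothetical names.

```typescript
// Assumed shape of the payload passed to `httpUploadProgress` listeners.
type S3ProgressEvent = { loaded?: number, total?: number, part?: number };

// Hypothetical provider-neutral payload.
type WriteProgress = {
    bytesWritten: number,
    totalBytes: number | undefined,
    fraction: number | undefined, // undefined when the total size is unknown
};

function fromS3Progress(event: S3ProgressEvent, knownSize?: number): WriteProgress {
    const bytesWritten = event.loaded ?? 0;
    // Fall back to a caller-supplied size when the event carries no total.
    const totalBytes = event.total ?? knownSize;
    const hasTotal = totalBytes !== undefined && totalBytes > 0;
    return {
        bytesWritten,
        totalBytes,
        fraction: hasTotal ? Math.min(bytesWritten / totalBytes, 1) : undefined,
    };
}

// Possible wiring, not executed here:
// upload.on('httpUploadProgress', (event) => report(fromS3Progress(event, options.size)));
```

Keeping `fraction` optional avoids requiring a known size up front, which matters for streams whose length is not known in advance.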
To get back to my initial response: your initial request covers three areas of concern, which would be best split up into separate issues so we can have a topical discussion on each of them.
I am proposing some sort of functionality for specifying a write chunk size, a way to monitor write progress, and aborting the write altogether.
I am willing to work on implementing this myself but would like to gather feedback regarding how the API should look. I'm thinking a new function, `writeChunked`, would be ideal to avoid introducing too much complexity to `write`.
Relevant links:
https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload
https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html
https://learn.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs#about-block-blobs