aws / aws-sdk-js-v3

Modularized AWS SDK for JavaScript.
Apache License 2.0

S3 Upload Through File #5975

Closed ronag closed 6 months ago

ronag commented 6 months ago

Describe the feature

The current multipart uploader uses a lot of memory because it buffers each part in memory for hashing before uploading. The same could be achieved with much less memory by caching parts on disk (through a file) instead.
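For context, a typical call to the existing uploader looks roughly like the sketch below (assuming @aws-sdk/lib-storage's Upload is the uploader being referred to; bucket, key, and file path are placeholders):

import fs from 'node:fs'
import { S3Client } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

// Per the issue, each queued part is buffered (and hashed) in memory before
// it is sent, so peak memory grows with roughly queueSize * partSize.
const parallelUpload = new Upload({
  client: new S3Client({}),
  params: {
    Bucket: 'my-bucket', // placeholder
    Key: 'large-object.bin', // placeholder
    Body: fs.createReadStream('/path/to/large-file.bin'), // placeholder
  },
  partSize: 64e6,
  queueSize: 4,
})

await parallelUpload.done()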

Use Case

Less memory usage.

Proposed Solution

import crypto from 'node:crypto'
import stream from 'node:stream'
import path from 'node:path'
import os from 'node:os'
import fs from 'node:fs'
import assert from 'node:assert'
import AWS from '@aws-sdk/client-s3'
import PQueue from 'p-queue'

const CONTENT_MD5_EXPR = /^[A-F0-9]{32}$/i
const CONTENT_LENGTH_EXPR = /^\d+$/i

const noop = (arg0) => {}

class PartUploader {
  #number
  #path
  #size
  #writable
  #callback
  #hasher
  #signal

  constructor(dir, number, signal) {
    this.#writable = null
    this.#callback = noop
    this.#hasher = crypto.createHash('md5')
    this.#size = 0
    this.#signal = signal
    this.#number = number
    this.#path = path.join(dir, `${this.#number}.part`)
  }

  get size() {
    return this.#size
  }

  async write(chunk) {
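    // Lazily open the temp-file stream on first write; 'drain' and 'error'
    // resolve or reject the pending backpressure promise below.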
    this.#writable ??= fs
      .createWriteStream(this.#path, { signal: this.#signal })
      .on('drain', () => {
        this.#callback(null)
        this.#callback = noop
      })
      .on('error', (err) => {
        this.#callback(err)
        this.#callback = noop
      })

    if (this.#writable.errored) {
      throw this.#writable.errored
    }

    this.#size += chunk.byteLength
    this.#hasher.update(chunk)

    if (!this.#writable.write(chunk)) {
      await new Promise((resolve, reject) => {
        this.#callback = (err) => (err ? reject(err) : resolve(null))
      })
      this.#signal.throwIfAborted()
    }
  }

  async end(s3, params) {
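    // Finish writing the temp file, upload it as a single part, and always
    // remove the file afterwards.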
    try {
      if (!this.#writable) {
        throw new Error('No data to send')
      }

      if (this.#writable.errored) {
        throw this.#writable.errored
      }

      this.#writable.end()
      await stream.promises.finished(this.#writable)

      assert(this.#writable.bytesWritten === this.#size, 'Expected size to match bytesWritten')

      const { ETag } = await s3.send(
        new AWS.UploadPartCommand({
          ...params,
          ContentMD5: this.#hasher.digest('base64'),
          ContentLength: this.#size,
          PartNumber: this.#number,
          Body: fs.createReadStream(this.#path, { signal: this.#signal }),
        }),
      )

      return { part: { ETag, PartNumber: this.#number } }
    } catch (err) {
      return { error: err }
    } finally {
      await fs.promises.unlink(this.#path).catch(noop) // the file may not exist if nothing was written
    }
  }
}

export async function upload(
  {
    client: s3,
    signal: outerSignal,
    tmpdir = os.tmpdir(),
    partSize = 64e6,
    queueSize = 4,
    leavePartsOnError = false,
  },
  { Body, Key, Bucket, ContentMD5, ContentLength },
) {
  if (s3 == null) {
    throw new Error('Invalid client')
  }

  if (!Number.isFinite(partSize) || partSize <= 0) {
    throw new Error('Invalid partSize')
  }

  if (!Number.isFinite(queueSize) || queueSize <= 0) {
    throw new Error('Invalid queueSize')
  }

  if (ContentMD5 != null && !CONTENT_MD5_EXPR.test(ContentMD5)) {
    throw new Error(`Invalid ContentMD5: ${ContentMD5}`)
  }

  if (ContentLength != null && !CONTENT_LENGTH_EXPR.test(ContentLength)) {
    throw new Error(`Invalid ContentLength: ${ContentLength}`)
  }

  const dir = await fs.promises.mkdtemp(path.join(tmpdir, 's3-upload-'))
  outerSignal?.throwIfAborted()

  await fs.promises.stat(dir)
  outerSignal?.throwIfAborted()

  const queue = new PQueue({ concurrency: queueSize })
  const promises = []
  const ac = new AbortController()
  const signal = ac.signal

  const abort = () => ac.abort()
  outerSignal?.addEventListener('abort', abort)

  let uploadId
  try {
    const multipartUploadOutput = await s3.send(
      new AWS.CreateMultipartUploadCommand({
        Bucket,
        Key,
      }),
    )
    uploadId = multipartUploadOutput.UploadId
    signal.throwIfAborted()

    const uploader = {
      size: 0,
      hasher: crypto.createHash('md5'),
      part: new PartUploader(dir, 1, signal),
      number: 1,
    }

    const maybeFlush = (minSize) => {
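      // Rotate to a fresh PartUploader once the current part reaches minSize
      // (or unconditionally when minSize is omitted) and queue its upload.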
      if (uploader.part.size && (minSize == null || uploader.part.size >= minSize)) {
        const part = uploader.part
        uploader.part = new PartUploader(dir, ++uploader.number, signal)

        const promise = queue.add(() => part.end(s3, { Bucket, Key, UploadId: uploadId }))
        promises.push(promise)
      }
    }

    for await (const chunk of Body) {
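      // Track the whole-object size and MD5 while spooling chunks into the
      // current part's temp file.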
      signal.throwIfAborted()

      uploader.hasher.update(chunk)
      uploader.size += chunk.byteLength

      await uploader.part.write(chunk)
      signal.throwIfAborted()

      maybeFlush(partSize)
    }
    maybeFlush()

    const parts = []
    const errors = []
    for (const { part, error } of await Promise.all(promises)) {
      if (error) {
        errors.push(error)
      } else {
        parts.push(part)
      }
    }
    signal.throwIfAborted()

    if (errors.length > 0) {
      throw new AggregateError(errors, 'upload failed')
    }

    if (parts.length === 0) {
      throw new Error('upload empty')
    }

    const uploadOutput = await s3.send(
      new AWS.CompleteMultipartUploadCommand({
        Bucket,
        Key,
        UploadId: uploadId,
        MultipartUpload: { Parts: parts },
      }),
    )
    signal.throwIfAborted()

    const result = {
      size: uploader.size,
      hash: uploader.hasher.digest('hex'),
      output: uploadOutput,
      parts,
    }

    const size = ContentLength != null ? Number(ContentLength) : null
    const hash = ContentMD5

    if (size != null && size !== result.size) {
      throw new Error(`Expected size ${size} but got ${result.size}`)
    }

    if (hash != null && hash !== result.hash) {
      throw new Error(`Expected hash ${hash} but got ${result.hash}`)
    }

    return result
  } catch (err) {
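    // Cancel in-flight part uploads, then abort the multipart upload on S3
    // unless leavePartsOnError is set.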
    ac.abort(err)

    if (uploadId && !leavePartsOnError) {
      try {
        await s3.send(
          new AWS.AbortMultipartUploadCommand({
            Bucket,
            Key,
            UploadId: uploadId,
          }),
        )
      } catch (er) {
        throw new AggregateError([err, er])
      }
    }

    throw err
  } finally {
    outerSignal?.removeEventListener('abort', abort)
    await fs.promises.rm(dir, { recursive: true, force: true })
  }
}
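
A minimal usage sketch for the proposed helper (the module path, bucket, key, and file path below are placeholders):

import fs from 'node:fs'
import { S3Client } from '@aws-sdk/client-s3'
import { upload } from './upload.mjs' // placeholder path for the module above

const client = new S3Client({})

// Body can be any async iterable of Buffers, e.g. a file read stream.
const result = await upload(
  { client, partSize: 64e6, queueSize: 4 },
  {
    Bucket: 'my-bucket', // placeholder
    Key: 'large-object.bin', // placeholder
    Body: fs.createReadStream('/path/to/large-file.bin'), // placeholder
  },
)

console.log(result.size, result.hash, result.output.ETag)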

Other Information

No response

Acknowledgements

SDK version used

3.540.0

Environment details (OS name and version, etc.)

OSX

github-actions[bot] commented 5 months ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs and link to relevant comments in this thread.