generateUniqueIdentifier - is not thenable/promise which is awful #310

Open yanivkalfa opened 4 years ago

yanivkalfa commented 4 years ago

If I want to start an Amazon multipart upload, I first have to create the multipart upload, get its identifier, return it to the client, and then use that identifier to upload the other chunks.

As it stands right now, Flow doesn't allow that.

AidasK commented 4 years ago

Will be possible to make after https://github.com/flowjs/flow.js/pull/304 is merged

command-tab commented 4 years ago

I'm currently successfully doing (unsigned) Amazon S3 multipart uploads with Flow.js 2.14.0 by creating the multipart upload in filesSubmitted and grabbing the UploadId from the response and storing it on the FlowFile:

const config = {
  responseType: 'document', // XML Document https://developer.mozilla.org/en-US/docs/Web/API/Document
  headers: {
    'Content-Type': flowFile.file.type // What the completed file's Content-Type header should be
  }
}
const url = `${baseUrl}/${bucketName}/${flowFile.uniqueIdentifier}/${filename}?uploads`
const response = await this.$axios.post(url, null, config)
flowFile.uploadId = response.request.responseXML.getElementsByTagName('UploadId')[0].textContent

Then I return an appropriate URL for each part in target:

target: (flowFile, flowChunk, isTest) => {
  return `${baseUrl}/${bucketName}/${flowFile.uniqueIdentifier}/${filename}?partNumber=${flowChunk.getParams().flowChunkNumber}&uploadId=${flowFile.uploadId}`
}

And complete the upload by POSTing a manifest in fileSuccess:

const compareParts = function (a, b) {
  // Sort `parts` by chunkNumber, as parts may not be ordered in the array
  // if flow's `simultaneousUploads` is > 1 (parts may finish out of order)
  if (a.chunkNumber < b.chunkNumber) {
    return -1
  }
  if (a.chunkNumber > b.chunkNumber) {
    return 1
  }
  return 0
}

// Axios Complete Upload URL
const url = `${baseUrl}/${bucketName}/${flowFile.uniqueIdentifier}/${filename}?uploadId=${flowFile.uploadId}`

// Build out the "Complete Upload" ordered XML manifest
const xmlDocument = new Document()

// Prefer document.createElementNS over document.createElement because
// document.createElement (no 'NS') returns case-insensitive node tag names,
// and some XML parsers are strict about accepting the proper case
const ns = 'http://s3.amazonaws.com/doc/2006-03-01/'
const rootElement = document.createElementNS(ns, 'CompleteMultipartUpload')

// Append sorted Part elements
const sortedParts = flowFile.manifestParts.sort(compareParts)
sortedParts.forEach((part) => {
  const partNumberElement = document.createElementNS(ns, 'PartNumber')
  partNumberElement.textContent = part.chunkNumber

  const etagElement = document.createElementNS(ns, 'ETag')
  etagElement.textContent = part.etag

  // Attach both children to the Part, and the Part to the root element
  const partElement = document.createElementNS(ns, 'Part')
  partElement.appendChild(partNumberElement)
  partElement.appendChild(etagElement)
  rootElement.appendChild(partElement)
})
xmlDocument.appendChild(rootElement)

const serializer = new XMLSerializer()
const payload = serializer.serializeToString(xmlDocument)

const config = {
  headers: {
    'Content-Type': 'text/octet-stream'
  }
}
await this.$axios.post(url, payload, config)

I've used the above for both single and multiple simultaneous uploads.
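For anyone who wants to check the manifest outside a browser, here is a minimal sketch that builds the same "Complete Upload" body as a plain string instead of via the DOM. `buildCompleteManifest` and `escapeXml` are hypothetical helper names, not part of Flow.js, and the input is assumed to be an array of `{ chunkNumber, etag }` objects like `flowFile.manifestParts` above.

```javascript
// Hypothetical helpers (not part of Flow.js): build the S3
// CompleteMultipartUpload XML body from [{ chunkNumber, etag }, ...].
const S3_NS = 'http://s3.amazonaws.com/doc/2006-03-01/'

function escapeXml (value) {
  // Escape the characters that are unsafe inside XML element text
  return String(value)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
}

function buildCompleteManifest (parts) {
  // Copy before sorting so the caller's array is not mutated;
  // parts may have finished out of order if simultaneousUploads > 1
  const sorted = [...parts].sort((a, b) => a.chunkNumber - b.chunkNumber)
  const body = sorted
    .map((part) =>
      `<Part><PartNumber>${part.chunkNumber}</PartNumber>` +
      `<ETag>${escapeXml(part.etag)}</ETag></Part>`)
    .join('')
  return `<CompleteMultipartUpload xmlns="${S3_NS}">${body}</CompleteMultipartUpload>`
}
```

The numeric comparator `a.chunkNumber - b.chunkNumber` orders parts the same way as `compareParts`, assuming `chunkNumber` is always a number.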

You can also import the SparkMD5 library to compute hashes of each FlowFile chunk and set the Content-MD5 header in the preprocess and headers callbacks:

// Calculate MD5 of each chunk so we can set the S3 `Content-MD5` header on each chunk upload:
// https://github.com/flowjs/flow.js/issues/9#issuecomment-288750191
// S3 wants the Content-MD5 to be the base64-encoded 128-bit binary MD5 digest of the part data:
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html
preprocess (chunk) {
  if (chunk.readState === 0) {
    // readState=0: Read of file chunk not started. Triggering it now.
    chunk.preprocessState = 0
    const read = chunk.flowObj.opts.readFileFn
    read(chunk.fileObj, chunk.startByte, chunk.endByte, chunk.fileObj.file.type, chunk)
  } else if (chunk.readState === 1) {
    // Waiting... readState=1: Read of chunk is in progress.
    chunk.preprocessState = -1
  } else if (chunk.readState === 2) {
    // readState=2: Read is finished. We can now trigger MD5 compute.
    const reader = new FileReader()
    reader.onloadend = function () {
      // 'true' causes hash() to return a binary hash instead of hex hash
      const binaryHash = SparkMD5.ArrayBuffer.hash(reader.result, true)
      chunk.contentMD5 = btoa(binaryHash) // S3 expects base64(binaryhash)
      chunk.preprocessFinished() // Tell Flow the chunk is ready to be sent
    }
    reader.readAsArrayBuffer(chunk.bytes) // `chunk.bytes` holds the data read for this chunk
  }
},
headers (flowFile, flowChunk, isTest) {
  return { 'Content-MD5': flowChunk.contentMD5 }
}
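An easy mistake with `Content-MD5` is to base64-encode the hex string rather than the raw 16-byte digest. As a rough illustration, the sketch below converts a hex MD5 digest into the base64 form described above. `hexDigestToBase64` is a hypothetical helper, not part of SparkMD5 or Flow.js, and the `Buffer` fallback assumes a Node environment without a global `btoa`.

```javascript
// Hypothetical helper: turn a hex MD5 digest (32 hex characters) into
// base64 of the 16 raw digest bytes, which is what Content-MD5 carries.
function hexDigestToBase64 (hex) {
  let binary = ''
  for (let i = 0; i < hex.length; i += 2) {
    // Each pair of hex characters is one byte of the digest
    binary += String.fromCharCode(parseInt(hex.slice(i, i + 2), 16))
  }
  // btoa exists in browsers and recent Node versions; fall back to Buffer otherwise
  return typeof btoa === 'function'
    ? btoa(binary)
    : Buffer.from(binary, 'binary').toString('base64')
}
```

For example, the MD5 of empty input is `d41d8cd98f00b204e9800998ecf8427e` in hex, and `hexDigestToBase64` turns that into `1B2M2Y8AsgTpgAmY7PhCfg==`.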

Perhaps some of this will be a little simpler with async readFile?

yanivkalfa commented 4 years ago

I didn't say it's not possible; I'm currently doing it as well, but a bit differently, and I'm going through my server.

Right now I do it in fileAdded: I grab a new multipart upload ID, mutate flowFile.uniqueIdentifier with the new ID, and then start the upload.

const { data: { UploadId } } = await uploadChunkedVideoStart({ fileName: file.name });
flowFile.uniqueIdentifier = UploadId;

But there should be another way...
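To make the workaround concrete, here is a minimal sketch of the "fetch the ID first, then start uploading" pattern. `assignUploadIds` and `startUpload` are hypothetical names: `startUpload` stands in for whatever call creates the multipart upload (such as `uploadChunkedVideoStart` above) and is assumed to resolve to an object with an `UploadId` field.

```javascript
// Hypothetical helper (not part of Flow.js): resolve an upload ID for every
// file and store it as the file's uniqueIdentifier before uploading starts.
async function assignUploadIds (flowFiles, startUpload) {
  await Promise.all(flowFiles.map(async (flowFile) => {
    // `startUpload` is an assumed async function resolving to { UploadId }
    const { UploadId } = await startUpload(flowFile.name)
    flowFile.uniqueIdentifier = UploadId
  }))
  return flowFiles
}
```

With Flow.js this could presumably be called from a `filesSubmitted` handler, with `flow.upload()` called only once the returned promise resolves, so that no chunk is sent with the old identifier.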

drzraf commented 4 years ago

https://github.com/flowjs/flow.js/pull/304 does not make it possible out of the box, but it (together with https://github.com/flowjs/flow.js/pull/296) may be a source of inspiration for someone willing to do a PR that supports an async generateUniqueIdentifier (or, more generally, async tasks within addFiles).

drzraf commented 3 years ago

@yanivkalfa : You could have a look at https://github.com/flowjs/flow.js/pull/329 and see if

await asyncAddFile(yourfile, null, async (flowFile, event) => {
  // whatever you want before initialization
})

would fit, where the 3rd parameter is an async equivalent of initFileFn. Note: