bionode / bionode-watermill

💧 Bionode-Watermill: A (Not Yet Streaming) Workflow Engine
https://bionode.gitbooks.io/bionode-watermill/content/
MIT License

Catch Operation Finish #41

Closed thejmazz closed 7 years ago

thejmazz commented 7 years ago

Consider this task:

const fs = require('fs')
const through = require('through2') // assuming through2, which matches the (chunk, enc, cb) signature
const { task } = require('bionode-watermill')

const switchFileType = (str, type) => {
  // probably nicer with a regex replace..
  const pieces = str.split('.')
  pieces.pop()
  return pieces.join('.') + '.' + type
}

const throughUppercase = through(function (chunk, enc, cb) {
  cb(null, chunk.toString().toUpperCase())
})

const uppercaser = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Uppercase Transform'
}, ({ input }) => 
  fs.createReadStream(input)
    .pipe(throughUppercase)
    .pipe(fs.createWriteStream(switchFileType(input, 'uppercase')))
)

The operation for this task returns a writable stream (the final .pipe() returns its destination). Then dup.setWritable(operation) attaches it to the task's duplexify stream, and similarly for the readable side.
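For reference, a minimal sketch of how that attachment works with duplexify (dup is the name from the text; the file names are illustrative):

const duplexify = require('duplexify')

// hand out a duplex shell immediately; wire the real streams in later
const dup = duplexify()

const operation = fs.createReadStream('file.lowercase')
  .pipe(throughUppercase)                       // transform from the task above
  .pipe(fs.createWriteStream('file.uppercase')) // .pipe() returns the destination

dup.setWritable(operation)
// for a readable operation: dup.setReadable(someReadableStream)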

Then in the catchTask action, the purpose is to resolve once the operation has completed. It's called "catch" because it halts the task lifecycle action pipeline until the operation completes, so that resolve output and validate output can then be run.
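A minimal sketch of such a catch step (catchTask is the name from the text; the promise wiring is an assumption):

// resolve once the operation's writable side is done, reject on error
const catchTask = (operation) => new Promise((resolve, reject) => {
  operation.once('finish', resolve)
  operation.once('error', reject)
})

// illustrative lifecycle pipeline:
// catchTask(operation).then(resolveOutput).then(validateOutput)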

After output is successfully validated (file is not empty, etc.), I set an _output object in the duplex stream, and destroy the stream:

function finish(uid) {
  // `stream` is the task's duplexify stream, captured in a closure
  stream._output = getTask(uid).resolvedOutput
  stream.destroy()
  // TODO not this
  // 'destroy' may as well be 'breaksCompatibilityWithStreamsModules'
  stream.emit('destroy')
}

On destroy() the duplex stream emits a 'close' event. The lifecycle of a task is then something like:

uppercase()
  .on('destroy', function () {
    console.log(this._output) // object with absolute path of output
  })

and join will use that to run tasks one after another, collect all the outputs into the "output dump", etc.
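A hypothetical sketch of how join could chain on that event (not watermill's actual implementation):

// run taskB once taskA signals it is done and has resolved its output
const join = (taskA, taskB) => () => {
  const stream = taskA()
  stream.on('destroy', function () {
    taskB(this._output) // hand the resolved output to the next task
  })
  return stream
}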

However, this breaks if the operation itself emits a close event, as happens with child processes. My earlier solution (I only switched to the emit('destroy') hack recently while hacking features into refactored code) was to add a function to the stream object, which could access things through a closure and run the output resolution and validation inside that function. On any "ending type" event - end, close, finish - you could call it. So the lifecycle for a task was:

uppercase()
  .on('close', function () {
    const output = this._output() // runs output resolution and validation now, but "outside" the task
    console.log(output)
  })

This is better because it uses no made-up events, but a little annoying because if you just do

myTask()

it will not actually run the whole task lifecycle until this._output() is called. Perhaps a taskWrapper could call this automatically, and then itself emit a close; see the sketch below.
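A hypothetical sketch of such a wrapper (taskWrapper is the name from the text; the proxy emitter and the event list are assumptions):

const { EventEmitter } = require('events')

const taskWrapper = (task) => (...args) => {
  const proxy = new EventEmitter()
  const stream = task(...args)
  let done = false
  const onDone = () => {
    if (done) return
    done = true
    proxy._output = stream._output() // run resolution + validation now
    proxy.emit('close')              // a standard-looking event for consumers
  }
  // any "ending type" event triggers the deferred output resolution
  for (const ev of ['end', 'close', 'finish']) stream.once(ev, onDone)
  return proxy
}

// usage: taskWrapper(uppercase)().on('close', function () { console.log(this._output) })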

Ideally:

someTask() // whole lifecycle contained inside here
  .on('close', // or some other standard event: end, finish, etc.
    function () {
      console.log(this._output) // prepared, resolved + validated output
    })

This way the task lifecycle is contained in one place, without weird wrappers that would themselves need to emit close, and the resolved output is available immediately when a task is "done".

I saw mention of a flush function, but perhaps that applies only to writable streams?
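If the flush in question is through2's, it is actually available on transforms, not just writables: the optional second argument runs once all input has been consumed, just before the readable side emits 'end':

const through = require('through2')

const upper = through(
  (chunk, enc, cb) => cb(null, chunk.toString().toUpperCase()),
  function flush (cb) {
    // last chance to this.push() data or do cleanup before 'end'
    cb()
  }
)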

Also, operations which are actually just duplex streams, like a transform, can be set as the duplex inside task, and forking/parallel should work as if the task were a regular duplex; the "catch finish" step need not apply, since no output files need to be resolved to absolute paths.

thejmazz commented 7 years ago

Summary and notes from chat about this:

Instead, try to avoid "foot shooting" - for example, if a streaming task is joined to something that is not writable:

join(streamingTask, somethingThatIsNotWritable)
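A hypothetical guard for that case (entirely illustrative, not watermill API):

// reject obviously bad joins up front instead of failing mid-stream
const assertWritable = (stream, name) => {
  if (!stream || typeof stream.write !== 'function') {
    throw new Error(name + ' is not writable; cannot join a streaming task into it')
  }
}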

thejmazz commented 7 years ago

Done with async-done.
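For context, async-done normalizes completion detection across streams, promises, and callbacks; a sketch of how it can catch an operation's finish (file names illustrative):

const fs = require('fs')
const asyncDone = require('async-done')
const through = require('through2')

asyncDone(
  () => fs.createReadStream('file.lowercase')
    .pipe(through((chunk, enc, cb) => cb(null, chunk.toString().toUpperCase())))
    .pipe(fs.createWriteStream('file.uppercase')),
  (err) => {
    if (err) return console.error(err)
    // the operation has finished; safe to resolve and validate output here
  }
)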