bionode / bionode-watermill

💧Bionode-Watermill: A (Not Yet Streaming) Workflow Engine
https://bionode.gitbooks.io/bionode-watermill/content/
MIT License

Passing output from last task into new task #5

Closed thejmazz closed 8 years ago

thejmazz commented 8 years ago

For duplex streams, this is obvious.

But for tasks that create files:

```js
const task = Task({
  input: 'foo',
  output: new File('*.txt')
}, ({ input, output }) => shell(`echo ${input} > log.txt`))
```

We wait for the task to finish: inside Task

```js
stream.on('end' /* or finish */, () => {
  // resolve output glob pattern and emit it
  const resolvedOutput = globby.sync(output.value)
  stream.emit('task.done', resolvedOutput)
})
```

Then it is Join's job to pass these on:

```js
function join(...tasks) {
  const doTaskAndTryNext = (i, tasks) => {
    const task = tasks[i]

    task()
      .on('task.done', (output) => {
        console.log('Task finished with output:')
        console.log(tab(1) + output)

        if (i + 1 !== tasks.length) {
          doTaskAndTryNext(i + 1, tasks)
        }
      })
  }

  doTaskAndTryNext(0, tasks)
}
```
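For context, usage would look something like this (both tasks here are hypothetical; the second one needs the first one's resolved output as its input, which is exactly the missing piece):

```js
// Hypothetical tasks for illustration only
const writeLog = Task({
  input: 'foo',
  output: new File('*.txt')
}, ({ input }) => shell(`echo ${input} > log.txt`))

const countLines = Task({
  input: new File('*.txt'),
  output: new File('*.count')
}, ({ input }) => shell(`wc -l ${input} > log.count`))

join(writeLog, countLines)
```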

But nothing is being passed in yet.

Idea 1

Make Task always be a duplex object stream. Its first and final chunks will be objects pertaining to input and output. Then we need to wrap the actual stream of the task, for example, a Readable whose `this._source` is just another stream. This may complicate piping duplex streams together.
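Roughly something like this (a sketch using only Node core streams; `wrapTask` and the marker-object shape are made up for illustration):

```js
const { Readable } = require('stream')

// Sketch: wrap an inner stream in an object-mode readable whose first
// and final chunks are marker objects describing input and output.
function wrapTask(inner, { input, output }) {
  const outer = new Readable({ objectMode: true, read() {} })

  outer.push({ type: 'task.begin', input })

  inner.on('data', (chunk) => outer.push(chunk))
  inner.on('end', () => {
    outer.push({ type: 'task.done', output })
    outer.push(null)
  })
  inner.on('error', (err) => outer.emit('error', err))

  return outer
}
```

The downside shows up immediately: anything piping off the outer stream has to know to skip the marker chunks.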

Idea 2

Wow. This feels so obvious now. Pass items in via the first param of Task (which right now is just `() => { ... }`). I'll say my third idea anyways lol.
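i.e. roughly this inside `join` (a sketch of one reading of that, assuming the invoked task accepted a params object):

```js
// Sketch: thread the previous task's resolved output through as the
// next task's input, via a params object (hypothetical signature).
const doTaskAndTryNext = (i, tasks, resolvedInput) => {
  const task = tasks[i]

  task({ input: resolvedInput })
    .on('task.done', (output) => {
      if (i + 1 !== tasks.length) {
        doTaskAndTryNext(i + 1, tasks, output)
      }
    })
}
```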

Idea 3

Emit a `task.made` event carrying a callback, which can then be used to pass in resolved outputs so they become resolved inputs in the new task.
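Roughly (a sketch, not existing API):

```js
// Sketch: task.made hands back a callback which join uses to resolve
// the new task's inputs from the previous task's resolved outputs.
const doTaskAndTryNext = (i, tasks, prevOutput) => {
  const stream = tasks[i]()

  stream.on('task.made', (resolveInput) => resolveInput(prevOutput))

  stream.on('task.done', (output) => {
    if (i + 1 !== tasks.length) {
      doTaskAndTryNext(i + 1, tasks, output)
    }
  })
}
```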

It is important to handle this case well, because with these pipelines not everything is streamable. For example, doing an alignment requires indexing the reference, which needs to run to completion first. But I think having Task always be a stream (i.e. it wraps promises into streams) makes it easier to orchestrate things. And having an output as a File tells the orchestrator that this Task cannot be piped into another - unless we go the route where Task is always a duplex stream that wraps over other streams.
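For the "wraps promises into streams" part, something along these lines would do it (sketch using Node core streams; `promiseToStream` is not an existing helper here):

```js
const { Readable } = require('stream')

// Sketch: turn a promise-returning operation into an object-mode
// stream so join can treat every task uniformly via end/finish events.
function promiseToStream(promise) {
  const stream = new Readable({ objectMode: true, read() {} })

  promise
    .then((value) => {
      stream.push(value)
      stream.push(null) // emits 'end'
    })
    .catch((err) => stream.emit('error', err))

  return stream
}
```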

thejmazz commented 8 years ago

@mafintosh @maxogden your thoughts? Would making Task always a duplex that wraps an inner stream, with its first and last chunks being specific marker objects, be too much of a complication? It would mess up `a.pipe(b).pipe(c)` since the custom objects would be stuck in the pipeline. Not sure at the moment how to get around that - somehow only pipe through the inner stream, but still be able to check for `task.begin` and `task.done` events on the outer stream returned by Task.

Also, forcing it to be a stream would help with async - #4. Streams > promises.

Or just pass in params lol..

thejmazz commented 8 years ago

did not actually do this, was resolving from fs

thejmazz commented 8 years ago

@mafintosh I attached an `output` function to the stream object inside task; then when join's `.on('end' /* or finish */)` handler fires, it calls `this.output()` to get the actual values.
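Roughly the shape of it (a sketch, reusing `globby` and `output.value` from the snippet above; not verbatim from the commit):

```js
// Inside task: attach a closure that resolves the output glob lazily.
stream.output = () => globby.sync(output.value)

// Inside join: once the stream ends, call it to get the actual paths.
stream.on('end', function () {
  const resolvedOutput = this.output()
  // ...pass resolvedOutput on to the next task
})
```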

I tried attaching a bare object first (see the closing commit before the reopen), from inside task after end/finish, but then the end/finish handler in join doesn't get it.

Not sure if there's another way to do it where you can just attach an object rather than a closed-over function.