
💧Bionode-Watermill: A (Not Yet Streaming) Workflow Engine
https://bionode.gitbooks.io/bionode-watermill/content/
MIT License

Streaming Between Tasks #79

Open thejmazz opened 6 years ago

thejmazz commented 6 years ago

In order to be a streaming workflow engine, we need to support the ability to pipe between tasks. This adds little to a "traditional" pipeline, which mostly reads and writes files between tasks, but it opens up interesting use cases:

Dump from docs:

If either `input` or `output` is not provided, the task is assumed to be a streaming task - i.e., a duplex stream with writable and/or readable portions. Consider:

```js
const through = require('through2') // helper to create through streams

const throughCapitalize = through(function (chunk, enc, next) {
  // receives a chunk, its encoding, and a callback to call once done pushing
  // push a capitalized chunk to the readable portion of this through stream
  this.push(chunk.toString().toUpperCase())
  // then call next so that the next chunk can be handled
  next()
})
```
You could connect `capitalize` to a readable (`readFile`) and writable (`writeFile`) file stream with:

```js
const capitalize = task({
  name: 'Capitalize Through Stream'
},
// Here, input is a readable stream that came from the previous task
// Let's return a through stream that takes the input and capitalizes it
({ input }) => input.pipe(throughCapitalize))
```

```js
const readFile = task({
  input: '*.lowercase',
  name: 'Read from *.lowercase'
}, ({ input }) => {
  const rs = fs.createReadStream(input)
  // Attach the filename to the stream object so downstream tasks have it
  rs.inFile = input
  return rs
})
```

```js
const writeFile = task({
  output: '*.uppercase',
  name: 'Write to *.uppercase'
}, ({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase')))
```

```js
// Can now connect the three:
join(readFile, capitalize, writeFile)
```

Of course, this could be written as one single task. This is somewhat simpler, but the power of splitting up the read, transform, and write portions of a task will become apparent once we can provide multiple sets of parameters to the transform and observe the effect, without having to manually rewire input and output filenames. As a single task the above would become:

```js
const capitalize = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Capitalize *.lowercase -> *.uppercase'
}, ({ input }) =>
  fs.createReadStream(input)
    .pipe(throughCapitalize)
    .pipe(fs.createWriteStream(input.swapExt('uppercase')))
)
```
tiagofilipe12 commented 6 years ago

Tied to this issue, we should also be able to pass variables, functions, etc. between tasks, and still be able to call them somehow in downstream tasks.