bionode / bionode-watermill

💧Bionode-Watermill: A (Not Yet Streaming) Workflow Engine
https://bionode.gitbooks.io/bionode-watermill/content/
MIT License
37 stars 11 forks source link

Task Architecture #1

Closed thejmazz closed 8 years ago

thejmazz commented 8 years ago

A Task is the basic building block of pipelines in Waterwheel. It needs to be able to handle streams, synchronous, and asynchronous functions. There needs to be an standard way to declare a Task is over. As well, Tasks need to be joined into new Tasks, and parallelized into new Tasks. You should be able to join two Tasks that stream one into another without additional config.

Tasks can be:

A File is just a class that hold a value and is used (at the moment) in the task action creator for an input instanceof File check to then determine if the file pattern (e.g. **/*.sra) should be resolved with globby. This is so that the task stream/promise creator can use a resolved input variable with the actual filename.

We will refer to the joining:

as the orchestration of Tasks.

One way to enable the orchestration of Tasks is with callbacks. The task creator takes an object defining input and output, and then a function describing the task action creator. This returns a function of next. The join method and parallel method programmatically assign a function to the next to achieve their goals.

Another way is with Promises. Perhaps more elegant than callbacks, can reject when things go bad.

Another way to do this can be through events. The Task function can thrown an event whence the return Object from the task creator has completed. This has the advantage that it helps define a standard way to declare tasks are over. However, perhaps it can become messy listening for the same event when doing joins and parallels. This can be superseded by emitting a taskFinish event with some data that perhaps has a task uuid.

thejmazz commented 8 years ago

@maxogden @mafintosh your thoughts on using callbacks, promises (would be nice cause can reject when a task fails), or an event emitter as the backbone for the Task object? Current it is with a callback that is given on a curry function.

mafintosh commented 8 years ago

could you add a couple of code examples show casing how you would use this api in a simple application?

thejmazz commented 8 years ago

Using this dataset with 1809 samples:

It is an expression profiling dataset - so transcriptomic - i.e. "what genes are active, when?" - which is done by sampling active RNAs and counting them. As per the central dogma, DNA -> RNA -> protein, so by investigate RNAs in a cell over time we can hypothesize some things.

// Get a stream of data which has description, run SRA ID, other metadata

// task(props, actionCreator)(next)
// curries a next callback for now, need to investigate promises/events/streams as alternative
const sraStream = Task(
{
  input: {
    db: 'sra',
    term: 'SRP061902'
  },
  // declares this task is a readable stream
  output: stdout()
},
 // then pass the "task action creator", which uses props
({ input }) => ncbi.search(input.db, input.term)
)

// some notes:
// allow task action creator to return only a stream, or also Promise/callback, event emitter (stream)
// I think generic JS promises should be allowed, one might want to run some http requests on data, etc
// forcing everything to be a stream may be too much overhead for consumers
// but perhaps the task objects themselves can still return a stream/even emitter wrapping these callbacks/promises. so might only emit a single 'data' and 'end', or a bunch if its an actual stream

// Example of forking
// somewhat forced, but lets say we want to fork the stream of samples data to
// 1. an analysis pipeline (the heavy stuff)
// 2. a metadata analysis - key words in experiment summaries, common terms, gene names, etc

// So, we have a forking in the pipeline
// but we don't want to force the user to have extra mental overhead for handling these
// the idea is that if everything is a Task, waterwheel can automatically orchestrate them

const metadataAnalysis = Task(
{
  input: stdin() // declares this task can be joined from another task that ouputs stdout()
  output: stdout() // declares this task is a duplex stream. perhaps emits values to be written to disk
},
() => myMetadataDuplexStream()
)

const getSamples = Task({
  input: stdin()
  output: file('**/*.sra') // the next task needs the sample SRA to be fully written to disk, so can't duplex here
  // this illustrates we will need to mix: readable, writable, duplex, promises, cbs
}, ({ input }) => {
   // give this action creator each data event or the whole stream?
   // also need to handle how each data event returns a stream
   // tasks returns a stream of streams? or because file() it is ok to not handle these
   input.on('data', ({ id }) => ncbi.download('sra', id)
   // this essentially downloads a bunch of files
   // the output pattern is globbed and can be given to next task
}
)

// this will be off, I have not done RNAseq pipelines yet, but still illustrates API
const alignRNAs = Task({
  input: file('**/*.sra'),
  output: file('someAlignmentFiles')
}, ({ input }) => shell(`some-tool ${input}`)
)

const doStuffWithAlignedRNAs = Task({ ... }) 

const indexReference = Task({ ... }) // alignment needs an indexed reference

// Example of  parallel, and joins

// need to think out this API call way more
const pipeline = (
  // everything in the array gets a fork of this
  fork(getSamples, [
     metadataAnalysis(),
     parallel(
       indexReference,
      // how to make sure alignRNAs goes after indexReference?
     // this syntax is not describing that
       join(getSamples, alignRNAs, ..., doStuffWithAlignedRNAs)
     )
)

// So at a high level:
// Task: stream, promise, callback (perhaps kill callback in favour of promise)
// Tasks can be orchestrated no matter what they are
// 1. linear joins that pass info between
// 2. forks
// 3. parallel
// 4. what else?

Something along those lines. Let me know of your confusions/questions. I'd like to get a stable version of these features in the next few days, using toy streams (does not need to bioinformatics to show task orchestration).

thejmazz commented 8 years ago

After thinking about it a bit more, I'm leaning towards Task returning a stream/event emitter sort of object. Callbacks fn(done) and Promises could be wrapped and the Task can emit a taskDone event after done() callback called or promise resolved, respectively. If the action creator returns a stream, the task can then be that stream. Or perhaps instead of taskDone use duplex stream events: end and finish.

Essentially: Task returns a fully malleable (readable, writable, duplex)stream. It wraps Promises/callbacks into tiny streams that emit end once.

Task(props, actionCreator) -> stream/E.E

writable stream events: close, drain, error, finish, pipe, unpipe readable: close, data, end, error

Should Task emit its own events? Or use readable/writable/duplex stream events?

thejmazz commented 8 years ago

This discussion is largely resolved, and implementation details will be discussed in new issues. Closing for now