
💧Bionode-Watermill: A (Not Yet Streaming) Workflow Engine
https://bionode.gitbooks.io/bionode-watermill/content/

Handle External Inputs #95

Open thejmazz opened 6 years ago

thejmazz commented 6 years ago

"External" inputs being referred to here as those which come from the canonical inputs.json (though it can be named anything).

See the CWL Generic Execution Process

  1. Load, process and validate a CWL document, yielding a process object.
  2. Load input object.
  3. Validate the input object against the inputs schema for the process.
  4. Validate process requirements are met.
  5. Perform any further setup required by the specific process type.
  6. Execute the process.
  7. Capture results of process execution into the output object.
  8. Validate the output object against the outputs schema for the process.
  9. Report the output object to the process caller.

See also InitialWorkDirRequirement. For the first implementation we can ignore this requirement and assume all inputs should be staged. Before approaching this, we will need functionality to parse basic CWL CommandLineTool documents without any requirements, hints, or inline expressions.
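
A minimal sketch of what that parsing step could look like, assuming js-yaml is available and ignoring requirements, hints, and expressions entirely (the helper name and returned shape are just placeholders):

// Hypothetical sketch: load a basic CWL CommandLineTool document with js-yaml,
// keeping only the fields a first implementation would need.
const fs = require('fs')
const yaml = require('js-yaml')

const parseCommandLineTool = (docPath) => {
  const doc = yaml.safeLoad(fs.readFileSync(docPath, 'utf8'))

  if (doc.class !== 'CommandLineTool') {
    throw new Error(`Expected a CommandLineTool, got ${doc.class}`)
  }

  return {
    baseCommand: [].concat(doc.baseCommand),
    inputs: doc.inputs,
    outputs: doc.outputs
  }
}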

In the current Watermill implementation, a task (not yet a CWL CommandLineTool) looks like this, for example:

const samtoolsIndex = task({
  input: 'reads.bam',
  output: '*.bam.bai',
  name: 'samtools index'
}, ({ input }) => `samtools index ${input}`)

The corresponding CWL CommandLineTool could look like:

cwlVersion: v1.0
class: CommandLineTool
baseCommand: [ samtools, index ]

requirements:
  InitialWorkDirRequirement:
    listing:
      - $(inputs.reads)

inputs:
  reads:
    type: File
    inputBinding:
      position: 1
      valueFrom: $(self.basename)

outputs:
  classfile:
    type: File
    outputBinding:
      # use $(inputs.reads.basename) or something to be extra specific?
      glob: "*.bam.bai"

And the corresponding inputs.json:

{
  "reads": "/data/myreads.bam"
}

It should result in a call like the following (from inside ./data/<task-uid>):

samtools index myreads.bam

So, as a first step, watermill needs to handle resolving input files from an inputs file, not just from the outputs of other steps or from the CWD of the Node process that called the pipeline (perhaps the latter is not even permitted by the CWL spec).

The rough lifecycle would look like:

  1. Pipeline is parsed; store.tasks has a list of tasks.
  2. Read the values of inputs.json into the store at state.externalInputs. Store the original external inputs as well as the resolved values (resolved using the CWD of the Node process).
  3. Within the task lifecycle input resolution, if a key of the task inputs matches a key from the external inputs, resolve to that value (see the sketch after this list).
  4. Use the existing symlink functionality to stage that input (in addition to the others resolved from outputs of previous tasks); new copying functionality could also be added, but that is probably better left to a separate PR.
  5. The rest of the task lifecycle should continue as normal past the resolveInput step.
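
A rough sketch of the check from step 3, inside input resolution; the function name and the shape of state.externalInputs here are assumptions, not the final design:

// Hypothetical sketch of checking external inputs during resolveInput.
// `inputKey` is a task input key/pattern and `store` is the redux store.
const resolveFromExternalInputs = (inputKey, store) => {
  const { externalInputs } = store.getState()
  if (!externalInputs || !externalInputs.resolved) return null

  // If a task input matches a key from inputs.json, resolve to that value
  if (externalInputs.resolved[inputKey]) {
    return externalInputs.resolved[inputKey]
  }

  // Otherwise fall back to the existing resolution from previous task outputs
  return null
}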

Basically, we need to modify resolveInput to also handle values from state.externalInputs. We also need a way to pass the path of inputs.json to watermill as a CLI flag - or for now (or at least, before finalizing the PR) this can be hardcoded to an inputs.json that is a sibling of pipeline.js.
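
For that first pass, loading the file could be as simple as something like this (the action type and argument names are assumptions):

// Hypothetical sketch: read inputs.json (CLI flag later, hardcoded sibling
// of pipeline.js for now) and dispatch it into the store.
const fs = require('fs')
const path = require('path')

const loadExternalInputs = (store, pipelinePath, inputsPath) => {
  const resolvedPath = inputsPath || path.join(path.dirname(pipelinePath), 'inputs.json')
  const original = JSON.parse(fs.readFileSync(resolvedPath, 'utf8'))

  store.dispatch({
    type: 'LOAD_EXTERNAL_INPUTS', // assumed action type
    original,
    cwd: process.cwd()
  })
}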

It is also useful to store things like the CWD that was used when resolving; this makes it easy to check why a certain input did not resolve, since we can see which options were used by inspecting state.

So the reducer for external inputs will need to handle:

  1. the original, unchanged value of inputs.json
  2. the set of options/state (like CWD) used to resolve the files
  3. the resolved version

These would go under state.original and state.resolved, perhaps, and then that reducer is added to the store with combineReducers, so the end topology would be something like store.getState().externalInputs.original.
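
As a sketch of that reducer and topology (the action shape and field names are assumptions):

// Hypothetical externalInputs reducer, combined into the root reducer so the
// data ends up under store.getState().externalInputs
const { combineReducers } = require('redux')
const path = require('path')

const defaultState = {
  original: {}, // unchanged contents of inputs.json
  options: {},  // options/state used when resolving, e.g. the CWD
  resolved: {}  // absolute paths after resolution
}

const externalInputs = (state = defaultState, action) => {
  switch (action.type) {
    case 'LOAD_EXTERNAL_INPUTS': {
      const resolved = {}
      Object.keys(action.original).forEach((key) => {
        resolved[key] = path.resolve(action.cwd, action.original[key])
      })
      return {
        original: action.original,
        options: { cwd: action.cwd },
        resolved
      }
    }
    default:
      return state
  }
}

// added alongside the existing reducers
const rootReducer = combineReducers({ externalInputs /*, tasks, ... */ })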

Imagine if we were not using redux - where would this data go? Probably into a property of a class, and that instance would then need to be made available to every task so it could check it. With redux, we just put it into state and can read out the values later, and we only need to pass around one thing everywhere: the store. This is a better use case than storing task state in redux, since here the state (resolved external inputs) needs to be consumed by multiple "components" (tasks), whereas task state is really only needed by that task, so it would have been OK to keep it in a class - but we use redux there too, for 1) consistency and 2) automatic transparency (no need to add a bunch of console logs to a Task class; instead just enable redux logging and inspect).

As a test case, we can do an end-to-end test for a pipeline that uses external inputs and check whether it completes with the expected output (this can be a simple cat ${input} > output.txt).
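
A minimal sketch of such a test, assuming a task invocation returns a promise and using mocha-style describe/it as in the existing tests (file names, paths, and the results shape are placeholders):

// Hypothetical end-to-end test: a one-task pipeline whose input comes from
// inputs.json rather than from a previous task's output.
// inputs.json (sibling of the pipeline file) might contain:
// { "message": "/abs/path/to/message.txt" }
const assert = require('assert')
const { task } = require('bionode-watermill')

const catTask = task({
  input: 'message.txt', // expected to resolve from external inputs
  output: 'output.txt',
  name: 'cat external input'
}, ({ input }) => `cat ${input} > output.txt`)

describe('external inputs', function () {
  it('resolves an input from inputs.json and completes', function (done) {
    catTask()
      .then((results) => {
        // the shape of `results` is assumed; the point is that the pipeline
        // completed and produced output.txt from the external input
        assert.ok(results)
        done()
      })
      .catch(done)
  })
})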

Spec considerations: