nipy / nipype

Workflows and interfaces for neuroimaging packages
https://nipype.readthedocs.org/en/latest/
Other
741 stars 524 forks source link

Discuss: Nipype boilerplate simplification/ v2 API #2539

Open satra opened 6 years ago

satra commented 6 years ago

Summary

When one writes a nipype interface or workflow there is a fair bit of boilerplate. The intent of this discussion is to figure out how we reduce/simplify this, and to think about the v2 api.

should we:

  1. just simplify syntax (allow more restricted forms only)/update our examples to use more compact pythonic forms. for example:
# common form
a.inputs.one = 2
a.inputs.two = 3

# alternate allowed forms
a  = Interface(one=2, two=3)
or
a = Interface(...)
a.inputs.update(one=2, two=3)

except for special interfaces (e.g., Function)

  1. add support for lazy evaluation in new api and make it more compact (my current preference):
wf2=Workflow('wf12, ..., distribute=False, global_cache=False)
...
wf1=Workflow('wf1', ..., distribute=True)
wf1.add('a', BET(...))
wf1.add('wf2', wf2(..., inputs='node2.in1=a.out1'))
wf1.add('b', antsRegistration(..., in1='a.out1')).map('in1'))
wf1.add('x', SomeFunc(..., in1='b.out1')) # map automatically continues
wf1.add('c', ImageMaths(..., in1='wf2.node1.out1')).join('b.in1')
wf1.run()

or more compactly:

wf2=Workflow('wf2', ..., distribute=False, global_cache=False)
...
wf1=Workflow('wf1', ..., distribute=True)
    .add('a', BET(...))
    .add('wf2', wf2(..., inputs='node2.in1=a.out1'))
    .add('b', antsRegistration(..., in1='a.out1')).map('in1'))
    .add('x', SomeFunc(..., in1='b.out1')) # map automatically continues
    .add('c', ImageMaths(..., in1='wf2.node1.out1')).join('b.in1')
wf1.run()

.add will return a composite reference to both the workflow and the interface added

  1. create a DSL (domain specific language - like WDL/CWL) instead of using Python

@djarecka @Shotgunosine @effigies @chrisfilo @oesteban - open for discussion.

satra commented 6 years ago

instead of 3, we should also be able to do:

wf1 = Workflow(...).from_cwl('/path/to/cwl')

would use the cwl library to parse the file and convert to a nipype workflow.

and correspondingly wf1.to_cwl()/boutiques()

satra commented 6 years ago

also for map and join, i'm thinking we could do something like:

  1. map .add('b', antsRegistration(..., in1='a.out1')).map('in1', propagate_failures=True))

True can be default with False, failure_callback being options.

  1. join

.add('c', ImageMaths(..., in1='wf2.node1.out1')).join('b.in1', failure_mode='allow_any')

options could be allow_any | allow_none, we could mark failed outputs as failed or xfail or nan or None, and for map processing we can allow failures to propagate

effigies commented 6 years ago

How about this as a Python-based DSL?

class Workflow1(Workflow):
    a = BET(...)
    wf2 = Workflow2(..., inputs={'node2.in1': a.out1})
    b = Map(antsRegistration(..., in1=a.out1), 'in1', propagate_failures=True)
    x = SomeFunc(..., in1=b.out1)
    c = Join(ImageMaths(..., in1=wf2.node1.out1), 'b.in1', failure_mode='allow_any')

wf1 = Workflow1()
wf1.run()

This seems like it would have to be something of syntactic sugar, and __init__ would need to do the work of turning these class variables into independent instance variables. I'm not sure if there's something in your proposed syntax that would be very difficult to translate into this approach.

satra commented 6 years ago

@effigies - i can live with that and construct a graph from this.

oesteban commented 6 years ago

I feel that @effigies' suggestion would also allow to easily set up conditional nodes and other decision making within the definition of the Workflow

satra commented 6 years ago

@oesteban - indeed I was about to bring up conditionals and loops. I'm not sure how that would fit in to the dsl yet. We should flesh that piece out before settling on the API.

Shotgunosine commented 6 years ago

@effigies How much overhead is that going to add to interface creation?

effigies commented 6 years ago

@satra I would be interested in conditionals and loops in the format suggested in (2), as well. And MapNode, as well. Would that just be .map(...).join(...), and thus Join(Map(...), ...) in mine?

Also, it would be good to translate a real workflow - that uses iterables, JoinNode and MapNode - into each syntax. If I can suggest one from fmriprep: init_bold_surf_wf.

@Shotgunosine Overhead in what sense?

Shotgunosine commented 6 years ago

I mean when you say that init would need to do the work of turning the class variables into instance variables. Is that work that people adding interfaces need to do?

leej3 commented 6 years ago

My two cents: I really like the compact @satra outlines. I'm a big fan of functional programming so it seems very natural to me. I think that would be a pleasing format to teach to a complete beginner too though.

Shotgunosine commented 6 years ago

I do like the compact form of 2 above as well.

effigies commented 6 years ago

Oh, I see. No, I meant that we would need to code the base Workflow class with an __init__ that makes a usable workflow from the declarative style above.

That said, often workflow setup does need to be dynamically adjusted at runtime, which could be handled like so:

class MyWorkflow(Workflow):
    ...
    def __init__(self, ...):
        super(MyWorkflow, self).__init__(self, ...)
        # Workflow is fully initialized, and any runtime
        # additions/modifications can be done here

We could also approach my syntax as sugar on top of form 2, so post-initializing setup could be done outside the workflow itself:

wf = MyWorkflow(distribute=True)
if inputs_look_like_this:
    wf.add('y', SomeOtherNode(...))
else:
    wf.add('z', YetAnother(...))

To be clear, I think form 2 looks reasonable, and I only arrived at my approach by looking at it and thinking what could be cut further. This replaces (perhaps optionally?) wf.add('b', node) with wf.b = node, which will naturally result in more work behind the scenes. (Hopefully not too much.)

satra commented 6 years ago

@effigies - i'll try to apply @djarecka's api + this discussion to init_bold_surf_wf

satra commented 6 years ago

here is an attempt. there are several details that need to be sorted out, but something like this should work in principle.

wf = Workflow(name, mem_gb_node=DEFAULT_MEMORY_MIN_GB,
              inputs=['source_file', 't1_preproc', 'subject_id',
                      'subjects_dir', 't1_2_fsnative_forward_transform',
                      'mem_gb', 'output_spaces', 'medial_surface_nan'],
              outputs='surfaces')

@interface
def select_target(subject_id, space):
    """ Given a source subject ID and a target space, get the target subject ID """
    return subject_id if space == 'fsnative' else space

wf.add('targets', select_target(subject_id=wf.inputs.subject_id))
  .map('space', space=[space for space in wf.inputs.output_spaces
                       if space.startswith('fs')])
wf.add('rename_src', Rename(format_string='%(subject)s',
                            keep_ext=True,
                            in_file=wf.inputs.source_file))
  .map('subject')
wf.add('resampling_xfm',
       fs.utils.LTAConvert(in_lta='identity.nofile',
                           out_lta=True,
                           source_file=wf.inputs.source_file,
                           target_file=wf.inputs.t1_preproc)
  .add('set_xfm_source', ConcatenateLTA(out_type='RAS2RAS',
                                        in_lta2=wf.inputs.t1_2_fsnative_forward_transform,
                                        in_lta1=wf.resampling_xfm.out_lta))
wf.add('sampler',
       fs.SampleToSurface(sampling_method='average', sampling_range=(0, 1, 0.2),
                          sampling_units='frac', interp_method='trilinear',
                          cortex_mask=True, override_reg_subj=True,
                          out_type='gii',
                          subjects_dir=wf.inputs.subjects_dir,
                          subject_id=wf.inputs.subject_id,
                          reg_file=wf.set_xfm_source.out_file,
                          target_subject=wf.targets.out,
                          source_file=wf.rename_src.out_file),
        mem_gb=mem_gb * 3)
       .map([('source_file', 'target_subject'), 'hemi'], hemi=['lh', 'rh'])

wf.add_cond('cond1', 
            condition=wf.inputs.medial_surface_nan,
            iftrue=wf.add('medial_nans', MedialNaNs(subjects_dir=wf.inputs.subjects_dir,
                                             in_file=wf.sampler.out_file,
                                             target_subject=wf.targets.out))
                     .set_output('out', wf.median_nans.out),
            elseclause=wf.set_output('out', wf.sampler.out_file))

wf.add('merger', niu.Merge(1, ravel_inputs=True,
                           in1=wf.cond1.out),
            run_without_submitting=True)
      .join('sampler.hemi')

wf.add('update_metadata',
       GiftiSetAnatomicalStructure(in_file=wf.merger.out))
wf.outputs.surfaces = wf.update_metadata.out_file
chrisgorgo commented 6 years ago

I believe we already support 1), I like 2), and I agree that support for other established DSL would be nice.

On a more general note I am afraid the inner circle of nipype veterans might have limited insight into everyday user struggles. I wonder if we could set up a survey and/or virtual focus groups to figure out what are the pain points for most users.

djarecka commented 6 years ago

I personally like the Satra's suggestions 2, don't see too much difference between compact or the original version.

I'm struggling to see the workflow structure in the DSL example.

satra commented 6 years ago

@chrisfilo - i like the idea of doing a survey. it's been a while since we did something like that. how about we brainstorm a little bit internally (although publicly), and propose a working draft. i do want to borrow some ideas from tensorflow, which may allow a more pythonic style.

I'm struggling to see the workflow structure from DSL example.

@djarecka - could you please clarify what you mean by that?

effigies commented 6 years ago

Here's my approach, which is admittedly not very clear on how Map and Join will work. I treat Join(Map(..., 'name')) as the equivalent of MapNode over 'name'. I also dropped the conditional for now, though I see you've proposed a syntax.

class BOLDSurfaceWorkflow(pe.Workflow):
    inputs = Inputs(source_file=File(exists=True),
                    t1_preproc=File(exists=True),
                    subject_id=traits.Str(),
                    subjects_dir=Directory(exists=True)
                    t1_2_fsnative_forward_transform=File(exists=True))

    @interface
    def select_target(subject_id, space):
        """ Given a source subject ID and a target space, get the target subject ID """
        return subject_id if space == 'fsnative' else space

    targets = Join(Map(select_target(subject_id=inputs.subject_id), 'space'))

    rename_src = Join(Map(Rename(format_string='%(subject)s', keep_ext=True,
                                 in_file=inputs.source_file), 'subject'))

    resampling_xfm = LTAConvert(in_lta='identity.nofile', out_lta=True,
                                source_file=inputs.source_file,
                                target_file=inputs.t1_preproc)
    set_xfm_source = ConcatenateLTA(out_type='RAS2RAS', in_lta1=resampling_xfm.out_lta,
                                    in_lta2=inputs.t1_2_fsnative_forward_transform)

    sampler = Join(
        Map(
            SampleToSurface(sampling_method='average', sampling_range=(0, 1, 0.2),
                            sampling_units='frac', interp_method='trilinear',
                            cortex_mask=True, override_reg_subj=True, out_type='gii',
                            subjects_dir=inputs.subjects_dir, subject_id=inputs.subject_id,
                            reg_file=set_xfm_source.out_file, target_subject=targets.out,
                            source_file=rename_src.out_file),
            [('source_file', 'target_subject'), 'hemi'], hemi=['lh', 'rh']),
        'source_file')

    merger = Join(Merge(1, ravel_inputs=True, in1=sampler.out_file), 'sampler.hemi')

    update_metadata = Join(
        Map(GiftiSetAnatomicalStructure(in_file=merger.out), 'in_file'),
        'in_file')

    outputs = Outputs(surfaces=update_metadata.out_file)

    def __init__(self, mem_gb, output_spaces, medial_surface_nan,
                 name='bold_surf_wf', *args, **kwargs):
        super(BOLDSurfaceWorkflow, self).__init__(name, *args, **kwargs)

        spaces = [space for space in output_spaces if space.startswith('fs')]
        self.targets.space = spaces
        self.rename_src.subject = spaces
djarecka commented 6 years ago

@satra - well, this was just my subjective impression reading the syntax. But in general, I like this additional piece of syntax that tells you to connect or add.

I was a bit afraid that in DSL syntax, you can put a lot of completely unrelated calculations without harm to the syntax, I mean it would still work, but won't be readable. If you have this explicit links (e.g. connect, add) you think more that this should be one workflow.

effigies commented 6 years ago

Some thoughts on branching. This is super provisional.

While loops

I expect the body of a loop would be most easily thought of as a sub-workflow:

class Iterable(Workflow):
    inputs = Inputs(['a', 'b', 'c', 'condition'])
    node1 = ...
    node2 = ...
    outputs = Outputs(['a', 'b', 'c', 'condition'])

(Certainly one could imagine an interface with the outputs being a superset of the inputs and including a condition, so we don't need to limit ourselves to a workflow.)

We could then introduce While, similar to Map and Join, that would take a field that it checks as True or False. e.g.

class MyWorkflow(Workflow):
    refine = While(Iterable(...), 'condition', lambda x, tol=0.0001: x < tol)

We might also want to consider some syntax like '~criterion' to loop on bool(criterion) is False instead of True.

To translate this into @satra's 2-form:

my_wf.add('refine', Iterable(...)).while('condition')

There are two approaches to the output of a While loops that I can immediately think of. The first would be similar to tail-call recursion, in that you use each output to seed the next iteration, but the output of the loop is just the output of the final iteration. The second would be similar to a MapNode, where each output in the output spec is expanded into a list, and there's an entry in the list corresponding to each iteration. I suspect the former would be more useful, but figured I'd bring up the issue in case it sparks ideas.

Conditional

I see @satra above suggested:

wf.add_cond('cond1', 
            condition=wf.inputs.condition,
            iftrue=wf.<operation>,
            elseclause=wf.<operation>)

This is a pretty different approach, as it involves manipulating the workflow structure during workflow-runtime. To be clear, I'm not rejecting this idea, but it's not how I've been thinking about workflows. I think I would again talk in terms of a conditional sub-workflow (or node), but instead of a loop which requires a state, it's a simple on-off. To make it an if-else, you would embed two conditional sub-workflows:

class Either(Workflow):
    inputs = Inputs(['a', 'b', 'c'], condition=traits.Bool)
    left = WorkflowA(in1=inputs.a, in2=inputs.b, enable=inputs.condition)
    right = WorkflowB(in1=inputs.a, in2=inputs.c, enable=not inputs.condition)
    outputs = Outputs(out1=ifelse(inputs.condition, left.out1, right.out1),
                      out2=ifelse(inputs.condition, left.out2, right.out2))

To adapt this to @satra's notation, I might render this as:

wf.add('left', WorkflowA(...), enable=inputs.condition)
wf.add('right', WorkflowB(...), enable=not inputs.condition)
wf.add('final_out', Identity(out1=ifelse(inputs.condition, 'left.out1'), ...)

Recursion

Given this approach for if/else, if we are constructing workflows lazily, we can create a loop in a more directly recursive idiom:

class Iterate(Workflow):
    inputs = Inputs(['a', 'b'], enable=traits.Bool)
    body = WorkflowBody(a=inputs.a, b=inputs.b)
    iterate = Iterate(a=body.out1, b=body.out2, enable=not body.completed)
    outputs = Outputs(out1=ifelse(body.completed, body.out1, iterate.out1),
                      out2=ifelse(body.completed, body.out2, iterate.out2))

I'm not sure that this is at all a good idea, but seems worth considering.

effigies commented 6 years ago

@chrisfilo I agree that getting feedback on the difficult points of the learning curve would be good.

I am curious though whether surveys or focus groups are considered particularly effective ways of discovering these, in organizations that have the resources to regularly research UX. In my experience, nipype is somewhat like git, in that it's hard to put your finger on what's difficult to learn as you learn it, and it's hard to retrospectively say what was difficult as well. But perhaps others are better at introspection.

Nipype workshops (and to a lesser extent, Brainhacks) seem like a good place to get feedback. You have the opportunity for one-on-one training, which gives the more experienced user a chance to observe the sticking points, and can finish the workshop with a focus group where you explicitly ask for user assessment of the difficulties. (Do we have any such data from previous workshops?)

Anyway, I think it's probably a good idea to distribute a survey, too.

oesteban commented 6 years ago

One thing I would also like to note: for nipype 2.0 we agreed that subworkflows are also nodes, correct?

From outside, a node wrapping a workflow and a node wrapping an Interface should be fundamentally the same, shouldn't they?

effigies commented 6 years ago

That's my understanding.

oesteban commented 6 years ago

Okay, so in all examples above pe.Workflow is just another type derived from BaseInterface right?

effigies commented 6 years ago

Ah, yeah, I didn't mean to leave pe. in. I intended it to be somewhat agnostic as to exactly how workflows, nodes, and interfaces are distinguished. My goal is mostly to argue about syntax.

tetron commented 6 years ago

@satra On the topic of executing CWL on Nipype, you might be interested in the CWL support in Toil (https://github.com/BD2KGenomics/toil/blob/master/src/toil/cwl/cwltoil.py) and Airflow (https://github.com/Barski-lab/cwl-airflow/) both of which are workflow engines with a Python API. They use cwltool to load the document, then convert to the native workflow graph and execute the graph. This only requires a few hundred lines of code.

effigies commented 6 years ago

Also on the topic of CWL, how do they represent conditionals and loops, if they do? It may be good to provide a familiar syntax.

oesteban commented 6 years ago

Moving away from the workflow perspective - for the Nipype 2.0 I would very much like to see a couple of features inter-related:

Opinions?

effigies commented 6 years ago

Interfaces implement caching. Cache can be dismissed with a force argument in the run() member (I don't mind that it was True by default).

I think this is reasonable, especially given that interfaces can't generally be reused. And if we do successfully merge interfaces and nodes...

Which is actually a question I have: We've had a useful functional difference between interfaces and nodes, which is that the interface makes no attempt to change the behavior of the wrapped functionality, while the node works to keep the effects of the interface contained. What's the proposed way of achieving this dual functionality in 2.0, or is the un-wrapped interface to be a thing of the past?

Interfaces fulfill the Delayed interface. If we don't feel comfortable with such integration, at least they should be/should wrap Future objects.

I do like something like Delayed, as one feature I would really like is to pick a node to run and only run its dependencies, to get the result as fast as possible. I feel like going down the dask route should be kind of all-or-nothing, in that if we start implementing their interfaces, it makes sense for the nipype workflow to be a dask graph. So we should either commit to essentially a dask-based rebuild, or relegate dask to a plugin that most of nipype is agnostic towards.

satra commented 6 years ago

Interfaces implement caching. Cache can be dismissed with a force argument in the run() member (I don't mind that it was True by default).

this depends on whether we merge nodes and interfaces into a common base. perhaps to satisfy the my interface should run like my command line, we can use cache=False to change behavior. this will also require some internal reworkings that are not there at this point.

however, i would be willing to consider unwrapped interfaces a thing of the past. perhaps it would be good to hear other opinions on this.

Interfaces fulfill the Delayed interface. If we don't feel comfortable with such integration, at least they should be/should wrap Future objects.

i think everything in nipype is along those lines (like tf.Placeholder), but we should make it more explicit. we will have to figure out if the interfaces need a decorator (as in dask/tf) or this can be implicitly created.

On related boilerplate notes, @djarecka and i discussed two operations that the node/workflow will have: JoinByKey and Group/ReduceByKeyValue. i think the basics of this are almost implemented, so we should be able to create the draft engine relatively soon.

satra commented 6 years ago

@oesteban - going back to your earlier question about nodes and workflows

class Node/BaseInterface:
    pass

class Workflow(Node):
    pass
djarecka commented 6 years ago

@effigies regarding loops in CWL, I'm using Scatter method. I think they're working on conditionals, if you want to learn more, you can check google group chat

kaczmarj commented 6 years ago

i like @satra's second example in the first comment. it's similar to keras's sequential model api.

i was going to suggest something similar to keras's functional model api, but the sequential-model-like api is easier to use.