nipy / nipype

Workflows and interfaces for neuroimaging packages
https://nipype.readthedocs.org/en/latest/
Other
746 stars 529 forks source link

How to parallelize the diffusion preprocessing pipeline #2044

Closed anbai106 closed 6 years ago

anbai106 commented 7 years ago

Hello: I have a general question for the preprocessing of diffusion, I think right now the pipeline works in sequence, but if I want to parallelize the pipeline and without touching your souce code, can I just create a MapNode to distribute the subjects and connect it to the workflow, e.g.

create_dmri_preprocessing

in fsl/epi interface.

I tried it like

distribute_node = npe.Node(nutil.IdentityInterface(name="Distributesubjects", interface=nutil.IdentityInterface( fields=['in_dwi', 'in_bval', 'in_bvec', 'in_fmap_magnitude', 'in_fmap_phasediff'], mandatory_inputs=True)), name='b0_dwi_split') full_pipe = create_dmri_preprocessing()

I think if we put the pipeline in parallelized, it will be more computation-saved.

Thanks in advance

Hao

effigies commented 7 years ago

I haven't looked up the create_dmri_preprocessing interface, but I'll assume that your distributenode fields are equivalent to its inputnode.

I would do it something like this:

meta_wf = npe.Workflow(name='meta_wf')
# See: https://nipype.readthedocs.io/en/latest/interfaces/generated/nipype.interfaces.io.html#selectfiles
fetch_inputs = npe.Node(nio.SelectFiles(...), name='fetch_inputs', iterables=['subject_id'])
fetch_inputs.inputs.subject_id = subject_list

dmri_wf = create_dmri_preprocessing(name='dmri_wf')

meta_wf.connect([
    (fetch_inputs, dmri_wf, [('in_dwi', 'in_dwi'),
                             ('in_bval', 'in_bval'),
                             ('in_bvec', 'in_bvec'),
                             ('in_fmap_magnitude', 'in_fmap_magniutde'),
                             ('in_fmap_phasediff', 'in_fmap_phasediff'),
                             ])])

meta_wf.run(plugin='MultiProc')

You'll presumably also want some kind of datasink, as well.

anbai106 commented 7 years ago

@effigies Thanks for your response, actually I used iterables before, the reason why I did not take it is that when the iterables have just one inputs like you mentioned here 'subject_id', it works perfectly, but if for example, i have iterables like this: read_parameters_node.iterables = [('in_dwi', self.subjects), ('in_bval', read_parameters_node.inputs.in_bval), ('in_bvec', read_parameters_node.inputs.in_bvec), ('in_fmap_magnitude',read_parameters_node.inputs.in_fmap_magnitude), ('in_fmap_phasediff', read_parameters_node.inputs.in_fmap_phasediff)]

All the inputs will be mixed, for example, all the dwi, fieldmap files taken by the Node is not from the same subject, do you have any idea or option to take the same sequence for the files in iterables???

A bad example is given here: ignore_exception = False in_bval = ['/Users/junhao.wen/ownCloud/Dossier_echange_Hao/NIFTI-PREVDEMALS/Paris/sub-PREVDEMALS0010004PM/ses-M0/dwi/sub-PREVDEMALS0010004PM_ses-M0_dwi.bval'] in_bvec = ['/Users/junhao.wen/ownCloud/Dossier_echange_Hao/NIFTI-PREVDEMALS/Paris/sub-PREVDEMALS0010011RV/ses-M0/dwi/sub-PREVDEMALS0010011RV_ses-M0_dwi.bvec'] in_dwi = ['sub-PREVDEMALS0010004PM'] in_fmap_magnitude = ['/Users/junhao.wen/ownCloud/Dossier_echange_Hao/NIFTI-PREVDEMALS/Paris/sub-PREVDEMALS0010004PM/ses-M0/fmap/sub-PREVDEMALS0010004PM_ses-M0_magnitude1.nii.gz'] in_fmap_phasediff = ['/Users/junhao.wen/ownCloud/Dossier_echange_Hao/NIFTI-PREVDEMALS/Paris/sub-PREVDEMALS0010011RV/ses-M0/fmap/sub-PREVDEMALS0010011RV_ses-M0_phasediff.nii.gz']

To update, With synchronize flag, this issue can be solved

Thanks in advance