nipype / pydra

Pydra Dataflow Engine
https://nipype.github.io/pydra/

Mix ShellCommand and Function tasks in a workflow #617

Open rcorredorj opened 1 year ago

rcorredorj commented 1 year ago

Hi guys,

I'm exploring pydra on the recommendation of @oesteban, as it seems to be the next version of nipype. I feel the documentation is still a bit thin, but I'm checking whether I can use it in my pipelines. In particular, I'm trying to understand how to mix shell/container tasks and function tasks in a workflow.

The example I'm trying to run is a workflow that runs elastix (image registration tool) as an executable. The exe receives as input the paths of three files, plus the path of a directory where results are saved. When this executable finishes, I would like to provide as input to another task (either ShellCommand or a Function task) the path of one of the files produced in the output folder.
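
For reference, the command line described above can be sketched in plain Python. This is only an illustration under assumptions: the flags `-f`, `-m`, `-p` and `-out` are taken from the specs posted in this thread, while the helper name `build_elastix_command` and the example file names are hypothetical. Nothing is executed; the function only assembles the argument list.

```python
from typing import List


def build_elastix_command(
    fixed: str,
    moving: str,
    params: str,
    out_dir: str,
    executable: str = "elastix.exe",
) -> List[str]:
    """Assemble the argument list for one elastix run (hypothetical helper).

    The list form can be handed to subprocess.run() without shell quoting issues.
    """
    return [
        executable,
        "-f", fixed,      # fixed image
        "-m", moving,     # moving image
        "-p", params,     # registration parameter file
        "-out", out_dir,  # directory where elastix writes its results
    ]


# Example file names are placeholders, not files from the thread.
cmd = build_elastix_command("fixed.nii.gz", "moving.nii.gz", "params.txt", "out")
print(" ".join(cmd))
```

A shell task wrapper ultimately has to produce an equivalent argument list, so comparing this list against the command line the wrapper reports can help check that every mandatory input was actually supplied.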

I started with the code below, but I still don't know how to connect the ShellCommandTask properly, so it is not working. How do I provide the arguments to the command task when an input_spec is passed at task creation?

import os
import attr
import pydra
import tempfile
import nest_asyncio

nest_asyncio.apply()

def pipeline(img1_file_path, img2_file_path, out_dir_path):
    wf1 = pydra.Workflow(name='wf1', input_spec=['img1_file_path', 'img2_file_path', 'out_dir_path'],
                         img1_file_path=img1_file_path,
                         img2_file_path=img2_file_path,
                         out_dir_path=out_dir_path)

    tmp_reg1_out_dir_path = tempfile.mkdtemp()

    elastix_bin_path = 'elastix.exe'
    elastix_input_spec = pydra.specs.SpecInfo(
        name='Input',
        fields=[('fixed_img_file_path', attr.ib(type=str, metadata={'help_string': 'text','argstr': '-f', 'mandatory': True},),),
                ('moving_img_file_path', attr.ib(type=str, metadata={'help_string': 'text','argstr': '-m', 'mandatory': True},),),
                ('reg_params_file_path', attr.ib(type=str, metadata={'help_string': 'text','argstr': '-p', 'mandatory': True},),),
                ('out_dir_path', attr.ib(type=str, metadata={'help_string': 'text','argstr': '-out', 'mandatory': True},),),],
        bases=(pydra.specs.ShellSpec,),
    )

    elastix_output_spec = pydra.specs.SpecInfo(
        name='Output',
        fields=[('reg_img', attr.ib(type=pydra.specs.File, metadata={'help_string': 'out', 'output_file_template': os.path.join("{out_dir_path}", 'result.0.nii.gz')},),),
                 ('transf_params', attr.ib(type=pydra.specs.File, metadata={'help_string': 'out_params',
                                                                      'output_file_template': os.path.join(
                                                                          "{out_dir_path}", 'TransformParameters.0.nii.gz')},),)],
        bases=(pydra.specs.ShellOutSpec,),
    )

    reg_img1_to_img2 = pydra.ShellCommandTask(name='reg_img1_to_img2', executable=elastix_bin_path,
                                  input_spec=elastix_input_spec,
                                  output_spec=elastix_output_spec)

    wf1.add(reg_img1_to_img2 (fixed_img_file_path=wf1.lzin.img2_file_path,
                                moving_img_file_path=wf1.lzin.img1_file_path,
                                out_dir_path=tmp_reg1_out_dir_path,
                                ))

    with pydra.Submitter(plugin='cf') as sub:
        sub(wf1)

    wf1.result()

Thanks in advance!

RaC

rcorredorj commented 1 year ago

I managed to make it work in a similar way to the pydra-fsl repo (https://github.com/nipype/pydra-fsl) by creating a class that inherits from ShellCommandTask and defining customized input and output specs. I think this should be described as an option in the tutorials; it feels more intuitive for the rest of the procedure.

I'm copying the class below, but now I'm getting an error when I try to get the outputs from these tasks in the workflow. The error I'm getting:

  File "D:\Miniconda3\envs\pipeline\lib\site-packages\pydra\engine\core.py", line 1125, in _run
    result.output = self._collect_outputs()
  File "D:\envs\pipeline\lib\site-packages\pydra\engine\core.py", line 1232, in _collect_outputs
    raise ValueError(
ValueError: Task reg_img_to_img raised an error, full crash report is here: C:\Users\myuser\AppData\Local\Temp\tmph6ycgbbs\Elastix_b45a77f2c16d81b89fa65c3e2131c276c7d00c9d529fb83f3e56d067cea94e31\_error.pklz

I can confirm that the processes are triggered properly and the results are generated in the temp folders, but the _collect_outputs() function is returning None.
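
One way to narrow this down is to check, outside pydra, whether the files named in the output templates really exist in the output directory. The following is a minimal sketch under assumptions: the file names `result.0.nii.gz` and `TransformParameters.0.txt` are the ones used in the output spec below, and the helper name `missing_outputs` is hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Iterable, List

# File names taken from the output_file_template entries in this thread.
EXPECTED_NAMES = ("result.0.nii.gz", "TransformParameters.0.txt")


def missing_outputs(out_dir, expected: Iterable[str] = EXPECTED_NAMES) -> List[str]:
    """Return the expected file names that are not present in out_dir."""
    out = Path(out_dir)
    return [name for name in expected if not (out / name).is_file()]


# Demonstration with a throwaway directory holding only one of the two files.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "result.0.nii.gz").touch()
    print(missing_outputs(tmp))  # → ['TransformParameters.0.txt']
```

If this returns an empty list for the real output directory, the files are in place and the problem is more likely in how the outputs are declared or collected than in the elastix run itself.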

The class I created looks as follows.

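import os

import pydra
from pydra.utils.messenger import AuditFlag  # used in the Elastix.__init__ signature below

elastix_input_fields = [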
    (
        "fixed_img_file_path",
        pydra.specs.File,
        {
            'help_string': 'text', 'argstr': '-f {fixed_img_file_path}', 'mandatory': True,
        },
    ),
    (
        "moving_img_file_path",
        pydra.specs.File,
        {
            'help_string': 'text', 'argstr': '-m {moving_img_file_path}', 'mandatory': True,
        },
    ),
    (
        "out_dir_path",
        pydra.specs.Directory,
        {
            'help_string': 'text', 'argstr': '-out {out_dir_path}', 'mandatory': True,
        },
    ),
    (
        "reg_params_file_path",
        pydra.specs.File,
        {
            'help_string': 'text', 'argstr': '-p {reg_params_file_path}', 'mandatory': True,
        },
    ),
]

elastix_input_specs = pydra.specs.SpecInfo(
    name="Input", fields=elastix_input_fields, bases=(pydra.specs.ShellSpec,)
)

def out_files(out_dir_path):
    print(out_dir_path)
    return os.listdir(out_dir_path)

elastix_output_fields = [
    (
        "reg_img_file",
        pydra.specs.File,
        {
            "help_string": "",
            "requires": ["fixed_img_file_path", "moving_img_file_path", "out_dir_path"],
            "output_file_template": "{out_dir_path}/result.0.nii.gz",
        },
    ),
    (
        "transf_params_file",
        pydra.specs.File,
        {
            "help_string": "",
            "requires": ["fixed_img_file_path", "moving_img_file_path", "out_dir_path"],
            "output_file_template": "{out_dir_path}/TransformParameters.0.txt",
        },
    ),
    (
        "out_files",
        list,
        {
            "help_string": (
                "all output files in a list"
            ),
            "callable": out_files,
        },
    ),
]

elastix_output_specs = pydra.specs.SpecInfo(
    name="Output", fields=elastix_output_fields, bases=(pydra.specs.ShellOutSpec,)
)

class Elastix(pydra.ShellCommandTask):
    def __init__(
        self,
        audit_flags: AuditFlag = AuditFlag.NONE,
        cache_dir=None,
        input_spec=None,
        messenger_args=None,
        messengers=None,
        name=None,
        output_spec=None,
        rerun=False,
        strip=False,
        **kwargs
    ):
        elastix_bin_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'bin', 'elastix', 'v5_0_1',
                                        'elastix.exe')

        if input_spec is None:
            input_spec = elastix_input_specs
        if output_spec is None:
            output_spec = elastix_output_specs
        if name is None:
            name = "Elastix"
        super().__init__(
            name=name,
            input_spec=input_spec,
            output_spec=output_spec,
            audit_flags=audit_flags,
            messengers=messengers,
            messenger_args=messenger_args,
            cache_dir=cache_dir,
            strip=strip,
            rerun=rerun,
            executable=elastix_bin_path,
            **kwargs
        )

satra commented 1 year ago

you may want to take a look at this: https://nipype.github.io/pydra/components.html#workflows specifically the set_output line.

satra commented 1 year ago

also do take a look at the latest iterations of FSL interfaces as they simplify almost everything into the specs, and in many cases even output spec may not be required depending on the command being wrapped.

af-a commented 1 year ago

Hi @rcorredorj,

I am trying to do something very similar, i.e. creating a workflow of ShellCommandTasks and FunctionTasks, but I am also running into issues just adding a ShellCommandTask to a workflow.

I am curious as to how you were able to add the task in your example without getting an error, as in this line that you shared:

    wf1.add(reg_img1_to_img2 (fixed_img_file_path=wf1.lzin.img2_file_path,
                                moving_img_file_path=wf1.lzin.img1_file_path,
                                out_dir_path=tmp_reg1_out_dir_path,
                                ))

For me, this would raise:

AttributeError: 'Result' object has no attribute 'name'

Looking into it, I figured that ShellCommandTask(...) (as in the example) returns a pydra.engine.specs.Result object, whereas a FunctionTask(...) returns a pydra.engine.task.FunctionTask (i.e. a TaskBase object). Since Workflow.add() expects the latter, it should always raise the error if you add a shell task (while a FunctionTask works fine). Is that not the case for you?

rcorredorj commented 1 year ago

Hi @AhmedFaisal95, I still have trouble making it work properly. In general, I manage to create a workflow with two instances of the ShellCommandTask running in parallel; both tasks are triggered, but one ends with an error:

File "D:\..\pipeline\lib\site-packages\pydra\engine\core.py", line 1232, in _collect_outputs
    raise ValueError(
ValueError: Task reg_img1_to_img2 raised an error, full crash report is here: C:\Users\user\AppData\Local\Temp\tmp0byd0a0w\Elastix_6c558c162d748a81ed8a8f3d207038c3d9a0f626d00016482370f03c410d0421\_error.pklz

The results of both instances are available and the executables finish properly, but the run fails and points to the temp folder above, which does not even exist at that point in runtime.

Unfortunately, I'm struggling to use pydra the way I expected. I will give it one last try by checking the workflows defined for FSL, as indicated by @satra.

If you have any suggestions from your end, keep me posted.

Thanks!

RaC

af-a commented 1 year ago

@rcorredorj

You seem to be getting further than I can, then.

In your case, I'm guessing that your reg_img1_to_img2 may be responsible here for raising some error, although its results are available as expected?

The temp file mentioned in the crash report is created by pydra after the failure and should contain at least the stderr output of your presumably failed command, so it may contain hints about the cause of the error.
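
A minimal sketch for inspecting such a crash file, under stated assumptions: it treats `_error.pklz` as a pickle-compatible payload (files written with cloudpickle can normally be read with the standard `pickle` module), and the helper names and the stand-in dictionary below are hypothetical, not pydra's actual report layout. Unpickling executes code, so this should only be used on files produced by your own runs.

```python
import pickle
import tempfile
from pathlib import Path


def load_crash_report(path):
    """Unpickle a crash file; only use this on files you trust."""
    with open(path, "rb") as fh:
        return pickle.load(fh)


def format_crash_report(report) -> str:
    """Render a report as text; list values (e.g. traceback lines) are joined."""
    if not isinstance(report, dict):
        return str(report)
    lines = []
    for key, value in report.items():
        if isinstance(value, (list, tuple)):
            value = "".join(str(item) for item in value)
        lines.append(f"{key}: {value}")
    return "\n".join(lines)


# Demonstration with a stand-in file; the keys and values are made up.
with tempfile.TemporaryDirectory() as tmp:
    crash_file = Path(tmp) / "_error.pklz"
    crash_file.write_bytes(pickle.dumps({"error message": ["ValueError: boom\n"]}))
    print(format_crash_report(load_crash_report(crash_file)))
```

For a real run, `load_crash_report` would be pointed at the path printed in the `ValueError` message, provided the file still exists on disk.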

djarecka commented 1 year ago

@rcorredorj - I'm sorry for the delay. Starting with the original question: the input spec is created before the task, but you can assign values to specific inputs later, e.g. when adding the task to the workflow; see example here. Does this make sense?