nipype / pydra

Pydra Dataflow Engine
https://nipype.github.io/pydra/

AttributeError: 'Input' object has no attribute 'files_hash' with split and combine #675

Open ghisvail opened 1 year ago

ghisvail commented 1 year ago

I am trying to split a simple workflow of 3 sequential steps (bias correction, registration and resampling) over multiple T1w images in a BIDS dataset. The workflow consists of 3 ANTs tasks with a custom reader task attached upfront.

The workflow fetches the files fine and processes the bias correction step in parallel, but then fails with a rather generic AttributeError: 'Input' object has no attribute 'files_hash'. The code listing and full traceback are available below.

It is worth noting that the workflow runs fine if I provide a reader fetching a single file (read_one_bidsfile) and remove the split and combine steps. It also works if I provide the list of files manually to the workflow and don't use a reader task.

Code:

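# Imports assumed for this listing; they were not part of the original post.
from os import PathLike
from pathlib import Path

from pydra import Workflow
from pydra.mark import annotate, task

@task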
@annotate({"return": {"T1w": list[str]}})
def read_bids_dataset(dataset_path: PathLike):
    """Read all T1w files from a BIDS dataset."""

    return sorted(Path(dataset_path).rglob("*_T1w.nii.gz"))

@task
@annotate({"return": {"T1w": str}})
def read_one_bidsfile(dataset_path: PathLike):
    """Read one T1w file from a BIDS dataset."""

    return next(Path(dataset_path).rglob("*_T1w.nii.gz"))

def t1_linear(name: str, **kwargs) -> Workflow:
    from pydra.tasks import ants

    workflow = Workflow(name=name, input_spec=["dataset_path", "mni_template", "ref_template"], **kwargs)

    # workflow.add(read_one_bidsfile(name="reader", dataset_path=workflow.lzin.dataset_path))
    workflow.add(read_bids_dataset(name="reader", dataset_path=workflow.lzin.dataset_path))

    workflow.add(
        ants.N4BiasFieldCorrection(name="bias_correction", input_image=workflow.reader.lzout.T1w))

    workflow.bias_correction.split("input_image")

    workflow.add(
        ants.RegistrationSyNQuick(
            name="registration",
            dimensionality=3,
            transform_type="a",
            moving_image=workflow.bias_correction.lzout.output_image,
            fixed_image=workflow.lzin.mni_template,
        )
    )

    workflow.add(
        ants.ApplyTransforms(
            name="transform",
            dimensionality=3,
            moving_image=workflow.registration.lzout.warped_moving_image,
            fixed_image=workflow.lzin.ref_template,
            output_datatype="short",
        )
    )

    workflow.combine("bias_correction.input_image")

    workflow.set_output({
        "output_image": workflow.transform.lzout.output_image,
        "output_transform": workflow.registration.lzout.affine_transform,
    })

    return workflow

Traceback:

Traceback (most recent call last):
  File "/Users/ghislain.vaillant/Projects/pydra-ants/test.py", line 238, in <module>
    sub(wf)
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 42, in __call__
    self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
  File "/Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 68, in submit_from_call
    await runnable._run(self, rerun=rerun)
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 1124, in _run
    await self._run_task(submitter, rerun=rerun)
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 1152, in _run_task
    await submitter.expand_workflow(self, rerun=rerun)
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 195, in expand_workflow
    tasks, follow_err = get_runnable_tasks(graph_copy)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 236, in get_runnable_tasks
    _is_runnable = is_runnable(graph, tsk)
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/submitter.py", line 266, in is_runnable
    is_done = pred.done
              ^^^^^^^^^
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 229, in __getattr__
    return self.__getattribute__(name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 667, in done
    _result = self.result()
              ^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 746, in result
    checksum = self.checksum_states(state_index=ind)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/core.py", line 285, in checksum_states
    setattr(inputs_copy, key.split(".")[1], val)
  File "/Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/specs.py", line 105, in __setattr__
    self.files_hash[name] = {}
    ^^^^^^^^^^^^^^^
AttributeError: 'Input' object has no attribute 'files_hash'
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<ConcurrentFuturesWorker.exec_as_coro() running at /Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/workers.py:175> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<ConcurrentFuturesWorker.exec_as_coro() running at /Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/workers.py:175> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<ConcurrentFuturesWorker.exec_as_coro() done, defined at /Users/ghislain.vaillant/Projects/pydra-ants/.hatch/pydra-ants/lib/python3.11/site-packages/pydra/engine/workers.py:169> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /Users/ghislain.vaillant/.asdf/installs/python/3.11.3/lib/python3.11/asyncio/futures.py:387, Task.task_wakeup()]>>

tclose commented 1 year ago

This will likely be "addressed" by #662, because file-hash caching has been temporarily dropped in it. See #683 for plans to reimplement it.
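
For readers unfamiliar with this failure mode: the last two frames of the traceback above show a plain attribute assignment being intercepted by a custom __setattr__ that reads self.files_hash before that attribute exists on the object. The following is a minimal, stdlib-only sketch of that general pattern. TrackedInputs is a hypothetical stand-in, not pydra's actual Input class, and skipping __init__ is only one assumed way an object could end up without its bookkeeping attribute.

```python
class TrackedInputs:
    """Hypothetical stand-in for an input spec; NOT pydra's actual class."""

    def __init__(self):
        # Bypass our own __setattr__ so the bookkeeping dict can be created.
        object.__setattr__(self, "files_hash", {})

    def __setattr__(self, name, value):
        # Every write first resets the cached hash entry for that field.
        # This line fails if files_hash was never created on the instance.
        self.files_hash[name] = {}
        object.__setattr__(self, name, value)


# Normal construction: __init__ ran, so the hook finds files_hash.
ok = TrackedInputs()
ok.input_image = "sub-01_T1w.nii.gz"

# Assumed failure path: an instance created without running __init__.
broken = TrackedInputs.__new__(TrackedInputs)
try:
    broken.input_image = "sub-01_T1w.nii.gz"
except AttributeError as exc:
    message = str(exc)  # mentions the missing 'files_hash' attribute
```

Whatever the exact path inside pydra, the symptom matches: the assignment to the input field never completes because the hook itself fails first.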

ghisvail commented 1 year ago

@tclose I tried re-running this script against the current code, but got the following error:

File ".../pydra/pydra/engine/core.py", line 605, in split
    raise ValueError(
ValueError: Split is missing values for the following fields ['input_image']

tclose commented 1 year ago

Hi @ghisvail, this is the main syntactical change in the PR. You now need to pass the inputs to be split to the split() method, i.e.


    workflow.add(
        ants.N4BiasFieldCorrection(name="bias_correction", input_image=workflow.reader.lzout.T1w))

    workflow.bias_correction.split("input_image")

becomes


    workflow.add(
        ants.N4BiasFieldCorrection(name="bias_correction"))

    workflow.bias_correction.split("input_image", input_image=workflow.reader.lzout.T1w)

NB: you can drop the explicit splitter argument ("input_image") in this case and just use workflow.bias_correction.split(input_image=workflow.reader.lzout.T1w)
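
Conceptually, splitting over input_image means running the task once per element of the list, and combining over the same field gathers the per-element outputs back into a single list. A rough plain-Python sketch of that idea follows. It assumes nothing about pydra's internals; run_split_and_combine and fake_bias_correction are hypothetical names used only for illustration.

```python
from typing import Any, Callable


def run_split_and_combine(
    func: Callable[..., Any], split_field: str, values: list, **fixed_inputs: Any
) -> list:
    """Call func once per element of values, then gather results in order."""
    return [func(**{split_field: value}, **fixed_inputs) for value in values]


def fake_bias_correction(input_image: str, suffix: str = "_corrected") -> str:
    # Stand-in for a real processing step: it only derives an output name.
    stem = input_image.removesuffix(".nii.gz")
    return f"{stem}{suffix}.nii.gz"


images = ["sub-01_T1w.nii.gz", "sub-02_T1w.nii.gz"]

# "Split" over input_image, then "combine" the two results into one list.
outputs = run_split_and_combine(fake_bias_correction, "input_image", images)
```

In this picture, passing the values to split() rather than to the task constructor makes it explicit which input is being iterated over.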

ghisvail commented 1 year ago

@tclose thanks for the fix. I am now getting the following error:

  File ".../pydra/engine/submitter.py", line 75, in submit_from_call
    if runnable.state is None:
       ^^^^^^^^^^^^^^
AttributeError: type object 'Workflow' has no attribute 'state'

from the following workflow submission:

import sys

from pydra import Submitter

wf = t1_linear(name="t1-linear", dataset_path=sys.argv[1])

with Submitter() as sub:
    res = sub(wf)
    print(res)

I tried with and without the split and combine semantics; both yield the same error.
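
One detail in the message may help narrow this down: Python words a failed attribute lookup as "type object 'X' has no attribute ..." when the lookup is made on a class itself, and as "'X' object has no attribute ..." when it is made on an instance. The sketch below only demonstrates that wording difference. The Workflow class in it is a hypothetical stand-in, not pydra's.

```python
class Workflow:
    """Hypothetical stand-in class; NOT pydra's Workflow."""

    def __init__(self, name: str):
        self.name = name


def missing_state_message(obj) -> str:
    """Return the AttributeError text raised when reading obj.state."""
    try:
        obj.state
    except AttributeError as exc:
        return str(exc)
    return ""


# Lookup on the class itself vs. lookup on an instance of it.
on_class = missing_state_message(Workflow)
on_instance = missing_state_message(Workflow(name="t1-linear"))
```

If the same distinction applies here, runnable in submit_from_call would be the Workflow class rather than a Workflow instance, though the snippet above does not show where that could happen.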