nipype / pydra

Pydra Dataflow Engine
https://nipype.github.io/pydra/

Intermittent workflow failures (>=0.21?) #622

Open tclose opened 1 year ago

tclose commented 1 year ago

What version of Pydra are you using? 0.21

What were you trying to do/What did you expect to happen? Run a workflow successfully.

What actually happened? The workflow intermittently fails with the error message:
E                         Traceback (most recent call last):
E                           File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/core.py", line 1124, in _run
E                             await self._run_task(submitter, rerun=rerun)
E                           File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/core.py", line 1152, in _run_task
E                             await submitter.expand_workflow(self, rerun=rerun)
E                           File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/submitter.py", line 190, in expand_workflow
E                             await task._run(self, rerun=rerun)
E                           File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/core.py", line 1124, in _run
E                             await self._run_task(submitter, rerun=rerun)
E                           File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/core.py", line 1152, in _run_task
E                             await submitter.expand_workflow(self, rerun=rerun)
E                           File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/submitter.py", line 171, in expand_workflow
E                             blocked = _list_blocked_tasks(graph_copy)
E                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E                           File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/submitter.py", line 310, in _list_blocked_tasks
E                             blocking.append(pred, ", ".join(matching_name))
E                         TypeError: list.append() takes exactly one argument (2 given)

Can you replicate the behavior? If yes, how? Not easily. See https://github.com/ArcanaFramework/arcana-xnat/actions/runs/4278435805/jobs/7448157567 for an example where the workflow fails in one iteration of the matrix but not the other. Note that exactly the same code base passed successfully just moments later, https://github.com/ArcanaFramework/arcana-xnat/actions/runs/4278436386, so it is likely due to the workflow graph being executed in a different order between runs.

I have pinned my local dev version to 0.20, which seems to avoid the issue. So I was wondering whether anything might have changed between 0.20 and 0.21 that could lead to this sort of error.

I was having a similarly intermittent problem with 0.21 where the result of executing a Workflow was being returned as None instead of pydra.engine.specs.Result, which I suspect is related.

effigies commented 1 year ago

https://github.com/nipype/pydra/commit/7fd0848fec5c9d16eede8fde6ae4e0050f02593f

tclose commented 1 year ago

Actually, looking at that error message, line 310 of submitter.py should never have worked.
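For reference, `list.append` accepts exactly one argument, so the call shape shown in the traceback always raises `TypeError`. A minimal sketch of the failure and one possible repair (appending a single tuple) follows. The variable values are made up for illustration, and this is not necessarily the change made in the actual fix.

```python
# Hypothetical stand-ins for the values used in _list_blocked_tasks.
pred = "input_interface"
matching_name = [
    "input_interface (FunctionTask_aaa)",
    "input_interface (FunctionTask_bbb)",
]

blocking = []
try:
    # Same call shape as in the traceback: two positional arguments.
    blocking.append(pred, ", ".join(matching_name))
except TypeError as exc:
    error_message = str(exc)

# One possible repair: bundle both values into a single tuple.
blocking.append((pred, ", ".join(matching_name)))
```

Because the faulty line only runs while building the "blocked tasks" report, it stays hidden until some other failure triggers that report.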

tclose commented 1 year ago

7fd0848

Ah, it was me!

tclose commented 1 year ago

Need a proper test of that error message, sorry.

However, it looks like that error is covering an underlying error retrieving the runnable tasks...

tclose commented 1 year ago

See #623 for a simple fix of this issue.

The traceback on the underlying error (with error message) is

inputs:
          id: timepoint0group0member0

      errors:
          time of crash: 20230303-100710
          login name: root
          name with checksum: Workflow_a8c69ddab63a91f1698e0aae1cd36fdb4cdae58d9f1f154f07d38abc818b66fb
          message:
              Traceback (most recent call last):
                File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/core.py", line 1124, in _run
                  await self._run_task(submitter, rerun=rerun)
                File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/core.py", line 1152, in _run_task
                  await submitter.expand_workflow(self, rerun=rerun)
                File "/opt/miniconda-latest/envs/arcana/lib/python3.11/site-packages/pydra/engine/submitter.py", line 173, in expand_workflow
                  raise Exception(
              Exception: graph is not empty, but not able to get more tasks - something may have gone wrong when retrieving the results of predecessor tasks caused by a file-system error or a bug in the internal workflow logic.

              Blocked tasks
              -------------

              pipeline_task (FunctionTask_007ff189b15bce50691cf936ae64e1be62fa4e9418c6458f814ad710dc2ae63d) is blocked by input_interface (FunctionTask_e80a6caf1b082c2638eff11972ac1d44c615c6cc1d7db8afd1a70cf0f8514125), which matches names of [input_interface (FunctionTask_9adecde18300004e0d914b09374f318486190dac61b6f2435d680869d1cca9dd), input_interface (FunctionTask_9adecde18300004e0d914b09374f318486190dac61b6f2435d680869d1cca9dd), input_interface (FunctionTask_9adecde18300004e0d914b09374f318486190dac61b6f2435d680869d1cca9dd), input_interface (FunctionTask_9adecde18300004e0d914b09374f318486190dac61b6f2435d680869d1cca9dd)]

              output_interface (FunctionTask_7760739ce9cc173915b0a17b2df11ec07ea3da8147ef8630779ab6c4a67f1750) is blocked by pipeline_task (FunctionTask_007ff189b15bce50691cf936ae64e1be62fa4e9418c6458f814ad710dc2ae63d), which matches names of [pipeline_task (FunctionTask_9227a6a573a7dce081e92cd946277a467ca6e66357c10a5b6c2343efc0e2ecaf), pipeline_task (FunctionTask_9227a6a573a7dce081e92cd946277a467ca6e66357c10a5b6c2343efc0e2ecaf), pipeline_task (FunctionTask_9227a6a573a7dce081e92cd946277a467ca6e66357c10a5b6c2343efc0e2ecaf), pipeline_task (FunctionTask_9227a6a573a7dce081e92cd946277a467ca6e66357c10a5b6c2343efc0e2ecaf)]

              sink (FunctionTask_294f551109fbd13b7b859e4afc20812b75c678f60ebe51180c1c191a10862b7b) is blocked by output_interface (FunctionTask_7760739ce9cc173915b0a17b2df11ec07ea3da8147ef8630779ab6c4a67f1750), which matches names of [output_interface (FunctionTask_e8ba917ff3bd69c5a2df29faa8e5a877b9974cc62431b53c31c99712adadf74b), output_interface (FunctionTask_e8ba917ff3bd69c5a2df29faa8e5a877b9974cc62431b53c31c99712adadf74b), output_interface (FunctionTask_e8ba917ff3bd69c5a2df29faa8e5a877b9974cc62431b53c31c99712adadf74b), output_interface (FunctionTask_e8ba917ff3bd69c5a2df29faa8e5a877b9974cc62431b53c31c99712adadf74b)]

which suggests the hashes of the upstream nodes are being altered somehow (probably deep in a nested object). The fact that all the tasks in this simple workflow are being altered is interesting. I'm not sure why this error would happen intermittently; it feels like it is going to be pretty hard to track down...

effigies commented 1 year ago

Is there a set involved? Could be an item ordering issue.

tclose commented 1 year ago

Not sure. There is quite a bit involved. The fact that it is every node in the workflow is puzzling me though as they wouldn't all share a common input...

In general, seeing as I have hit this problem twice now, I'm wondering whether relying on the hash not changing is a bit brittle, and whether it would be a better idea to cache the hash when the workflow graph is generated. You could also use this cache to check whether the hash has changed and raise an error.
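A minimal sketch of that idea, assuming a hypothetical `content_digest` function in place of the engine's real hashing and a hypothetical `CachedHashes` helper (neither is part of Pydra): record each node's digest when the graph is built, then compare against it later and fail loudly if it has drifted.

```python
import hashlib


def content_digest(value) -> str:
    """Hypothetical stand-in for the engine's hashing of task inputs."""
    return hashlib.sha256(repr(value).encode()).hexdigest()


class HashChangedError(RuntimeError):
    """Raised when a node's digest no longer matches the cached one."""


class CachedHashes:
    """Record each node's digest once, then detect later drift."""

    def __init__(self):
        self._digests = {}

    def record(self, name, inputs):
        self._digests[name] = content_digest(inputs)

    def check(self, name, inputs):
        current = content_digest(inputs)
        cached = self._digests[name]
        if current != cached:
            raise HashChangedError(
                f"hash of node {name!r} changed from {cached[:8]} to "
                f"{current[:8]} after the graph was built"
            )
        return cached


cache = CachedHashes()
inputs = {"id": "timepoint0group0member0", "paths": ["a.nii", "b.nii"]}
cache.record("input_interface", inputs)
cache.check("input_interface", inputs)  # unchanged, so this passes silently

inputs["paths"].append("c.nii")  # simulate a mutation deep in a nested object
try:
    cache.check("input_interface", inputs)
    drift_detected = False
except HashChangedError:
    drift_detected = True
```

Looking tasks up by the cached digest rather than recomputing it would avoid the mismatch, while the check turns a silent mismatch into an explicit error.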

tclose commented 1 year ago

> Is there a set involved? Could be an item ordering issue.

Come to think of it, the problem reoccurred after I switched to using fileformats, which stores the fspaths in each FileSet in a frozenset.

djarecka commented 1 year ago

I don't see anything obvious between 0.20 and 0.21 that could explain the issue...

effigies commented 1 year ago

I don't know its underlying implementation, but if a frozenset wraps a set, then its element ordering is determined by PYTHONHASHSEED. So it can be consistent within a process, but can't be assumed between processes. If the Python interpreter on the remote end of a call can't be assumed to be identical to the current one, it's possible that even setting PYTHONHASHSEED would not be valid.

Insertion order is probably not desirable, either, as I would hope hash(myset([a, b])) == hash(myset([b, a])).
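The effect is easy to observe. The sketch below starts fresh interpreters with explicit `PYTHONHASHSEED` values and compares `hash('a')` across them; the helper name `hash_in_fresh_interpreter` is made up for this illustration. It also checks that the built-in `frozenset` hash is independent of insertion order within one process.

```python
import os
import subprocess
import sys


def hash_in_fresh_interpreter(seed: str) -> str:
    """Return hash('a') as computed by a new interpreter started with `seed`."""
    env = dict(os.environ, PYTHONHASHSEED=seed)
    result = subprocess.run(
        [sys.executable, "-c", "print(hash('a'))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Same seed: the string hash is reproducible across processes.
same_seed_agrees = hash_in_fresh_interpreter("1") == hash_in_fresh_interpreter("1")

# Different seeds: the string hash differs, and so can set iteration order.
different_seeds_agree = hash_in_fresh_interpreter("1") == hash_in_fresh_interpreter("2")

# Within one process, frozenset hashing ignores insertion order.
order_independent = hash(frozenset(["a", "b"])) == hash(frozenset(["b", "a"]))
```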

tclose commented 1 year ago

> I don't see anything obvious between 0.20 and 0.21 that could explain the issue...

I could be wrong about that. I started noticing the issue around the time I switched from 0.20 to 0.21, but that was also around the time I switched to fileformats, so that makes more sense.

tclose commented 1 year ago

> I don't know its underlying implementation, but if a frozenset wraps a set, then its element ordering is determined by PYTHONHASHSEED. So it can be consistent within a process, but can't be assumed between processes. If the Python interpreter on the remote end of a call can't be assumed to be identical to the current one, it's possible that even setting PYTHONHASHSEED would not be valid.
>
> Insertion order is probably not desirable, either, as I would hope hash(myset([a, b])) == hash(myset([b, a])).

So if that is the issue, I shouldn't see this problem with the serial plugin, only with cf, which roughly matches what I'm experiencing. I usually use serial in my unit tests, but not in the "app execution" routine, which is what is failing at the moment.

tclose commented 1 year ago

I will explicitly implement the __hash__ method on FileSet and see if that fixes the problem

tclose commented 1 year ago

Would still leave it as a bit of a vulnerability with the engine as a whole if you can't use inputs containing sets

effigies commented 1 year ago

Yeah. I don't remember how we decide what we're hashing in types, but it would be good to make sure that we return a consistent view. If we're hashing __reduce__, it could be as simple as:

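class stableset(frozenset):
    def __reduce__(self):
        # __reduce__ must return (callable, args); the sorted items are passed
        # as a single iterable argument to the constructor.
        return (self.__class__, (tuple(sorted(self, key=hash)),))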

Edit: Well, no. We can't sort based on the objects' hash() values, since the output of python -c "print(hash('a'))" is different every time. We need to be able to pass the hashing function through.
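One way to read "pass the hashing function through" is sketched below: hash each element with a caller-supplied, content-based function, sort the resulting digests, and hash the sorted sequence. The names `stable_set_digest` and `hash_str` are hypothetical and not part of Pydra. Sorting the digests rather than the elements means the elements themselves do not need to be orderable.

```python
import hashlib
from typing import Callable, Iterable


def stable_set_digest(items: Iterable, hash_item: Callable[[object], bytes]) -> str:
    """Order-independent digest: hash each element, then sort the digests."""
    element_digests = sorted(hash_item(item) for item in items)
    combined = hashlib.sha256()
    for digest in element_digests:
        # Fixed-length digests make the concatenation unambiguous.
        combined.update(digest)
    return combined.hexdigest()


def hash_str(item) -> bytes:
    """Hypothetical per-element hasher based on content, not on hash()."""
    return hashlib.sha256(str(item).encode()).digest()


digest_ab = stable_set_digest(frozenset(["a", "b"]), hash_str)
digest_ba = stable_set_digest(["b", "a"], hash_str)
digest_ac = stable_set_digest(["a", "c"], hash_str)
```

Because nothing here calls the built-in `hash()`, the result does not depend on `PYTHONHASHSEED`; it is only as stable as the per-element hasher that is passed in.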

tclose commented 1 year ago

Wouldn't

def __hash__(self):
    # Wrap in a tuple: hashing a bare generator hashes the generator object, not its items.
    return hash(tuple(hash(i) for i in sorted(self)))

work?

tclose commented 1 year ago

Well, it would at least work for FileSets, which have Path items, but I suppose you couldn't use that in general.

effigies commented 1 year ago

That assumes that the objects in self are sortable. Not everything that is hashable is sortable, and vice versa. For example:

In [6]: hash((set([0, 1, 2]), set([0, 1]), set([0]), set([1])))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[6], line 1
----> 1 hash((set([0, 1, 2]), set([0, 1]), set([0]), set([1])))

TypeError: unhashable type: 'set'

In [7]: sorted((set([0, 1, 2]), set([0, 1]), set([0]), set([1])))
Out[7]: [{0}, {1}, {0, 1}, {0, 1, 2}]

tclose commented 1 year ago

Taking a different approach, couldn't we just set the hashseed globally across all worker nodes?

tclose commented 1 year ago

https://gerrychain.readthedocs.io/en/latest/topics/reproducibility.html just suggests setting it to 0

effigies commented 1 year ago

Yes, but then we still get a different hash in the coordinator thread, since we can't control what PYTHONHASHSEED the base interpreter was started with.

tclose commented 1 year ago

Can we set them to whatever the hash seed is for the base interpreter when creating the processes?

tclose commented 1 year ago

This thread seems to suggest that multiprocessing processes are passed the same hash seed

https://stackoverflow.com/questions/52044045/os-environment-variable-reading-in-a-spawned-process

tclose commented 1 year ago

For more distributed execution plugins, could we write the hash seed to a file somewhere in the cache directory and make sure it is set in the environment when the worker is loaded?
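A rough sketch of that proposal, with a hypothetical file name (`hashseed`) and hypothetical helpers (`write_seed`, `run_worker`) that are not part of Pydra: the coordinator writes a chosen seed into the cache directory, and each worker interpreter is launched with `PYTHONHASHSEED` read from that file.

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path


def write_seed(cache_dir: Path, seed: int) -> Path:
    """Store the chosen seed in the cache directory (hypothetical file name)."""
    seed_file = cache_dir / "hashseed"
    seed_file.write_text(str(seed))
    return seed_file


def run_worker(seed_file: Path, code: str) -> str:
    """Start a worker interpreter with PYTHONHASHSEED taken from the seed file."""
    env = dict(os.environ, PYTHONHASHSEED=seed_file.read_text().strip())
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


with tempfile.TemporaryDirectory() as tmp:
    seed_file = write_seed(Path(tmp), 0)
    first = run_worker(seed_file, "print(hash('a'))")
    second = run_worker(seed_file, "print(hash('a'))")
```

This only makes the workers agree with each other. The process that writes the file has already been started with whatever seed it had, so its own hashes may still differ.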

effigies commented 1 year ago

That would probably kill any hope for Windows support, since I don't think fork() is implemented there.

Another approach could be to detect this case and raise an error saying "You may be using unstable data structures, such as set(). Consider using an order-preserving variant, such as pyrsistent's PSet or boltons' IndexedSet".

effigies commented 1 year ago

> Can we set them to whatever the hash seed is for the base interpreter when creating the processes?

Also, no. Because PYTHONHASHSEED is not inspectable if it was not set.
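To make the limitation concrete: a running interpreter can tell whether a fixed seed was requested through the environment, and whether randomization is enabled, but it does not expose the random seed it picked for itself. A small sketch, where the helper name `configured_hash_seed` is made up for illustration:

```python
import os
import sys


def configured_hash_seed():
    """Return the integer seed from PYTHONHASHSEED, or None if no fixed seed is set."""
    value = os.environ.get("PYTHONHASHSEED")
    if not value or value == "random":
        # The interpreter chose a random seed at startup and does not expose it.
        return None
    return int(value)


# sys.flags.hash_randomization reports whether randomization is on, not the seed.
randomization_enabled = bool(sys.flags.hash_randomization)
seed = configured_hash_seed()
```

When `configured_hash_seed()` returns `None`, there is no value available to hand on to child processes.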

tclose commented 1 year ago

That would work for sets passed directly to inputs, but if they are internal to the object being passed, then we would have to iterate through all the nested members

tclose commented 1 year ago

> Also, no. Because PYTHONHASHSEED is not inspectable if it was not set.

That's annoying

effigies commented 1 year ago

> That would work for sets passed directly to inputs, but if they are internal to the object being passed, then we would have to iterate through all the nested members

I just mean we detect the hash change and infer that there must be an unstable type somewhere in the spec. If people don't know what is in the types they're using, that only goes so far, but it's better than nothing.
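If a more specific error message were wanted, the nested members could be walked once a hash change has been detected. The sketch below is a best-effort illustration only: `find_unordered` and `FakeFileSet` are hypothetical names, and the walk covers only mappings, lists, tuples and objects with a `__dict__`.

```python
from collections.abc import Mapping


def find_unordered(obj, path="inputs", _seen=None):
    """Yield the paths of set/frozenset instances nested inside obj."""
    seen = set() if _seen is None else _seen
    if id(obj) in seen:  # guard against reference cycles
        return
    seen.add(id(obj))
    if isinstance(obj, (set, frozenset)):
        yield path
    elif isinstance(obj, Mapping):
        for key, value in obj.items():
            yield from find_unordered(value, f"{path}[{key!r}]", seen)
    elif isinstance(obj, (list, tuple)):
        for index, value in enumerate(obj):
            yield from find_unordered(value, f"{path}[{index}]", seen)
    elif hasattr(obj, "__dict__"):
        for name, value in vars(obj).items():
            yield from find_unordered(value, f"{path}.{name}", seen)


class FakeFileSet:
    """Hypothetical stand-in that stores its paths in a frozenset."""

    def __init__(self, paths):
        self.fspaths = frozenset(paths)


spec = {"id": "timepoint0group0member0", "files": [FakeFileSet(["a.nii", "a.json"])]}
found = list(find_unordered(spec))
```

The paths collected in `found` could be appended to the error message so that users see which field holds the unordered container.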

tclose commented 1 year ago

Explicitly hashing FileSet resolved the error I was having. Will have to wait and see if it reoccurs

tclose commented 1 year ago

> > That would work for sets passed directly to inputs, but if they are internal to the object being passed, then we would have to iterate through all the nested members
>
> I just mean we detect the hash change and infer that there must be an unstable type somewhere in the spec. If people don't know what is in the types they're using, that only goes so far, but it's better than nothing.

I could just amend the error message in my PR to say something along those lines

tclose commented 1 year ago

> Explicitly hashing FileSet resolved the error I was having. Will have to wait and see if it reoccurs

Actually, the error did reoccur, but I think I didn't succeed in setting up the manual hash. https://github.com/nipype/pydra/issues/626 should fix the hash. Maybe we can look at it during the hack week