Closed tclose closed 1 year ago
Patch coverage: 100.00% and project coverage change: +0.54% :tada:

Comparison is base (b5fe4c0) 81.17% compared to head (fb1803b) 81.71%.

:exclamation: Current head fb1803b differs from pull request most recent head c8a2b96. Consider uploading reports for the commit c8a2b96 to get more accurate results.

:umbrella: View full report at Codecov.
:loudspeaker: Do you have feedback about the report comment? Let us know in this issue.
Is it possible to add a test that exercises this branch?
I've been trying to think how to do that. How would I create a workflow with a broken execution graph, where some of the nodes are unrunnable? (I can't remember the exact case that motivated me to write that error explanation code)
Is it possible to create a loop like `nodeA -> nodeB -> nodeA`?
I tried this

```python
@mark.task
def add_together(x, y):
    return x + y


def test_wf_with_blocked_tasks(plugin_dask_opt, tmpdir):
    wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"])
    wf.add(add_together(name="taska", x=wf.lzin.x))
    wf.add(add_together(name="taskb", x=wf.taska.lzout.out, y=wf.taska.lzout.out))
    # Create a cyclic loop
    wf.taska.y = wf.taskb.lzout.out
    wf.set_output([("out", wf.taskb.lzout.out)])
    wf.inputs.x = [1]
    wf.cache_dir = tmpdir
    # with pytest.raises(Exception) as exc:
    with Submitter(plugin=plugin_dask_opt) as sub:
        sub(wf)
```
but it produces the following traceback instead
```
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pydra/engine/submitter.py:42: in __call__
    self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
pydra/engine/submitter.py:68: in submit_from_call
    await runnable._run(self, rerun=rerun)
pydra/engine/core.py:1125: in _run
    result.output = self._collect_outputs()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <pydra.engine.core.Workflow object at 0x10eaf4ca0>

    def _collect_outputs(self):
        output_klass = make_klass(self.output_spec)
        output = output_klass(**{f.name: None for f in attr.fields(output_klass)})
        # collecting outputs from tasks
        output_wf = {}
        for name, val in self._connections:
            if not isinstance(val, LazyField):
                raise ValueError("all connections must be lazy")
            try:
                val_out = val.get_value(self)
                output_wf[name] = val_out
            except (ValueError, AttributeError):
                output_wf[name] = None
                # checking if the tasks has predecessors that raises error
                if isinstance(getattr(self, val.name)._errored, list):
>                   raise ValueError(
                        f"Tasks {getattr(self, val.name)._errored} raised an error"
                    )
E                   ValueError: Tasks ['taska'] raised an error
```
@tclose - do you remember if the changes indeed helped at the time you had the issue?
@djarecka The changes were what I used to track down and debug the issue I was having. It was something like one node failing, that failure not getting picked up, and then downstream nodes sitting there blocked, but I can't remember exactly now.
I expect it would help a lot to debug the intermittent failures I'm experiencing atm too
Why is codecov complaining the coverage drops by 5% when I only changed two lines that aren't getting hit?
> Why is codecov complaining the coverage drops by 5% when I only changed two lines that aren't getting hit?
Because codecov is garbage.
sorry, but it's still not completely clear to me how this helps with debugging, since you're commenting out `get_runnable_tasks(graph_copy)` and suggesting to "Uncomment to debug".
> sorry, but it's still not completely clear to me how this helps with debugging, since you're commenting out `get_runnable_tasks(graph_copy)` and suggesting to "Uncomment to debug".
sorry, that isn't very clear. It is just that that exception is triggered by `get_runnable_tasks` returning no runnable tasks, so if you uncomment that line and put a breakpoint there, you can step through and find out what the problem is. I forgot to take it out before making my previous PR, but thought it might be useful to keep there as a reference. Happy to take it out though.
I've tried a few ways to hit this branch but keep getting stuck. If we merge these changes, I should be able to get a better idea of the issue that is causing this branch to be hit in my code, and then could try to design a test around that.
Ah, I remember now. I fixed this bug in my local branch and managed to hit the issue again. It is caused by the hash of the task being altered at some point in the workflow (somewhere deep in a nested object I imagine) and then the downstream node not being able to find the predecessor node at the hash it expects to.
The error message helped a lot :) Still not sure how to generate a test for it easily though.
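For anyone trying to reproduce this outside Pydra, the failure mode boils down to a mutable object being shared (rather than copied) between nodes, so a mutation after the input hash is computed changes that hash. A toy illustration (this is not Pydra's actual hashing code, just a repr-based stand-in):

```python
from hashlib import sha256


def naive_hash(obj):
    # Toy stand-in for an input hash derived from the object's repr
    return sha256(repr(obj).encode()).hexdigest()


# An upstream "node" hashes its input, which contains a nested mutable object
shared = {"params": [1, 2, 3]}
hash_before = naive_hash(shared)

# A downstream step mutates the nested object in place (no copy was made)
shared["params"].append(4)
hash_after = naive_hash(shared)

# The hash no longer matches, so any lookup keyed on hash_before will fail
print(hash_before == hash_after)  # False
```

This is why the downstream node can't find the predecessor's results at the hash it expects.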
Do we coerce inputs to a different type at runtime? That might change the hash.
> Do we coerce inputs to a different type at runtime? That might change the hash.
I don't remember seeing that. I'm passing around fairly deep objects, which is not really what I expect Pydra was designed for, so I ran into trouble before where one of the nested objects was not being copied between nodes, and a downstream node updated the object that was used to generate the upstream node's hash.
Using the insight (see #622) that this error is caused by hashes of node inputs being unstable, I have managed to come up with a test to hit this error.
Seems as though this is something that needs a bit deeper thought though, as relying on hashes not changing is a bit of a weakness.
@djarecka @effigies, I reckon this is ready to merge now. Not sure why the SLURM test is failing. If you go to Codecov, the changes actually increase the coverage by 0.55%, not reduce it.
(Moved from an inline comment because I realized it grew out of hand:)
Interesting reproduction. Here's a thought for how we could change this to make this use case smoother:
User writes a function that modifies its inputs. After execution, the runner re-checks the input hash and raises an error on change, rather than waiting for a workflow deadlock. The user then re-marks the function with `@mark.unsafe_task`, which causes the inputs to be deepcopied before execution. The input hash remains unchanged, and the workflow continues.
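For illustration, a minimal sketch of what that post-execution check could look like (`hash_inputs`, `run_task`, and the `unsafe` flag are hypothetical names, not Pydra's API):

```python
from copy import deepcopy
from hashlib import sha256


def hash_inputs(inputs):
    # Hypothetical stand-in for Pydra's input hashing
    return sha256(repr(sorted(inputs.items())).encode()).hexdigest()


def run_task(func, inputs, unsafe=False):
    # For tasks marked unsafe, deepcopy inputs so mutation can't leak out
    call_inputs = deepcopy(inputs) if unsafe else inputs
    pre_hash = hash_inputs(inputs)
    result = func(**call_inputs)
    # Re-check the hash after execution instead of waiting for a deadlock
    if hash_inputs(inputs) != pre_hash:
        raise RuntimeError(
            "task modified its inputs; mark it unsafe to deepcopy them"
        )
    return result


def mutating_task(x):
    x.append(99)  # modifies its input in place
    return sum(x)
```

With this, `run_task(mutating_task, {"x": [1, 2]})` raises immediately, while `run_task(mutating_task, {"x": [1, 2]}, unsafe=True)` runs on a copy and leaves the caller's inputs (and their hash) untouched.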
This doesn't resolve the hash nondeterminism. That's caused by https://github.com/nipype/pydra/blob/b5fe4c0eb7f937e70db15bc087d86fe90f401ff3/pydra/engine/helpers.py#L672-L674. Given that it's based on the string representation, you could probably resolve that on your end by sorting the set in `__str__` or `__repr__`.
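To spell that suggestion out: set iteration order for strings varies between interpreter runs (hash randomization), so a repr-based hash of a raw set is unstable, while a sorted repr is not. A toy class (not the one in `helpers.py`):

```python
class Tags:
    """Toy container whose repr feeds a string-based hash."""

    def __init__(self, values):
        self.values = set(values)

    def __repr__(self):
        # Sorting makes the repr (and any hash derived from it)
        # independent of set iteration order and hash randomization
        return f"Tags({sorted(self.values)})"


# Two sets built in different orders still produce identical reprs
a = Tags(["beta", "alpha", "gamma"])
b = Tags(["gamma", "beta", "alpha"])
print(repr(a) == repr(b))  # True
```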
That said, there's a StackOverflow question on this exact problem, and the answer is basically that there's no way to hash arbitrary objects. But we could use `functools.singledispatch` to pass this problem off to users:
```python
@functools.singledispatch
def hash_obj(obj: object) -> bytes:
    # Works for generic objects with __dict__
    dict_rep = b":".join(
        key.encode() + b":" + hash_obj(val) for key, val in obj.__dict__.items()
    )
    return sha256(str(obj.__class__).encode() + b":" + dict_rep).digest()


# Then define something for builtin classes with no __dict__
@hash_obj.register
def _(obj: int) -> bytes: ...


@hash_obj.register
def _(obj: str) -> bytes: ...


@hash_obj.register
def _(obj: dict) -> bytes: ...
```
We could even handle things like `set()` and `frozenset()` by default, so only types that were really obscure would need to register their own. And we could either just let them decorate:
```python
@pydra.util.hash_obj.register
def _(obj: File) -> bytes:
    ...
```
or provide a registration function:

```python
pydra.utils.register_hash(File, myhashfun)
```
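To make the dispatch concrete, here's a runnable toy version of this proposal (the names `hash_obj` and `register_hash` and the `File` class here are illustrative, not an existing Pydra API; it returns hex digests rather than bytes for readability):

```python
import functools
from hashlib import sha256


@functools.singledispatch
def hash_obj(obj: object) -> str:
    # Generic fallback: combine the class name with a hash of each attribute
    dict_rep = ":".join(f"{k}:{hash_obj(v)}" for k, v in obj.__dict__.items())
    return sha256(f"{obj.__class__}:{dict_rep}".encode()).hexdigest()


@hash_obj.register
def _(obj: int) -> str:
    return sha256(f"int:{obj}".encode()).hexdigest()


@hash_obj.register
def _(obj: str) -> str:
    return sha256(f"str:{obj}".encode()).hexdigest()


@hash_obj.register
def _(obj: frozenset) -> str:
    # Sort the element hashes so the result is independent of iteration order
    return sha256(",".join(sorted(hash_obj(e) for e in obj)).encode()).hexdigest()


def register_hash(cls, func):
    # The "registration function" spelling of the same mechanism
    hash_obj.register(cls, func)


class Point:  # stand-in for an arbitrary user type with __dict__
    def __init__(self, x, y):
        self.x = x
        self.y = y


class File:  # toy stand-in for a type that registers its own hasher
    def __init__(self, path):
        self.path = path


register_hash(File, lambda obj: hash_obj(obj.path))

print(hash_obj(Point(1, 2)) == hash_obj(Point(1, 2)))              # True: fallback path
print(hash_obj(frozenset({1, 2})) == hash_obj(frozenset({2, 1})))  # True: order-independent
print(hash_obj(File("a.txt")) == hash_obj("a.txt"))                # True: registered hasher
```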
yes, this is an interesting case. I think I was assuming that inputs should not change, and I did not include tests that do it... I like the approach @effigies is suggesting: checking the input after execution.

As for the issue with hashes, I've already started adding some extra rules for specific types, e.g. for numpy arrays, after I had issues. But giving the option to the user might be a good solution (this probably can be in a different PR).
This issue raises plenty of good points. I noticed these two:

1. Support for passing arbitrary types between Pydra tasks.
2. Support for "unsafe" functions modifying their inputs.
In my experience, 1) should be avoided as much as possible to simplify hashing and pickling between processes, as well as ensuring better task composition. I believe tasks and workflows should be considered the same way as public functions in a library: use rich types internally to model the domain, expose primitive types externally to limit coupling. I like @effigies idea of registering a hashing handler in the worst case scenario.
I am a bit confused as to why we would want to support 2).
> I am a bit confused as to why we would want to support 2).
Not so much support as make it safe to work with those sorts of functions. IIRC, the initial goal of the `@pydra.mark` module was to make it easy to work with functions from external libraries, and it became the standard way of writing `FunctionTask`s because it's nicer than using the object directly and it preserves docstrings. So I would consider wrapping unsafe functions to become safe tasks in-scope.
In any event, if someone has an unsafe function, they're going to need to either rewrite or wrap it themselves, or we can provide a standard tool with straightforward semantics:
```python
def wrap_unsafe(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*deepcopy(args), **deepcopy(kwargs))
    return wrapper
```
So we could write:
```python
safe_task = task(wrap_unsafe(unsafe_func))
```
And if somebody has control over the function but is just lazy, they could write:
```python
@task
@wrap_unsafe
def f(x):
    x.a = 2
    return x
```
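As a quick sanity check of those semantics with plain functions (no Pydra involved), restating `wrap_unsafe` with its imports so this runs standalone:

```python
from copy import deepcopy
from functools import wraps


def wrap_unsafe(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Deepcopy every argument so the wrapped function
        # can't mutate the caller's objects
        return func(*deepcopy(args), **deepcopy(kwargs))
    return wrapper


def mutator(d):
    d["a"] = 2
    return d


data = {"a": 1}
mutator(data)          # mutates the caller's dict
print(data["a"])       # 2

data = {"a": 1}
safe = wrap_unsafe(mutator)
result = safe(data)    # operates on a copy
print(data["a"])       # 1 (caller's dict untouched)
print(result["a"])     # 2
```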
Created #625 and #626. I reproduced my own posts, not people's responses. Please copy your comments where appropriate.
Summary
Fixes bug described in https://github.com/nipype/pydra/issues/622