pyiron / pyiron_workflow

Graph-and-node based workflows

What happens when we pickle a failed flow? #371

liamhuber opened this issue 6 days ago (status: Open)

liamhuber commented 6 days ago

@jan-janssen, this was a good question this morning. Behaviour wound up as I expected/hoped:

import pickle

from pyiron_workflow import Workflow

@Workflow.wrap.as_function_node()
def Fail(x):
    raise RuntimeError("I am supposed to fail")
    return x

wf = Workflow("saving_failures")
wf.inp = Workflow.create.standard.UserInput(42)
wf.plus1 = wf.inp + 1
wf.fail_here = Fail(wf.plus1)
wf.something_downstream = wf.fail_here + 1

try:
    wf()
except RuntimeError:
    pass

reloaded = pickle.loads(pickle.dumps(wf))

print(reloaded.ready)
>>> False
# You can look at the full .readiness_report; the problem is that it is in the "failed" state
# Running again would require manually setting failed back to False
print(reloaded.plus1.outputs.add.value)
>>> 43
print(reloaded.fail_here.inputs.x.value)
>>> 43
print(reloaded.fail_here.outputs.x.value)
>>> NOT_DATA

i.e. everything up to the failure point can be (un)pickled just fine, and everything after that still has default values (NOT_DATA in this case).
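For completeness, here is a minimal sketch of manual recovery from the reloaded state, assuming the workflow- and node-level failed flags behave as described above (Fail itself would of course just raise again if re-run; a recoverable node is the real use case):

print(reloaded.readiness_report)  # shows that the "failed" state is what blocks readiness

# Clear the failed flags by hand before attempting another run
reloaded.failed = False
reloaded.fail_here.failed = False

# At this point reloaded() could be attempted again; with a node whose failure
# is recoverable (unlike Fail above), this is the re-run pattern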

If the example is clear and the behaviour is acceptable, go ahead and close this; it's just an FYI issue. Otherwise, let me know what changed behaviour would be helpful.

jan-janssen commented 5 days ago

The scenario I am most curious about is the execution on a remote node in an HPC context. Let's say I want to submit either the whole workflow or a part of the workflow to a remote system and then it fails, because the DFT code does not converge. Can I take a part of the workflow graph and execute it somewhere else and afterwards return the executed part? I am just wondering how flexible the serialisation is and how resistant it is to failed executions.

liamhuber commented 5 days ago

I think there is less implemented than you have in mind. The only built-in interaction with HPC resources comes from the NodeJob wrapper, and this only runs a single node, not one that lives inside some larger workflow. I'm not sure what the failure behaviour is, since I don't have an HPC environment to test with; it could be that the failed node still serializes and gets returned, but I can't guarantee it.
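As a rough, self-contained sketch of what that "execute a part somewhere else and return it" round trip could look like using plain pickle (hand-rolled, not the NodeJob interface; MaybeFail is just an illustrative stand-in for something like a non-converging DFT run):

import pickle

from pyiron_workflow import Workflow

@Workflow.wrap.as_function_node()
def MaybeFail(x):
    # Stand-in for a calculation that may not converge
    if x < 0:
        raise RuntimeError("pretend the calculation did not converge")
    return x

node = MaybeFail(-1)
payload = pickle.dumps(node)  # what you would ship to the remote machine

# --- on the remote side ---
remote_node = pickle.loads(payload)
try:
    remote_node.run()
except RuntimeError:
    pass  # the node keeps its failed state
returned = pickle.dumps(remote_node)  # send it back, failed or not

# --- back on the submitting side ---
print(pickle.loads(returned).failed)  # expected to be True, per the behaviour above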

Otherwise the pickling has some flexibility to it: __getstate__ de-parents the node, so you are free to pickle some subgraph, although you'll unavoidably get everything downstream of your pickling point:

import pickle

from pyiron_workflow import Workflow

wf = Workflow("pickle_part")
wf.inp = Workflow.create.standard.UserInput(42)
wf.loop = Workflow.create.for_node(
    Workflow.create.standard.Add,
    iter_on=("other",),
    obj=wf.inp,
    other=[1, 2, 3, 4]
)
wf()

just_the_loop = pickle.loads(pickle.dumps(wf.loop))

print(all(just_the_loop.outputs.df.value == wf.loop.outputs.df.value))
>>> True

There is currently no failure resilience built in -- if an error is encountered it gets raised. We could change that in the future, but right now it is the intended behaviour. In the context of submitting an entire python script to HPC, this still gives some flexibility, as you can catch exceptions and do something about them and/or finally save regardless of whether or not there was a failure. When we implement a direct interface to HPC submission, this sort of "save at the end regardless of how it went" seems like a good idea to me.

In the example below I just use the bytestreams to demonstrate manual failure recovery, but you could of course save to file and reload the file later to the same effect:

import pickle

from pyiron_workflow import Workflow

wf = Workflow("pickle_part")
wf.inp = Workflow.create.standard.UserInput(42)
wf.loop = Workflow.create.for_node(
    Workflow.create.standard.Add,
    iter_on=("other",),
    obj=wf.inp,
    other=["will not add", 2, 3, 4]
)
try:
    wf()
except TypeError:
    print("Failed to add an int and string")
finally:
    saved = pickle.dumps(wf)
>>> Failed to add an int and string

loaded = pickle.loads(saved)

# The for-loop runs its body nodes in reverse order,
# so everything before the last (failing) body node executes just fine and saves its output
print(loaded.loop.body_1.outputs.add.value)
>>> 44

# The run failed
# There is no sexy syntax for re-setting the failed state
# So do it manually
loaded.failed = False
loaded.loop.failed = False

# Fix the input and rerun
loaded.loop.inputs.other = [100, 2, 3, 4]
rerun_output = loaded.loop.run()

print(rerun_output.df["add"].values)
>>> [142  44  45  46]

# or we could just as easily go from the top:
# loaded.inputs.loop__other = [100, 2, 3, 4]
# loaded.run()

Of course this reruns the entire for-loop, but if caching via input hashing were implemented and applied to this body node, nodes 1-3 would all see the same input and simply return their loaded output. (Having the for-loop continue to the remaining items after a failure is also more or less the plan; then we could have put the "will not add" input in any position and still have gotten all the rest of the nodes to execute, but it's a low-priority feature.)
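As a toy illustration of that input-hashing idea (not an existing pyiron_workflow feature; the names below are made up for the sketch):

import hashlib
import pickle

_cache = {}  # hash of the inputs -> previously computed output

def cached_add(obj, other):
    # Identical inputs hash to the same key and re-use the stored result
    key = hashlib.sha256(pickle.dumps(("add", obj, other))).hexdigest()
    if key not in _cache:
        _cache[key] = obj + other
    return _cache[key]

first_run = [cached_add(42, other) for other in [2, 3, 4]]    # computed and cached
rerun = [cached_add(42, other) for other in [100, 2, 3, 4]]   # only the 100 is new work
print(rerun)
>>> [142, 44, 45, 46]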

p.s. thank you for the question! Going through and generating the examples uncovered two more bugs in (de)serialization of the for-loop: #372, #373