Closed satra closed 4 years ago
in one of the workflows i'm currently testing, i was able to do wf.mo2.split(('val', 'val2')).combine('val')
and it appeared to do the right thing as i see in the final output.
It's really weird that the workflow works sometimes but not others. @satra just want to be sure what's the correct output we should expect from wf.mo2.split(('val', 'val2')).combine(???)
?
For the task code in the first post, the output for both task.split(('val', 'val2'))
and task.split(('val', 'val2')).combine('val2')
is
[({'mo2.val': 0, 'mo2.val2': 4},
Result(output=Output(b=[0, 0], a=4), runtime=None, errored=False)),
({'mo2.val': 1, 'mo2.val2': 5},
Result(output=Output(b=[1, 1], a=5), runtime=None, errored=False)),
({'mo2.val': 2, 'mo2.val2': 6},
Result(output=Output(b=[2, 2], a=6), runtime=None, errored=False))]
In a workflow, wf.mo2.split('val', 'val2')
gives:
({'wf.val': [0, 1, 2], 'wf.val2': [4, 5, 6]},
Result(output=Output(b=[[0, 0], [1, 1], [2, 2]], a=[[4, 5, 6], [4, 5, 6], [4, 5, 6]]), runtime=None, errored=False))
Is the goal to make the output from wf.mo2.split('val', 'val2')
the same as the one from task.split(('val', 'val2'))
?
Currently wf.mo2.split('val', 'val2').combine('val2')
gives this error:
---------------------------------------------------------------------------
PydraStateError Traceback (most recent call last)
~/Desktop/nlo/pydra/pydra/engine/core.py in _run(self, submitter, rerun, **kwargs)
896 if self.task_rerun and self.propagate_rerun:
--> 897 task.task_rerun = self.task_rerun
898 # if the task is a wf, than the propagate_rerun should be also set
~/Desktop/nlo/pydra/pydra/engine/core.py in _run_task(self, submitter, rerun)
914 self.hooks.pre_run_task(self)
--> 915 try:
916 self.audit.monitor()
~/Desktop/nlo/pydra/pydra/engine/submitter.py in _run_workflow(self, wf, rerun)
171 else:
--> 172 for fut in await self.submit(task, rerun=rerun):
173 task_futures.add(fut)
~/Desktop/nlo/pydra/pydra/engine/submitter.py in submit(self, runnable, wait, rerun)
107 if runnable.state:
--> 108 runnable.state.prepare_states(runnable.inputs)
109 runnable.state.prepare_inputs()
~/Desktop/nlo/pydra/pydra/engine/state.py in prepare_states(self, inputs, cont_dim)
524 self.splitter_validation()
--> 525 self.combiner_validation()
526 self.set_input_groups()
~/Desktop/nlo/pydra/pydra/engine/state.py in combiner_validation(self)
509 if set(self._combiner) - set(self.splitter_rpn):
--> 510 raise hlpst.PydraStateError("all combiners have to be in the splitter")
511
PydraStateError: all combiners have to be in the splitter
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
<ipython-input-85-09a1e2a88326> in <module>
9
10 with pydra.Submitter(plugin="cf") as sub:
---> 11 sub(runnable=wf)
12
13 wf.result(return_inputs=True)
~/Desktop/nlo/pydra/pydra/engine/submitter.py in __call__(self, runnable, cache_locations, rerun)
55 ]
56 if is_workflow(runnable) and runnable.state is None:
---> 57 self.loop.run_until_complete(self.submit_workflow(runnable, rerun=rerun))
58 else:
59 self.loop.run_until_complete(self.submit(runnable, wait=True, rerun=rerun))
~/anaconda3/envs/pydra/lib/python3.7/asyncio/base_events.py in run_until_complete(self, future)
581 raise RuntimeError('Event loop stopped before Future completed.')
582
--> 583 return future.result()
584
585 def stop(self):
~/Desktop/nlo/pydra/pydra/engine/submitter.py in submit_workflow(self, workflow, rerun)
70 await self.worker.run_el(workflow, rerun=rerun)
71 else:
---> 72 await workflow._run(self, rerun=rerun)
73 else: # could be a tuple with paths to pickle files wiith tasks and inputs
74 ind, wf_main_pkl, wf_orig = workflow
~/Desktop/nlo/pydra/pydra/engine/core.py in _run(self, submitter, rerun, **kwargs)
904 self.hooks.pre_run(self)
905 with SoftFileLock(lockfile):
--> 906 # # Let only one equivalent process run
907 odir = self.output_dir
908 if not self.can_resume and odir.exists():
~/Desktop/nlo/pydra/pydra/engine/helpers.py in save(task_path, result, task, name_prefix)
129 if task_path.name.startswith("Workflow"):
130 # copy files to the workflow directory
--> 131 result = copyfile_workflow(wf_path=task_path, result=result)
132 with (task_path / f"{name_prefix}_result.pklz").open("wb") as fp:
133 cp.dump(result, fp)
~/Desktop/nlo/pydra/pydra/engine/helpers.py in copyfile_workflow(wf_path, result)
139 def copyfile_workflow(wf_path, result):
140 """ if file in the wf results, the file will be copied to the workflow directory"""
--> 141 for field in attr_fields(result.output):
142 value = getattr(result.output, field.name)
143 new_value = _copyfile_single_value(wf_path=wf_path, value=value)
~/Desktop/nlo/pydra/pydra/engine/specs.py in attr_fields(x)
6
7 def attr_fields(x):
----> 8 return x.__attrs_attrs__
9
10
AttributeError: 'NoneType' object has no attribute '__attrs_attrs__'
Currently wf.mo2.split('val', 'val2').combine('val2') gives this error:
@nicolocin - in the above call there has to be an extra parenthesis:
wf.mo2.split(('val', 'val2')).combine('val2')
i would expect that to work. if that still doesn't, it may be a matter of checking whether the task object stored in all the places inside a workflow is the same object.
@satra - missed the parenthesis. Both wf.mo2.split(('val', 'val2'))
and wf.mo2.split(('val', 'val2')).combine('val2')
give:
({'wf.val': [0, 1, 2], 'wf.val2': [4, 5, 6]},
Result(output=Output(b=[[0, 0], [1, 1], [2, 2]], a=[4, 5, 6]), runtime=None, errored=False))
So still not the same. I'll check where the task is stored
i believe this is fixed with #303. if not, please reopen
can't seem to come up with a syntax that doesn't generate an error (this has to be run in the context of a workflow).
in the context of a task
task code
```python
import pydra
import typing as ty
import os


@pydra.mark.task
@pydra.mark.annotate({"return": {"b": ty.Any, "a": ty.Any}})
def test_multiout(val, val2):
    return [val, val], val2


if __name__ == "__main__":
    cache_dir = os.path.join(os.getcwd(), 'cache3')
    task = test_multiout(name="mo2", val=[0, 1, 2], val2=[4, 5, 6], cache_dir=cache_dir)
    task.split(('val', 'val2')).combine('val2')

    with pydra.Submitter(plugin="cf", n_procs=2) as sub:
        sub(runnable=task)

    results = task.result(return_inputs=True)
    print(results)
```
the above code will run, but not combine the results