riga / law

Build large-scale task workflows: luigi + job submission + remote targets + environment sandboxing using Docker/Singularity
http://law.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Proposal: high-level workflows #131

Closed lmoureaux closed 1 year ago

lmoureaux commented 1 year ago

I've been using Law for a few months now, and for a while I've been thinking about a new API that I think would fit quite well within it.

The traditional Alphabet example fits my use case quite well, except I would like to generate alphabets in lowercase and uppercase. This is relatively easy to achieve with itertools.product:

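import itertools as it

import law

# Task is the example's base task class (assumed to be defined elsewhere)
class CreateChars(Task, law.LocalWorkflow):
    def create_branch_map(self):
        # branch number -> (ASCII offset, letter index); 0x41 is "A", 0x61 is "a"
        return dict(enumerate(it.product([0x41, 0x61], range(26))))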

    def output(self):
        offset, num = self.branch_data
        return self.local_target(f"output_{offset}_{num}.json")

    def run(self):
        offset, num = self.branch_data
        char = chr(offset + num)
        output = self.output()
        output.dump({"num": num, "char": char})

This workflow can create both alphabets in parallel, which is good if we have the compute. The code also has some weaknesses:

Instead I think it would be nicer to have something like this:

class CreateChars(Task, law.LocalWorkflow):
    uppercase = law.WorkflowParameter(luigi.BoolParameter(), [True, False])
    num = law.WorkflowParameter(luigi.IntParameter(), range(26))

    def output(self):
        return self.local_target(f"output_{self.uppercase}_{self.num}.json")

    def run(self):
        offset = 0x41 if self.uppercase else 0x61
        char = chr(offset + self.num)
        output = self.output()
        output.dump({"num": self.num, "char": char})

This would do the same as above. However, the interface is a lot nicer:

CreateChars()  # A-Za-z
CreateChars(uppercase=True)  # A-Z
CreateChars(uppercase=True, num=0)  # A
CreateChars(uppercase=True, num=[0, 1])  # AB
CreateChars(num=[0, 1])  # ABab
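
The calls above can be read as "take the cartesian product of all workflow parameters, restricted to whatever values were passed". A minimal sketch of that reading in plain Python follows; `DEFAULTS`, `expand` and `chars` are hypothetical names used only for illustration and are not part of law.

```python
import itertools

# Hypothetical defaults mirroring the two proposed WorkflowParameter declarations.
DEFAULTS = {"uppercase": [True, False], "num": list(range(26))}


def expand(defaults, **selected):
    """Return one dict per branch: the cartesian product of all axes."""
    axes = []
    for name, values in defaults.items():
        chosen = selected.get(name, values)
        # A scalar selects a single value; a list or tuple selects several.
        if not isinstance(chosen, (list, tuple)):
            chosen = [chosen]
        axes.append(list(chosen))
    return [dict(zip(defaults, combo)) for combo in itertools.product(*axes)]


def chars(branches):
    """Join the characters that the given branches would produce."""
    return "".join(
        chr((0x41 if b["uppercase"] else 0x61) + b["num"]) for b in branches
    )
```

With these helpers, `chars(expand(DEFAULTS, num=[0, 1]))` gives `"ABab"` and `chars(expand(DEFAULTS, uppercase=True, num=0))` gives `"A"`, matching the comments in the list above.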

Some semantic details:

I didn't try to implement this, but I think it should be feasible. Before I dive in, I'd like to gather some feedback about the overall proposal and any implementation issue you'd see. And also where to put this in the code ^^

riga commented 1 year ago

Hi @lmoureaux ! Thanks for the proposal, this idea could indeed solve some level of "inconvenience" regarding the forced access via branch numbers whose meaning might not always be known from outside the task where the branch map is defined. I have to think about some implementation details and constraints from the job submission side and get back to you 😀

riga commented 1 year ago

I think we can indeed implement this type of workflow definition (which actually only translates into a convenient branch lookup, but more on this at the end). We should use this issue to discuss and define the interface, and the implementation (which might be rather trivial in the end) can happen later.

Just some thoughts first.

  1. In your example, you added a bool flag as a "second axis" for the values that would make up the branch map. Convenient access and readability aside, these easy cases could of course always be broken down to a normal workflow by adding a normal parameter that specializes the workflow. In this sense, law run CreateChars --uppercase True --branch ... would simply trigger a different workflow than law run CreateChars --uppercase False --branch .... And actually, this is exactly leading to the behavior in your second example and I think you forgot the case CreateChars(uppercase=[True, False], num=0) which is not covered by just adding a normal uppercase parameter 😉 I'm just writing that for completeness since this could already solve many of such use cases, but for sure not all (e.g. with more complex combinations of parameters, and, well, the CreateChars(uppercase=True, num=0) case), and also lacks the level of convenience that your proposal has.
  2. In your example, you imply that the final branch map will always be the cartesian product of all possible workflow parameter values. I think one could want that behavior in some cases, but it should be part of a more generic mechanism, configurable to also behave differently if needed.

With point 2 in mind, I think one can derive two general use cases.

Case 1

The branch map is defined, as usual, in the create_branch_map method and setting --branch picks one of them. However, specifying some WorkflowParameters instead of the branch will pick the branch internally for you. The way the lookup is done is an implementation detail that can be solved later. The important bit is that in the example

class CreateChars(law.LocalWorkflow):

    num = law.WorkflowParameter(...)
    uppercase = law.WorkflowParameter(...)

    def create_branch_map(self):
        return {
            0: {"num": 0, "uppercase": True},
            1: {"num": 1, "uppercase": True},
            # ...
            26: {"num": 0, "uppercase": False},
            27: {"num": 1, "uppercase": False},
        }

one can access a branch or a sliced workflow via

# branches
CreateChars(branch=0)                # --> branch 0               --> num 0, uppercase True
CreateChars(num=0, uppercase=True)   # --> num 0, uppercase True  --> branch 0
CreateChars(num=0, uppercase=False)  # --> num 0, uppercase False --> branch 26

# sliced workflows
CreateChars(branches=(26, 27, 28))           # --> branches 26 - 28
CreateChars(num=(0, 1, 2), uppercase=False)  # --> branches 26 - 28

This would be easy to achieve and ensures that the branch map remains consistent, i.e., that branch N always describes the same combination of num and uppercase. This must be preserved as the branch numbers are used throughout law, in status checks, remote job handling, job submission, resubmission, workflow slicing, etc. However, when the branch map is defined once and describes everything that the task could potentially do, we'll always be on the safe side here.
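
One way to picture the lookup described for case 1 is a simple filter over a fixed branch map. The sketch below is an assumption about how such a translation could work, not the implementation in law; `BRANCH_MAP` and `lookup_branches` are hypothetical names.

```python
# Fixed branch map as in the example above:
# branches 0-25 are uppercase, branches 26-51 are lowercase.
BRANCH_MAP = {i: {"num": i % 26, "uppercase": i < 26} for i in range(52)}


def lookup_branches(branch_map, **requested):
    """Return the sorted branch numbers whose data match all requested values."""

    def matches(data):
        for name, wanted in requested.items():
            # A scalar requests one value, a list or tuple requests several.
            allowed = wanted if isinstance(wanted, (list, tuple)) else (wanted,)
            if data[name] not in allowed:
                return False
        return True

    return sorted(b for b, data in branch_map.items() if matches(data))
```

Because the map itself never changes, `lookup_branches(BRANCH_MAP, num=(0, 1, 2), uppercase=False)` always resolves to branches 26, 27 and 28, which is the consistency property the paragraph above relies on.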

Case 2

The exact branch map is unknown a priori, but the combination of workflow parameters passed to the task during instantiation defines it dynamically. This can again be done via cartesian products or, generically, by implementing the create_branch_map method, but this time using the values passed to the (then dynamic) workflow parameters. This would mostly look and feel like case 1 for users, but comes with (at least) two implications:

If we were to assume for a second that there is an infinite number of characters, then the example above could become

import itertools

class CreateChars(law.LocalWorkflow):

    num = law.DynamicWorkflowParameter(cls=luigi.IntParameter)
    uppercase = law.WorkflowParameter(cls=luigi.BoolParameter, values=(True, False))

    def create_branch_map(self):
        return {
            i: {"num": num, "uppercase": uppercase}
            for i, (num, uppercase) in enumerate(itertools.product(self.num, self.uppercase))
        }

and would otherwise function the same. However, if there is at least one so-called DynamicWorkflowParameter (name ok?), then law should assume that the branch map can vary and will start to add parameter value hashes where necessary.
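
To illustrate why a varying branch map calls for parameter value hashes, here is a small sketch. It assumes a hash over the ordered parameter values; the helper names and the hashing scheme are illustrative guesses and not what law does internally.

```python
import hashlib
import itertools
import json


def create_branch_map(num, uppercase):
    """Build a branch map from the values passed at instantiation."""
    return {
        i: {"num": n, "uppercase": u}
        for i, (n, u) in enumerate(itertools.product(num, uppercase))
    }


def branch_map_hash(**values):
    """Short hash of the values that define a dynamic branch map.

    The order of the values is kept on purpose, since it determines
    which combination ends up behind which branch number.
    """
    canonical = json.dumps({k: list(v) for k, v in values.items()}, sort_keys=True)
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()[:10]
```

Here branch 0 of `create_branch_map((0, 1), (True, False))` and branch 0 of `create_branch_map((5, 6), (True, False))` describe different characters, so anything keyed by branch number alone (job files, status checks) would need a tag such as `branch_map_hash(...)` to keep the two workflows apart.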

Looking forward to your feedback!

lmoureaux commented 1 year ago

Hi @riga, thanks a lot for your input!

Regarding your first thoughts, triggering separate workflows is of course possible. In this case however, achieving parallelism requires using --workers 2 on the host (despite using LocalWorkflow for the example, what I have in mind is remote submission). This of course breaks down if you want to probe a 40*40 grid.

Your second point is important. I had been considering it.product and zip; zip doesn't really make sense to me because, if two parameters are tied in the zip way, a single parameter can be used instead. I just realized that I've also been using it.combinations in my code, so there are certainly more use cases. So I agree that a customization point is needed. We could let the user provide a branch table in a way similar to create_branch_map, except the enumerate part would be done internally. Another possible pattern is to provide a hook to veto particular branches, as is commonly used in CI systems. I think that it.product is a good starting point and would be a good default. Maybe a good mock example here would be expanding CreateChars to support the Greek and the Cyrillic alphabet. Or just the German one with the Eszett ß included.
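
The "product by default, veto hook for exceptions" idea could look roughly like the sketch below. `build_branch_map` and its `veto` argument are hypothetical, and the Eszett is treated as having no uppercase form purely for the sake of the toy example.

```python
import itertools


def build_branch_map(axes, veto=None):
    """Enumerate the cartesian product of the axes, skipping vetoed combinations."""
    names = list(axes)
    combos = (dict(zip(names, values)) for values in itertools.product(*axes.values()))
    kept = (combo for combo in combos if veto is None or not veto(combo))
    # The enumeration is done internally, so users never number branches by hand.
    return dict(enumerate(kept))


# Toy model of the German alphabet: 26 letters plus the Eszett.
letters = [chr(0x61 + i) for i in range(26)] + ["ß"]
branch_map = build_branch_map(
    {"uppercase": [True, False], "letter": letters},
    veto=lambda combo: combo["uppercase"] and combo["letter"] == "ß",
)
```

Other combination rules such as `itertools.combinations` would fit the same shape by replacing the `itertools.product` call, which is one argument for keeping the combination step behind a single customization point.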

Now coming to cases 1 and 2, I understand that case 1 would be much easier to implement. I think case 2 is more elegant, as I'd have absolutely no problem with making the branch parameter private (if one wants to expose it, branch = WorkflowParameter(IntParameter(), range(42)) works). The main use case I see for case 2 is with RealParameter, where one could want to run a grid but also probe specific values.

I mentioned "dynamic" parameters between the lines when I wrote that WorkflowParameter should also accept values not in the default set. Now you make me realize that the default set could be entirely optional, in which case it must be specified to actually run the tasks. But then, what differentiates a WorkflowParameter from a normal parameter? Maybe the following interface makes more sense after all?

def requires(self):
    return GenericWorkflow([FooTask(foo=4), BarTask(bar=2)])

Or even...

@parallel("slurm")
def requires(self):
    return FooTask(foo=4), BarTask(bar=2)

If a task doesn't allow all parameter combinations, it can implement a helper function that returns a list of tasks (using it.product or whatever explicitly). While pretty neat, this approach makes things harder to implement and I'm not sure I want to go down that route...

Coming back to use cases, I think the implementation strategy should be to define and refine the interface with case 1, while also thinking how the backend could be extended to eventually support case 2.

riga commented 1 year ago

That all sounds good and I think that can be done rather swiftly. Btw, I just realized that I didn't write it very clearly, but indeed I would like to support both use cases :)

Considering your two examples, while I like the out-of-the-box thinking, I have the feeling these patterns would not play that nicely with the current way the workflows are defined and used, but I see the point of making things more expressive. Maybe we can come up with some syntactic sugar at the end that configures the workflows in a neat manner, but based on the same internals.

> Now you make me realize that the default set could be entirely optional, in which case it must be specified to actually run the tasks.

Yes, that's what I had in mind as well when I thought about the dynamic workflow parameters.

> But then, what differentiates a WorkflowParameter from a normal parameter?

In the case of non-dynamic ones, they would be used internally to do branch lookups / translations in case they are set, and --branch isn't.


I am working on a quick draft on a private branch that I would share with you in the course of this week, for the both of us to push it to the limits ;)

riga commented 1 year ago

@lmoureaux I created a first draft and pushed it to the feature/workflow_parameters branch. Feel free to check out examples/workflow_parameters.

Things like

CreateChars(num=99, upper_case=(True, False))         # workflow with 2 branches
CreateChars(num=(99, 100), upper_case=(True, False))  # workflow with 4 branches
CreateChars(num=(99, 100), upper_case=True)           # workflow with 2 branches
CreateChars(num=99, upper_case=True)                  # single branch task

work now as discussed above. Internally there is now a mapping to branch numbers happening. Since this info must be known before the actual task instantiation, note the switch of create_branch_map to be a @classmethod. This must be the case whenever a workflow has one or more law.WorkflowParameter instances.

Also note that when running from the command line, --num 99 and --num 99, (with a trailing comma) behave differently now. Disregarding the upper_case parameter for a sec, the former will point to a single branch task, whereas the latter refers to a workflow with one branch.

The draft does not contain dynamic parameters yet, so in the current state it's still required that the branch map is known and producible a priori. I'm starting to think that this will be easy to add on top, but I guess it's better if we look for weak points first.
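
The scalar-versus-sequence distinction on the command line can be pictured with a tiny parser. This is only a sketch under the assumption that any comma marks a sequence; `parse_workflow_value` is a hypothetical helper and not the parser used on the draft branch.

```python
def parse_workflow_value(raw, cast=int):
    """Parse a command-line string into a scalar or a tuple of values.

    Assumption: a comma anywhere in the string marks a sequence, so a
    trailing comma turns a single value into a one-element tuple.
    """
    if "," not in raw:
        return cast(raw)
    # Empty parts (e.g. after a trailing comma) are skipped.
    return tuple(cast(part) for part in raw.split(",") if part.strip())
```

Under this reading, `parse_workflow_value("99")` yields the scalar `99` (a single branch task), while `parse_workflow_value("99,")` yields the tuple `(99,)` (a workflow with one branch).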

lmoureaux commented 1 year ago

Hi @riga! I'm testing your branch, and I tried running the example from the command line first. None of the variants I tried did anything useful. First trying to run everything:

$ law run CreateChars --workers 2
ValueError: when workflow parameters are set, task CreateChars requires the branch parameter to be unset (-1) but got 0; found workflow parameter(s) num,upper_case
Complete log
```
INFO: luigi-interface - Informed scheduler that task CreateChars__1__local_4bc7eb4a9d has status PENDING
INFO: luigi-interface - Done scheduling tasks
INFO: luigi-interface - Running Worker with 2 processes
INFO: luigi-interface - [pid 64877] Worker Worker(salt=769908267, workers=2, host=max-display009.desy.de, username=mourelou, pid=63109) running CreateChars(workflow=local, branch=-1, branches=)
INFO: luigi-interface - [pid 64877] Worker Worker(salt=769908267, workers=2, host=max-display009.desy.de, username=mourelou, pid=63109) new requirements CreateChars(workflow=local, branch=-1, branches=)
INFO: luigi-interface - Worker Worker(salt=769908267, workers=2, host=max-display009.desy.de, username=mourelou, pid=63109) was stopped. Shutting down Keep-Alive thread
ERROR: luigi-interface - Uncaught exception in luigi
Traceback (most recent call last):
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/retcodes.py", line 75, in run_with_retcodes
    worker = luigi.interface._run(argv).worker
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/interface.py", line 211, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "/home/mourelou/law/law/patches.py", line 88, in _schedule_and_run
    return _schedule_and_run_orig(*args, **kwargs)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/interface.py", line 173, in _schedule_and_run
    success &= worker.run()
  File "/home/mourelou/law/law/patches.py", line 85, in run
    return run_orig(self)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 1210, in run
    self._handle_next_task()
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 1087, in _handle_next_task
    new_req = [load_task(module, name, params)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 1087, in <listcomp>
    new_req = [load_task(module, name, params)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/task_register.py", line 253, in load_task
    return task_cls.from_str_params(params_str)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/task.py", line 483, in from_str_params
    return cls(**kwargs)
  File "/home/mourelou/law/law/task/base.py", line 337, in __call__
    inst = super(Register, cls).__call__(*args, **kwargs)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/task_register.py", line 88, in __call__
    param_values = cls.get_param_values(params, args, kwargs)
  File "/home/mourelou/law/law/task/base.py", line 152, in get_param_values
    for name, value in cls.modify_param_values(OrderedDict(values)).items()
  File "/home/mourelou/law/law/workflow/base.py", line 500, in modify_param_values
    raise ValueError(
ValueError: when workflow parameters are set, task CreateChars requires the branch parameter to be unset (-1) but got 0; found workflow parameter(s) num,upper_case
```

Trying to run a single task: (edit: initial version didn't account for the ASCII offset)

$ law run CreateChars --workers 2 --upper-case False --num 100
TypeError: 'dict' object cannot be interpreted as an integer
Complete log
```
INFO: luigi-interface - Informed scheduler that task CreateChars_7__local_b27ac78a77 has status PENDING
INFO: luigi-interface - Done scheduling tasks
INFO: luigi-interface - Running Worker with 2 processes
INFO: luigi-interface - [pid 39951] Worker Worker(salt=734881065, workers=2, host=max-display009.desy.de, username=mourelou, pid=37870) running CreateChars(workflow=local, branch=7)
ERROR: luigi-interface - [pid 39951] Worker Worker(salt=734881065, workers=2, host=max-display009.desy.de, username=mourelou, pid=37870) failed CreateChars(workflow=local, branch=7)
Traceback (most recent call last):
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 141, in _run_get_new_deps
    task_gen = self.task.run()
  File "/home/mourelou/law/examples/workflow_parameters/tasks.py", line 45, in run
    char = chr(num)
TypeError: 'dict' object cannot be interpreted as an integer
INFO: luigi-interface - Informed scheduler that task CreateChars_7__local_b27ac78a77 has status FAILED
INFO: luigi-interface - Worker Worker(salt=734881065, workers=2, host=max-display009.desy.de, username=mourelou, pid=37870) was stopped. Shutting down Keep-Alive thread
INFO: luigi-interface -
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 CreateChars(...)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====
```

Trying to run several branches: (edit: initial version didn't account for the ASCII offset)

$ law run CreateChars --workers 2 --upper-case False --num 100,101
ValueError: when workflow parameters are set, task CreateChars requires the branch parameter to be unset (-1) but got 7; found workflow parameter(s) num,upper_case
Complete log
```
INFO: luigi-interface - Informed scheduler that task CreateChars__1_7_9_local_87fe7d3a80 has status PENDING
INFO: luigi-interface - Done scheduling tasks
INFO: luigi-interface - Running Worker with 2 processes
INFO: luigi-interface - [pid 60162] Worker Worker(salt=504669402, workers=2, host=max-display009.desy.de, username=mourelou, pid=59272) running CreateChars(workflow=local, branch=-1, branches=7,9)
INFO: luigi-interface - [pid 60162] Worker Worker(salt=504669402, workers=2, host=max-display009.desy.de, username=mourelou, pid=59272) new requirements CreateChars(workflow=local, branch=-1, branches=7,9)
INFO: luigi-interface - Worker Worker(salt=504669402, workers=2, host=max-display009.desy.de, username=mourelou, pid=59272) was stopped. Shutting down Keep-Alive thread
ERROR: luigi-interface - Uncaught exception in luigi
Traceback (most recent call last):
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/retcodes.py", line 75, in run_with_retcodes
    worker = luigi.interface._run(argv).worker
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/interface.py", line 211, in _run
    return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
  File "/home/mourelou/law/law/patches.py", line 88, in _schedule_and_run
    return _schedule_and_run_orig(*args, **kwargs)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/interface.py", line 173, in _schedule_and_run
    success &= worker.run()
  File "/home/mourelou/law/law/patches.py", line 85, in run
    return run_orig(self)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 1210, in run
    self._handle_next_task()
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 1087, in _handle_next_task
    new_req = [load_task(module, name, params)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/worker.py", line 1087, in <listcomp>
    new_req = [load_task(module, name, params)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/task_register.py", line 253, in load_task
    return task_cls.from_str_params(params_str)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/task.py", line 483, in from_str_params
    return cls(**kwargs)
  File "/home/mourelou/law/law/task/base.py", line 337, in __call__
    inst = super(Register, cls).__call__(*args, **kwargs)
  File "/home/mourelou/law/examples/workflow_parameters/tmp/luigi/luigi/task_register.py", line 88, in __call__
    param_values = cls.get_param_values(params, args, kwargs)
  File "/home/mourelou/law/law/task/base.py", line 152, in get_param_values
    for name, value in cls.modify_param_values(OrderedDict(values)).items()
  File "/home/mourelou/law/law/workflow/base.py", line 500, in modify_param_values
    raise ValueError(
ValueError: when workflow parameters are set, task CreateChars requires the branch parameter to be unset (-1) but got 7; found workflow parameter(s) num,upper_case
```

CreateAlphabet gives the same error as CreateChars without arguments, so this isn't a command line parsing error. I'm looking at your code changes next; maybe I can figure out the issue.

lmoureaux commented 1 year ago

I'm not a huge fan of the proposed API, it's diverging too much from regular workflows... If create_branch_map needs to be static, it should at least have another name.

I implemented a proof-of-concept workflow class that works basically the same as your code (get the branch map, filter it for requested branches, set the branches attribute), but doesn't need changes to the Law core. It doesn't require create_branch_map to be static but adds some restrictions in case __init__ is non-trivial in a derived class. If it turns out that getting the branch map in the constructor doesn't work, we can always add a hook that Law would call before doing any branch-related work on a task (à la __post_init__ in dataclasses). I'm also not sure how this interacts with Luigi's promise that identical tasks are only ever instantiated once. But workflows aren't real tasks, they're just proxies to run other tasks, so I guess that's not a huge concern.

Another significant difference is that I delegate more work to my WorkflowParameter class: it is in charge of checking whether the user has set a value and whether a value (from the branch map) was requested by the user. I think this approach will be more flexible in the long run.

It seems we covered basically the same set of edge cases, except that I don't sanitize the branch map (this should really be done in all cases, also without workflow parameters, so I'd consider this an orthogonal feature) and you have this case that I don't fully understand. I also don't do any caching of the branch map since this is just a PoC. If caching is required, this should be abstracted away in a helper function so everyone could benefit from it :)

The value taken by the workflow attributes in branch tasks is a concern. I set them to the values from the branch, which I think results in the nicest API:

    def run(self):
        num = self.num + (26 if self.upper_case else 0)

If for some reason we don't want to do that, there are two options: either workflow parameters keep their "list" value in branch tasks, or we delattr() them altogether.