riga / law

Build large-scale task workflows: luigi + job submission + remote targets + environment sandboxing using Docker/Singularity
http://law.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Getting information from `input()` during `create_branch_map()` #173

Closed: StFroese closed this issue 7 months ago

StFroese commented 7 months ago

Question

Hi, let's say I have Task A, which produces a FITS file containing observation IDs and information about each observation.

Task B requires that Task A has run first. For each observation in the input file, I want to create a dataset based on the information stored in the FITS file. I'm having trouble creating the branch map over the observations: I cannot access this information, because Task A only runs after the branch map has been created. Is there a way to solve this, or should required tasks always run before the branch map is created?

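from pathlib import Path

from gammapy.data import DataStore
from law import LocalWorkflow

# ObservationCampaignTask, BinningTask and DL3LinkBkg are defined elsewhere in the analysis


class DL3CreateDL4Datasets(ObservationCampaignTask, BinningTask, LocalWorkflow):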

    def workflow_requires(self):
        return {"dl3": DL3LinkBkg.req(self)} # Task creating fits file with observation ids and measurements for each observation

    def requires(self):
        return {"dl3": DL3LinkBkg.req(self)}  # Task creating fits file with observation ids and measurements for each observation

    def create_branch_map(self): # This will fail because the required Task hasn't run yet when this function is called
        ds = DataStore.from_dir(Path(self.input()["dl3"].path).parent) # open the data file
        obs_ids = ds.obs_ids # Getting observation ids
        return {idx: (obs_id, ds.obs(obs_id)) for idx, obs_id in enumerate(obs_ids)} # iterate over obs_ids and get observation by obs_id

    def build_path(self):
        pass

    def output(self):
        pass

    def run(self):
        pass
riga commented 7 months ago

Hi @StFroese ,

that sounds like a job for dynamic_workflow_condition 🎉.

This is from the docstring (I'm currently refactoring the docs, so it's not on the public docs page yet):

"""
dynamic_workflow_condition()

Container for a workflow method that defines whether the branch map can be dynamically
constructed or whether a placeholder should be used until the condition is met. Similar to
Python's ``property``, instances of this class provide additional attributes for decorating
other methods that usually depend on the branch map, such as branch requirements or outputs.

It is recommended to use the :py:func:`dynamic_workflow_condition` decorator (factory).
Example:
"""

import law

# OtherTask is assumed to be defined elsewhere and to produce a JSON file


class MyWorkflow(law.LocalWorkflow):

    def workflow_requires(self):
        # define requirements for the full workflow to start
        reqs = super().workflow_requires()
        reqs["files"] = OtherTask.req(self)
        return reqs

    @law.dynamic_workflow_condition
    def workflow_condition(self):
        # declare that the branch map can be built if the workflow requirement exists
        # note: self.input() refers to the outputs of tasks defined in workflow_requires()
        return self.input()["files"].exists()

    @workflow_condition.create_branch_map
    def create_branch_map(self):
        # let's assume that OtherTask produces a json file containing a list of objects
        # that _this_ workflow iterates over, so we can simply return this list here
        return self.input()["files"].load(formatter="json")

    def requires(self):
        # branch-level requirement
        # note: this is not really necessary, since the branch requirements are only
        # evaluated _after_ a branch map is built, so OtherTask must have been completed
        return OtherTask.req(self)

    @workflow_condition.output
    def output(self):
        # define the output
        return law.LocalFileTarget("file_{}.txt".format(self.branch))

    def run(self):
        # trivial run implementation
        self.output().touch()

So one just has to define the condition under which the task is "allowed" to build its branch map. If the condition is not met yet, law will not schedule the task itself, but will first process all of its requirements and then rebuild the branch map once it can. Needless to say, the requirements should somehow lead to the fulfillment of the condition, but this is naturally the case if the condition is the "existence of outputs of required tasks" (== inputs).
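To make the order of events concrete, here is a toy simulation in plain Python with no law or luigi involved. `ToyUpstreamTask`, `ToyDynamicWorkflow` and `toy_schedule` are hypothetical names invented for this sketch, and the `{0: None}` placeholder is an assumption of the toy, not necessarily what law uses internally; law's real scheduling goes through luigi and is more involved.

```python
import json
import os
import tempfile


class ToyUpstreamTask:
    """Hypothetical stand-in for OtherTask: writes a JSON list of items."""

    def __init__(self, path):
        self.path = path

    def complete(self):
        return os.path.exists(self.path)

    def run(self):
        with open(self.path, "w") as f:
            json.dump(["obs_1", "obs_2", "obs_3"], f)


class ToyDynamicWorkflow:
    """Hypothetical workflow whose branch map depends on the upstream output."""

    def __init__(self, path):
        self.requirement = ToyUpstreamTask(path)

    def condition(self):
        # "allowed" to build the branch map once the required output exists
        return self.requirement.complete()

    def branch_map(self):
        if not self.condition():
            return {0: None}  # placeholder (an assumption of this toy)
        with open(self.requirement.path) as f:
            return dict(enumerate(json.load(f)))


def toy_schedule(workflow):
    """Requirements first, then rebuild the branch map, then run the branches."""
    log = []
    if not workflow.condition():
        log.append("condition not met, running requirements")
        workflow.requirement.run()
    # the branch map is built again now that the condition holds
    for branch, data in workflow.branch_map().items():
        log.append("run branch {} on {}".format(branch, data))
    return log


with tempfile.TemporaryDirectory() as tmp:
    wf = ToyDynamicWorkflow(os.path.join(tmp, "items.json"))
    print(wf.branch_map())  # placeholder before the requirement has run
    for line in toy_schedule(wf):
        print(line)
```

The key point the toy mirrors is that the branch map is computed twice: once as a placeholder, and once more after the requirement has produced the file the condition checks for.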

Note that both create_branch_map() and output() should be decorated to signal to law that these two methods also depend on the condition and must be re-evaluated once it is met.
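The docstring compares the condition to Python's `property`: the decorated condition method becomes an object whose attributes (`.create_branch_map`, `.output`) are themselves decorators. A minimal, hypothetical re-creation of that pattern in plain Python might look like the following. `ToyWorkflowCondition` and `ToyWorkflow` are invented for illustration and are not law's implementation; the placeholder values are assumptions.

```python
class ToyWorkflowCondition:
    """Hypothetical, property-like container; NOT law's actual implementation."""

    def __init__(self, condition_fn):
        # the decorated method, e.g. workflow_condition(self) -> bool
        self.condition_fn = condition_fn

    def _guard(self, func, make_placeholder):
        condition_fn = self.condition_fn

        def wrapper(task):
            # return a placeholder until the condition holds; the real method
            # is called again on every later access, so nothing stale is cached
            if not condition_fn(task):
                return make_placeholder()
            return func(task)

        return wrapper

    def create_branch_map(self, func):
        return self._guard(func, lambda: {0: None})

    def output(self, func):
        return self._guard(func, lambda: None)


class ToyWorkflow:
    def __init__(self):
        # stands in for the output of the required task (not produced yet)
        self.upstream_items = None

    @ToyWorkflowCondition
    def workflow_condition(self):
        return self.upstream_items is not None

    @workflow_condition.create_branch_map
    def create_branch_map(self):
        return dict(enumerate(self.upstream_items))

    @workflow_condition.output
    def output(self):
        return ["file_{}.txt".format(b) for b in self.create_branch_map()]


wf = ToyWorkflow()
print(wf.create_branch_map(), wf.output())  # placeholder values
wf.upstream_items = ["obs_a", "obs_b"]
print(wf.create_branch_map(), wf.output())  # re-evaluated with real data
```

Because the wrappers check the condition on every call rather than caching the first result, both methods switch from placeholder to real values as soon as the upstream data appears. An undecorated `output()` would instead run against the placeholder branch map and fail (here, iterating over `None`).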

Does this help?

StFroese commented 7 months ago

Hi @riga, thanks a lot, works like a charm! 🎉