riga / law

Build large-scale task workflows: luigi + job submission + remote targets + environment sandboxing using Docker/Singularity
http://law.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Recursion and branch map caching Problem with nested Workflows #138

Closed harrypuuter closed 1 year ago

harrypuuter commented 1 year ago

Hi,

I am currently facing a problem when setting up a more complex workflow, where I observe very poor scaling. I have created a minimal example that represents the actual workflow I am trying to implement. I am using the current master branch of law.

I have a WrapperTask, which requires WorkflowZero, which requires WorkflowOne, which requires WorkflowTwo.

- WorkflowTwo processes a number of input files (e.g. 100, 500, 1000).
- WorkflowOne processes the output of WorkflowTwo (one task per file).
- WorkflowZero processes the output of WorkflowOne (one task per file).

All workflows use the NestedSiblingFileCollection as their output collection. To create the branch maps of WorkflowOne and WorkflowZero, the inputs from the respective upstream workflow are used. The example can be found below.

The problem with this setup is that it becomes unusable with a larger number of files. The runtime of the print-status command grows much faster than linearly with the number of files in WorkflowTwo:

- 100 files: ~0.4 s
- 500 files: ~15 s
- 1000 files: ~95 s
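For scale, these timings can be compared against a back-of-the-envelope model (an illustration with a hypothetical `total_ops` helper, not profiled law internals): if each branch task rebuilds a branch map whose construction touches every input file, total work grows quadratically in the file count. The measured slowdowns (~37x and ~240x relative to 100 files) grow at least as fast as this model predicts.

```python
def total_ops(n_files, files_per_task=10):
    # assume every branch task triggers one scan over all input files
    n_branches = n_files // files_per_task
    return n_branches * n_files

# growth relative to the 100-file baseline
ratios = [total_ops(n) / total_ops(100) for n in (100, 500, 1000)]
print(ratios)  # → [1.0, 25.0, 100.0]
```

Quadratic growth alone would predict 25x and 100x; the observed numbers are even worse, consistent with the extra per-target existence checks mentioned below.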

Example:


import copy
import luigi
import law
from law.task.base import WrapperTask
import time

class Wrapper(WrapperTask):
    """
    collective task to trigger friend production for a list of samples,
    if the samples are not already present, trigger ntuple production first
    """

    files_per_task = luigi.IntParameter(default=10)

    def requires(self):
        requirements = {}
        for nick in ["Nick1", "Nick2", "Nick3"]:
            requirements[f"WorkflowZero_{nick}"] = WorkflowZero(
                nick=nick,
                # files_per_task has no default on the workflows, so it must be passed
                files_per_task=self.files_per_task,
                name_zero="WorkflowZero",
                name_one="WorkflowOne",
                name_two="WorkflowTwo",
            )

        return requirements

    def run(self):
        pass

class WorkflowZero(law.LocalWorkflow):
    output_collection_cls = law.NestedSiblingFileCollection

    nick = luigi.Parameter()
    files_per_task = luigi.IntParameter()
    name_zero = luigi.Parameter()
    name_one = luigi.Parameter()
    name_two = luigi.Parameter()

    def workflow_requires(self):
        requirements = {}
        requirements["WorkflowOne"] = WorkflowOne.req(self)
        return requirements

    def create_branch_map(self):
        print("[WorkflowZero] Creating branch map")
        start = time.time()
        branch_map = {}
        inputs = copy.deepcopy(self.input()["WorkflowOne"]["collection"])
        # get all files from the dataset, including missing ones
        branches = [item for subset in inputs.iter_existing() for item in subset]
        branches += [item for subset in inputs.iter_missing() for item in subset]
        for counter, inputfile in enumerate(branches):
            branch_map[counter] = {
                "nick": self.nick,
                "inputfile": inputfile.path,
            }
        print("[WorkflowZero] Time to create branch map: {}".format(time.time() - start))
        return branch_map

    def output(self):
        print("[WorkflowZero] Creating output targets")
        start = time.time()
        path = "{name_zero}/{nick}_{branch}.root".format(
            name_zero=self.name_zero,
            nick=self.branch_data["nick"],
            branch=self.branch,
        )
        print("[WorkflowZero] Time to create output targets: {}".format(time.time() - start))
        return law.LocalFileTarget(path)

    def run(self):
        print("Finished WorkflowZero")

class WorkflowOne(law.LocalWorkflow):
    output_collection_cls = law.NestedSiblingFileCollection

    nick = luigi.Parameter()
    files_per_task = luigi.IntParameter()
    name_one = luigi.Parameter()
    name_two = luigi.Parameter()

    def workflow_requires(self):
        requirements = {}
        requirements["WorkflowTwo"] = WorkflowTwo.req(self)
        return requirements

    def create_branch_map(self):
        # print("    [WorkflowOne] Creating branch map")
        start = time.time()
        branch_map = {}
        inputs = copy.deepcopy(self.input()["WorkflowTwo"]["collection"])
        # get all files from the dataset, including missing ones
        branches = [item for subset in inputs.iter_existing() for item in subset]
        branches += [item for subset in inputs.iter_missing() for item in subset]
        # print(branches)
        for counter, inputfile in enumerate(branches):
            branch_map[counter] = {
                "nick": self.nick,
                "inputfile": inputfile.path,
            }
        # print("    [WorkflowOne] Time to create branch map: {}".format(time.time() - start))
        return branch_map

    def output(self):
        # print("    [WorkflowOne] Creating output targets")
        start = time.time()
        path = "{name_one}/{nick}_{branch}.root".format(
            name_one=self.name_one,
            nick=self.branch_data["nick"],
            branch=self.branch,
        )
        # print("    [WorkflowOne] Time to create output targets: {}".format(time.time() - start))
        return law.LocalFileTarget(path)

    def run(self):
        print("Finished WorkflowOne")

class WorkflowTwo(law.LocalWorkflow):
    output_collection_cls = law.NestedSiblingFileCollection

    nick = luigi.Parameter()
    files_per_task = luigi.IntParameter()
    name_two = luigi.Parameter()

    def create_branch_map(self):
        # print("        [WorkflowTwo] Creating branch map")
        start = time.time()
        branch_map = {}
        branches = {}
        inputdata = [f"path/to/{self.nick}/file_{x}.root" for x in range(0, 100)]
        # here, we use 10 files per task
        for filecounter, filename in enumerate(inputdata):
            branches.setdefault(filecounter // self.files_per_task, []).append(filename)
        for i, files in branches.items():
            branch_map[i] = {
                "nick": self.nick,
                "files": files,
            }

        # print("        [WorkflowTwo] Time to create branch map: {}".format(time.time() - start))
        return branch_map

    def output(self):
        # print("        [WorkflowTwo] Creating output targets")
        start = time.time()
        path = "{name}/{nick}_{branch}.root".format(
            name=self.name_two,
            nick=self.branch_data["nick"],
            branch=self.branch,
        )
        # print("        [WorkflowTwo] Time to create output targets: {}".format(time.time() - start))
        return law.LocalFileTarget(path)

    def run(self):
        print("Finished WorkflowTwo")

From what I found in the law implementation, this problem seems to be related to the branch map not being cached properly: in this nested setup, many calls to create_branch_map are made, slowing down the whole status determination. In the example I use local file targets; with WLCG targets the problem gets even worse, since the existence checks are much more expensive.

I tried to identify the underlying issue with the branch map caching, and it seems that the caching at https://github.com/riga/law/blob/fd750b1df3cd41df1d0d08d945ef8182ac39d7b1/law/workflow/base.py#L623 does not work after converting a task via as_workflow(), and as a result the branch map has to be recreated by calling the create_branch_map function of each workflow. Since we have three nested workflows here, where the inputs are used to create each branch map, the problem is further amplified.
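As a stopgap, per-instance memoization would at least avoid repeated input scans within one process. A minimal sketch (the mixin and attribute names are hypothetical, not law's API):

```python
class CachedBranchMapMixin:
    """Hypothetical mixin: memoize an expensive create_branch_map per instance."""

    def branch_map(self):
        cached = getattr(self, "_branch_map_cache", None)
        if cached is None:
            cached = self._branch_map_cache = self.create_branch_map()
        return cached


class DemoWorkflow(CachedBranchMapMixin):
    def __init__(self):
        self.calls = 0

    def create_branch_map(self):
        self.calls += 1  # stands in for an expensive scan of upstream inputs
        return {i: {"inputfile": f"file_{i}.root"} for i in range(3)}


wf = DemoWorkflow()
first = wf.branch_map()
second = wf.branch_map()
# create_branch_map ran only once; both calls return the same object
```

This does not help across separate task instances, though, which is where the nested setup suffers most.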

Is there another way such a workflow should be set up? Any input is appreciated!

riga commented 1 year ago

Hey @harrypuuter ,

thanks for opening this issue! The caching was added at some point exactly for cases like that, but it is obviously not doing what it should right now.

I think this can be fixed, but my time is a bit limited over the next couple of days. I'll try to fix this asap and get back to you for a final confirmation of the fix :)

harrypuuter commented 1 year ago

Hi @riga, thanks a lot for looking into this. If I can be of any help, e.g. with testing, let me know!

riga commented 1 year ago

Finally found the time ...

And indeed, it was a caching "feature". So far, the cache was not forwarded from the workflow to its branch tasks (nor among the branch tasks themselves), so each of them tried to build up its own branch map, not knowing that the associated workflow had already done so.

I added this forwarding and it is way faster now, as expected. Could you also try the feature/forward_cached_workflow branch?
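The forwarding can be sketched in miniature (illustrative classes and names, not law's internals): the workflow builds its branch map once and hands the cached object to every branch task it spawns, instead of each branch rebuilding it.

```python
class BranchTask:
    """Toy branch task; receives the workflow's cached map instead of rebuilding it."""

    def __init__(self, branch):
        self.branch = branch
        self._cached_map = None

    @property
    def branch_data(self):
        return self._cached_map[self.branch]


class Workflow:
    def __init__(self, n_branches):
        self.n_branches = n_branches
        self.build_count = 0
        self._cached_map = None

    def get_branch_map(self):
        if self._cached_map is None:
            self.build_count += 1  # an expensive input scan in the real case
            self._cached_map = {i: f"file_{i}.root" for i in range(self.n_branches)}
        return self._cached_map

    def branch_task(self, branch):
        task = BranchTask(branch)
        # forward the already-built map to the branch task
        task._cached_map = self.get_branch_map()
        return task


wf = Workflow(100)
tasks = [wf.branch_task(b) for b in range(100)]
# the branch map was built once for all 100 branch tasks
```

Without the forwarding, each of the 100 branch tasks would pay the construction cost itself, which is exactly the multiplication seen in the nested setup above.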

riga commented 1 year ago

On a different note (and sorry if this was just for the sake of creating an example), is there a reason why you

riga commented 1 year ago

Closing and merging for now, but feel free to still report if this fixes your issue.

harrypuuter commented 1 year ago

Hey, sorry for the late reply. With the fix, everything now works as intended, thank you very much!