pharmbio / sciluigi

A light-weight wrapper library around Spotify's Luigi workflow library to make writing scientific workflows more fluent, flexible and modular
http://dx.doi.org/10.1186/s13321-016-0179-6
MIT License

Automatic output path generation #19

Open hbredin opened 8 years ago

hbredin commented 8 years ago

I had difficulty finding a good naming convention for all my out_xxxx paths once my workflow became complicated (e.g. with one task taking three other tasks as input: how should I name its output?)

Therefore, I have created a sciluigi.Task mixin called AutoOutput that automatically adds an out_put method to a task (see below). Maybe it can be useful for others...

All you have to do to use it is the following:

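import luigi
import sciluigi

class MyTask(sciluigi.Task, AutoOutput):
    # inputs are callables returning a TargetInfo; the workflow sets them
    in_put1 = None
    in_put2 = None
    paramA = luigi.Parameter()
    paramB = luigi.Parameter()

    def run(self):
        # out_put() is provided by the AutoOutput mixin
        with self.out_put().open('w') as fp:
            pass

class MyWorkflow(sciluigi.WorkflowTask):
    workdir = luigi.Parameter()

    def workflow(self):
        task1 = self.new_task('task1', Task1)
        task2 = self.new_task('task2', Task2)

        task = self.new_task('task', MyTask)
        # pass the out_put method itself (not its result): the mixin calls it later
        task.in_put1 = task1.out_put
        task.in_put2 = task2.out_put
        return task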

It does have a few limitations, the main one being that it does not support tasks with structured inputs.

This will work:

task.in_put1 = task1.out_put
task.in_put2 = task2.out_put

This will not work:

task.in_put = {
  'input1': task1.out_put,
  'input2': task2.out_put,
}
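
One conceivable way around this limitation is to flatten structured inputs into dotted keys before they are hashed. The following is only a sketch under assumptions: flatten_inputs and FakeTarget are hypothetical names that exist neither in sciluigi nor in the mixin, and each leaf is assumed to be a callable returning an object with a .path attribute, like the in_xxxx attributes above.

```python
class FakeTarget(object):
    """Hypothetical stand-in for an object exposing a .path attribute."""

    def __init__(self, path):
        self.path = path


def flatten_inputs(name, value):
    """Yield (key, path) pairs for a callable input or a nested dict/list of them."""
    if isinstance(value, dict):
        # sort keys so the traversal order is deterministic (keys assumed to be strings)
        for key in sorted(value):
            yield from flatten_inputs('{}.{}'.format(name, key), value[key])
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            yield from flatten_inputs('{}[{}]'.format(name, index), item)
    else:
        # leaf: assumed to be a callable returning something with a .path
        yield name, value().path


structured = {
    'input1': lambda: FakeTarget('/work/a'),
    'input2': lambda: FakeTarget('/work/b'),
}
flat = dict(flatten_inputs('in_put', structured))
single = dict(flatten_inputs('in_put1', lambda: FakeTarget('/work/c')))
```

The flattened pairs could then be merged into the description dictionary in place of the single attrval().path lookup.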

Here is the code of the AutoOutput mixin:

import hashlib
import json

import six
import sciluigi


class AutoOutput(object):

    def _output_from_hash(self):

        # working directory within which all automatic outputs will be stored
        workdir = self.workflow_task.workdir

        description = {}

        # add one {key: value} per in_xxxx method
        # key = 'in_xxxx'
        # value = F(in_xxxx().path)
        for attrname, attrval in six.iteritems(self.__dict__):
            if 'in_' == attrname[0:3]:
                path = attrval().path
                if path.startswith(workdir):
                    path = path[len(workdir):]
                description[attrname] = path

        # add one {key: value} per task parameter
        # key = parameter name
        # value = parameter value
        params = self.get_params()
        params = [name for name, _ in params]
        for param_name in params:
            # do not take 'instance_name' and 'workflow_task' into account
            if param_name in ['instance_name', 'workflow_task']:
                continue
            description[param_name] = getattr(self, param_name)

        # hash the resulting dictionary (sha1 needs bytes on Python 3,
        # so the JSON string is encoded first)
        digest = hashlib.sha1(
            json.dumps(description, sort_keys=True).encode('utf-8')).hexdigest()

        # generate out_put path automatically
        output_path = '{workdir}/{workflow_name}/{instance_name}/{digest}'
        return output_path.format(
            workdir=workdir,
            instance_name=self.instance_name,
            workflow_name=self.workflow_task.__class__.__name__,
            digest=digest)

    def out_put(self):
        # automagically get out_put path
        path = self._output_from_hash()
        return sciluigi.TargetInfo(self, path)
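
The hashing step can be tried in isolation. The sketch below uses only the standard library; describe_digest and the sample values are hypothetical and only mirror the json.dumps(..., sort_keys=True) plus sha1 combination used in the mixin. It illustrates that the digest does not depend on the order in which keys were added, but changes as soon as any input path or parameter value changes.

```python
import hashlib
import json


def describe_digest(description):
    """Return a hex digest that is stable for equal dictionaries."""
    # sort_keys=True makes the serialization independent of insertion order
    serialized = json.dumps(description, sort_keys=True)
    return hashlib.sha1(serialized.encode('utf-8')).hexdigest()


# same content, different insertion order
a = describe_digest({'paramA': 'x', 'in_put1': '/task1/abc'})
b = describe_digest({'in_put1': '/task1/abc', 'paramA': 'x'})
# one parameter value differs
c = describe_digest({'in_put1': '/task1/abc', 'paramA': 'y'})

print(a == b, a == c)
```

Note that json.dumps raises a TypeError for values it cannot serialize, so this approach assumes that all parameter values are JSON-compatible.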

hbredin commented 8 years ago

This raises a question that is related to #18 (or rather a generalization of it).
Since paths are now generated automatically, it is not obvious where output files were written (this is true even without the AutoOutput mixin).

Is there an easy way to access the outputs of all tasks constituting the workflow? I tried the following:

def getAllOutputs(workflow):
    outputs = {}
    for instance_name, task in six.iteritems(workflow._tasks):
        outputs[instance_name] = task.out_put().path
    return outputs

workflow = MyWorkflow()
outputs = getAllOutputs(workflow)

But it looks like, at this point, the tasks constituting the workflow (workflow._tasks) are not instantiated yet, and all we get are output paths based on default parameter values.

What does workflow._tasks contain exactly?

samuell commented 8 years ago

(Hi, sorry, have been a bit busy, will look at this now!)

samuell commented 8 years ago

I had difficulty finding a good naming convention for all my out_xxxx paths once my workflow became complicated (e.g. with one task taking three other tasks as input: how should I name its output?)

This is hard to say without a concrete example. We often have tasks with multiple outputs, so it has been central for us to give each output a unique name and thus an "identity". In cases where we had a single output, we have kept to the same pattern and tried to give it a descriptive name, such as .out_concatenated, .out_traindata, or .out_testdata.

But it looks like, at this point, the tasks constituting the workflow (workflow._tasks) are not instantiated yet, and all we get are output paths based on default parameter values.

Yeah, I can't say for sure without testing this particular case, but I have often run into problems because Luigi separates scheduling and workflow execution into two phases, so tasks are not fully instantiated until the scheduling phase has finished and execution has started.

Our biggest problem with this is that it makes it hard, for example, to initiate a new task with parameter values calculated by a previous task, since parameter values need to be provided at scheduling time, and scheduling is over once execution starts. As a side note, this is one reason why we are experimenting with a fully dataflow-based approach in scipipe, where scheduling and execution can happen interchangeably (but it's not production ready yet).

Is there an easy way to access the outputs of all tasks constituting the workflow? I tried the following:

Will have to test a little before getting back on this, and the other remaining questions. Will get back to you shortly!

hbredin commented 8 years ago

FYI, I ended up saving every automagically generated output path in an attribute of the parent workflow: https://github.com/pyannote/pyannote-workflows/blob/master/pyannote_workflows/utils.py#L56-L68
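
The idea of recording paths on the parent workflow can be illustrated with a minimal sketch. This is not the code from the linked file: WorkflowStandIn, TaskStandIn, RecordingOutputMixin, auto_outputs and out_put_path are hypothetical names, the classes are plain-Python stand-ins for sciluigi objects, and the digest is a fixed placeholder string.

```python
class WorkflowStandIn(object):
    """Hypothetical stand-in for the parent workflow task."""

    def __init__(self):
        # maps instance_name -> automatically generated output path
        self.auto_outputs = {}


class RecordingOutputMixin(object):
    """Record each generated path on the parent workflow when it is computed."""

    def out_put_path(self):
        path = self._output_from_hash()
        self.workflow_task.auto_outputs[self.instance_name] = path
        return path


class TaskStandIn(RecordingOutputMixin):
    """Hypothetical stand-in for a task that owns a generated output path."""

    def __init__(self, workflow_task, instance_name):
        self.workflow_task = workflow_task
        self.instance_name = instance_name

    def _output_from_hash(self):
        # placeholder for the real digest-based path computation
        return '/work/{}/deadbeef'.format(self.instance_name)


workflow = WorkflowStandIn()
TaskStandIn(workflow, 'task1').out_put_path()
TaskStandIn(workflow, 'task2').out_put_path()
print(workflow.auto_outputs)
```

In this sketch the registry is filled only when a path is actually computed, so it reflects the parameter values of the task instances that computed them.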