spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0

Cleanup Targets / Rerun tasks #595

Closed jonmorehouse closed 6 years ago

jonmorehouse commented 9 years ago

What's the best way to clean up targets so a task can be rerun with a fresh state?

I'm looking into subclassing the target classes I'm using and adding a "cleanup" method, is this something that aligns with the Luigi project, or is there another solution?
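A minimal sketch of the subclassing idea, not luigi's own API: `FileTarget` below is a hypothetical stand-in that only mimics the `path` attribute and `exists()` method of a file-based target, and `cleanup()` is the method name assumed from the question above.

```python
import os
import tempfile


class FileTarget:
    """Hypothetical stand-in for a file-based target (not a luigi class)."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class CleanableFileTarget(FileTarget):
    """Adds the proposed cleanup() method on top of the stand-in target."""

    def cleanup(self):
        # Remove the file if it is there; report whether anything was deleted.
        if self.exists():
            os.remove(self.path)
            return True
        return False


def demo():
    # Create a real temporary file, then clean it up through the target.
    fd, path = tempfile.mkstemp()
    os.close(fd)
    target = CleanableFileTarget(path)
    existed_before = target.exists()
    removed = target.cleanup()
    return existed_before, removed, target.exists()
```

Returning a boolean from `cleanup()` is a design choice of this sketch; it lets a caller tell "nothing to clean" apart from "cleaned".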

Tarrasch commented 9 years ago

This is interesting, even more so if one could integrate this with the CLI. One might also want this to cascade down to one's dependencies, so you can regenerate all dependent data as well, but that's a bit scary and one should be careful of course. :)

jonmorehouse commented 9 years ago

Yes, that's the eventual goal. I think a good starting point would be adding "cleanup" methods to the various targets.

themalkolm commented 9 years ago

Just curious. Do you mean a method to delete a target?

Tarrasch commented 9 years ago

Yea. I guess that complete() must return False after you've run a cleanup() method on that task.
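The contract described here can be sketched as follows. Everything in the block is a hypothetical stand-in rather than luigi code: `FileTarget` and `CleanableTask` only imitate the `output()`, `complete()` and `run()` methods, and `complete()` is assumed to mean "every output exists".

```python
import os
import tempfile


class FileTarget:
    """Hypothetical stand-in target: exists() and remove() on a local path."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)

    def remove(self):
        os.remove(self.path)


class CleanableTask:
    """Hypothetical stand-in task; complete() means 'every output exists'."""

    def __init__(self, paths):
        self._paths = list(paths)

    def output(self):
        return [FileTarget(p) for p in self._paths]

    def complete(self):
        return all(t.exists() for t in self.output())

    def run(self):
        for t in self.output():
            with open(t.path, "w") as f:
                f.write("done\n")

    def cleanup(self):
        # Delete every existing output so that complete() flips back to False.
        for t in self.output():
            if t.exists():
                t.remove()


def demo():
    workdir = tempfile.mkdtemp()
    task = CleanableTask([os.path.join(workdir, "a.txt"),
                          os.path.join(workdir, "b.txt")])
    task.run()
    after_run = task.complete()
    task.cleanup()
    return after_run, task.complete()
```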

miku commented 9 years ago

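For a project, we had a similar requirement, that is to be able to quickly rerun tasks. We ended up writing some wrapper shell/python scripts (https://github.com/miku/siskin/tree/ae34a5f7580308726e8650b4a4978b4d8b64fe85/bin); among them one that runs a task (https://github.com/miku/siskin/blob/ae34a5f7580308726e8650b4a4978b4d8b64fe85/bin/taskdo), one that returns the output of a task (https://github.com/miku/siskin/blob/ae34a5f7580308726e8650b4a4978b4d8b64fe85/bin/taskoutput), and one that would just rm (https://github.com/miku/siskin/blob/ae34a5f7580308726e8650b4a4978b4d8b64fe85/bin/taskrm) and then run again (https://github.com/miku/siskin/blob/ae34a5f7580308726e8650b4a4978b4d8b64fe85/bin/taskredo). It would be great to have some "redo" functionality support in luigi, though.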

themalkolm commented 9 years ago

Hm.. interesting. Would be nice to have a notion of a task that cannot repeat its output. So with this 'rerun' concept we can run the whole pipeline again from the beginning, redoing everything except those tasks.

I know that luigi has ExternalTask but from my understanding it is something different. It relies on something else to produce a resource.

younes-abouelnagah commented 9 years ago

As a new user to Luigi, I found it nice that the Luigi execution logic is dead simple: tasks are idempotent, so if the target is there the task won't run. This is consistent with its philosophy that triggering the tasks is not part of the features. I'd say the more features you add to Luigi the harder it will be to learn. It already has a very steep learning curve because the docs and examples are very simplistic.

Tarrasch commented 9 years ago

Thanks for your feedback @younes-abouelnagah. I do agree with you that we should keep these kinds of features out of the core luigi logic.

By the way, it would be awesome if you would add any docs that you find are missing @younes-abouelnagah :)

alexshires commented 9 years ago

I would like a recreate call too. You could subclass a Task to take a recreate Parameter (default False) which overrides the complete() method.

kwilcox commented 8 years ago

For anyone finding their way here looking for an example, this might solve your needs (adjust for Py2 if needed):

import os

import luigi


class ForceableTask(luigi.Task):

    force = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # To force execution, we just remove all outputs before `complete()` is called
        if self.force is True:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    # Note: this removes self.output().path rather than out.path,
                    # so it assumes a single file-based output
                    os.remove(self.output().path)

EntilZha commented 8 years ago

I would find this feature useful for iterating on Luigi pipelines. For example, suppose I have a pipeline like:

It would be nice to be able to specify a way to remove the targets for anything downstream of a specific task or force execution of any downstream tasks of a given one (in this case, anything that the model outputs since I could be trying different models/parameters/code/etc out).

Would this be accomplished by inheriting the ForceableTask above instead of luigi.Task?

jkryanchou commented 8 years ago

+1 I have met the same issue, with a requirement to re-run the same task multiple times. Do the developers plan to provide a flag, e.g. named dont_filter, in the luigi.Task __init__()?

mattayes commented 8 years ago

👍 Even just a simple command-line switch -f/--force would be very helpful (obviously not deleting ExternalTasks).
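As a rough illustration of such a switch, here is a sketch of a hypothetical wrapper script built on `argparse`. It is not an existing luigi option: the `plan` function, the step names and the `external_tasks` argument are all assumptions made up for this example, and the sketch only decides what would be done rather than touching any files.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="Hypothetical task runner")
    parser.add_argument("task", help="name of the task to run")
    parser.add_argument("-f", "--force", action="store_true",
                        help="remove the task's outputs before running")
    return parser


def plan(argv, external_tasks=frozenset()):
    """Return the list of steps the runner would take for these arguments."""
    args = build_parser().parse_args(argv)
    steps = []
    # Never delete the outputs of external tasks: nothing here can recreate them.
    if args.force and args.task not in external_tasks:
        steps.append(("remove_outputs", args.task))
    steps.append(("run", args.task))
    return steps
```

With this sketch, `plan(["-f", "MyTask"])` schedules a removal step before the run step, while a forced external task is simply run (that is, checked) without deleting anything.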

ifiddes commented 7 years ago

The solution by @kwilcox works, but I am wondering how I could take it a bit further. I want to incorporate that method into an abstract base task, but then I run into the problem that if I set --force and run a task, it will obviously execute the remove() method for every task upstream.

How can I check to see that I am actually in the __init__ of the top level task call, and not one of its dependencies? Looking at the class variables a Target object has, I am not sure that I can know this. Is the original command line call accessible in any way?
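One possible way around this, sketched under assumptions: instead of detecting the top-level call inside `__init__`, do the removal in the entry-point script on the single task object about to be scheduled (for example just before passing it to `luigi.build`). The classes below are hypothetical in-memory stand-ins, not luigi classes, and `force_top_level` is a made-up helper name.

```python
class MemoryTarget:
    """Hypothetical in-memory stand-in for a target."""

    def __init__(self, store, key):
        self.store, self.key = store, key

    def exists(self):
        return self.key in self.store

    def remove(self):
        self.store.discard(self.key)


class StubTask:
    """Hypothetical stand-in task with luigi-like output() and requires()."""

    def __init__(self, name, store, deps=()):
        self.name, self.store, self.deps = name, store, list(deps)

    def output(self):
        return MemoryTarget(self.store, self.name)

    def requires(self):
        return self.deps


def force_top_level(task):
    """Remove only this task's output; its requirements are left untouched."""
    out = task.output()
    if out.exists():
        out.remove()
    return task


def demo():
    store = {"A", "B"}                  # both outputs already exist
    a = StubTask("A", store)
    b = StubTask("B", store, deps=[a])
    force_top_level(b)                  # only B is invalidated
    return sorted(store)
```

Because the helper is called once, from outside the task classes, no task needs to know whether it is the top-level one.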

drmikecrowe commented 7 years ago

Simplistic, but I needed a task that would always run. I created a class called RecurringTask, and manually called self.finished() (at the end of the run() or rows() method) to flag it as complete:

class RecurringTask(DefaultTask):
    _complete = False

    def complete(self):
        return self._complete

    def finished(self):
        self._complete = True

Any downside of this?

@kwilcox's solution is good, but some of my targets are not files (database tables, for example).

nirg commented 7 years ago

Not directly a solution for forcing execution, but a lot of the times I needed to rerun tasks is because a previous failure resulted in partial output. So, I wrote a MixIn class that removes the target upon failure:

class RemoveOnFailureMixin(object):

    def on_failure(self, exception):
        targets = luigi.task.flatten(self.output())
        for target in targets:
            if target.exists() and isinstance(target, luigi.LocalTarget):
                target.remove()
        return luigi.Task.on_failure(self, exception)

Then mix it in like this:

class TaskA(RemoveOnFailureMixin, luigi.Task):
    ...  # requires(), output() and run() as usual

Hope that helps!

davidbrochart commented 7 years ago

Hi @ifiddes, I made some changes to @kwilcox's solution and added a force_upstream parameter, which recursively removes the output files of all the dependencies as well.

class ForceableTask(luigi.Task):

    force          = luigi.BoolParameter(significant=False, default=False)
    force_upstream = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.force_upstream is True:
            self.force = True
        if self.force is True:
            done = False
            tasks = [self]
            while not done:
                outputs = luigi.task.flatten(tasks[0].output())
                [os.remove(out.path) for out in outputs if out.exists()]
                if self.force_upstream is True:
                    tasks += luigi.task.flatten(tasks[0].requires())
                tasks.pop(0)
                if len(tasks) == 0:
                    done = True

The other way would be useful too: removing the output files of all the tasks that depend on the current task (force downstream). Unfortunately I don't think that a task knows which tasks directly depend on it. It would be easy for a child task to register itself into a parent task when requires() is called. Otherwise we need to walk through all the tasks and it requires quite a lot of processing.
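The "walk through all the tasks" option can be sketched as below. This is an illustration under assumptions: the dependency graph is given as a plain dict of hypothetical task names (in practice it would have to be collected by calling `requires()` on each task), and both function names are made up for this example.

```python
from collections import defaultdict, deque


def build_dependents(requires):
    """Invert a {task: [requirements]} mapping into {task: [dependents]}."""
    dependents = defaultdict(list)
    for task, deps in requires.items():
        for dep in deps:
            dependents[dep].append(task)
    return dependents


def downstream_of(start, requires):
    """Return every task that directly or transitively depends on `start`."""
    dependents = build_dependents(requires)
    seen, queue = set(), deque([start])
    while queue:
        for child in dependents[queue.popleft()]:
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen
```

The inversion is a single pass over all edges, so the cost is linear in the size of the graph once the `requires` mapping is known.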

quantoid commented 6 years ago

Could you just override exists() on your output Target to always return False so it always runs the task?

In my case I have a task that generates data for a date. If the date is in the past it can use previously generated data, but if it's today it needs to regenerate the data because it's likely to be incomplete.

So I need a way to invalidate a target based on the time when it's checked. I guess I could just override exists() to always delete the file and return False if it's for today.
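A sketch of a variation on this idea that avoids deleting inside `exists()`: today's file only counts as existing if it was written after a cutoff timestamp, such as the moment the current run started. `DailyFileTarget` is a hypothetical stand-in class, and `today` and `fresh_after` are parameters invented for this example (passed in explicitly so the behaviour is deterministic).

```python
import datetime
import os
import tempfile


class DailyFileTarget:
    """Hypothetical stand-in target for data generated for a single date."""

    def __init__(self, path, date, today, fresh_after):
        self.path = path
        self.date = date
        self.today = today              # injected so the check is deterministic
        self.fresh_after = fresh_after  # timestamp, e.g. when this run started

    def exists(self):
        if not os.path.exists(self.path):
            return False
        if self.date < self.today:
            # Past dates are final: any previously generated file is reused.
            return True
        # Today's data counts only if it was written during the current run.
        return os.path.getmtime(self.path) >= self.fresh_after


def demo():
    fd, path = tempfile.mkstemp()
    os.close(fd)
    os.utime(path, (1000, 1000))        # pretend the file is old
    today = datetime.date(2020, 1, 2)
    past = DailyFileTarget(path, datetime.date(2020, 1, 1), today, fresh_after=2000)
    current = DailyFileTarget(path, today, today, fresh_after=2000)
    past_ok = past.exists()
    stale = current.exists()
    os.utime(path, (3000, 3000))        # pretend the task just rewrote it
    fresh = current.exists()
    os.remove(path)
    return past_ok, stale, fresh
```

The reason for comparing timestamps rather than always returning False is that a freshly regenerated file should be reported as present when anything checks the target again later in the same run.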

diogoffmelo commented 6 years ago

@davidbrochart If you know the last task (or tasks), you should be able to build the dag and inverse dag of dependencies and iterate over it. Topological sort is an efficient algorithm for the task. I've made a gist on this: link.
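As a sketch of the topological-sort idea (this is not the code from the gist mentioned above), Python's standard-library `graphlib` module (Python 3.9+) can order a dependency mapping directly. The task names and the two helper functions are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def run_order(requires):
    """Order tasks so that every task comes after all of its requirements."""
    return list(TopologicalSorter(requires).static_order())


def cleanup_order(requires):
    """Reverse order: dependents are cleaned before what they depend on."""
    return list(reversed(run_order(requires)))
```

`TopologicalSorter` takes a mapping from each node to its predecessors, which matches a `{task: [requirements]}` dict, and raises `graphlib.CycleError` if the graph is not a DAG.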

cabhi commented 6 years ago

I took the idea from the Range Base task and from @davidbrochart, and created a task which takes a TaskParameter and cleans upward. Below is the code snippet. Let me know your thoughts.

class CleanUpTask(luigi.Task):
    pdate = luigi.DateSecondParameter()
    clean_task = luigi.TaskParameter()

    def run(self):
        total_outputs = []
        done = False
        tasks = [self.clean_task(pdate=self.pdate)]

        while not done:

            total_outputs += luigi.task.flatten(tasks[0].output())
            tasks += [t for t in luigi.task.flatten(tasks[0].requires()) if t not in tasks if
                      not isinstance(t, ExternalTask) if not isinstance(t, type(tasks[0])) if
                      not getattr(tasks[0], 'run') is None]
            tasks.pop(0)

            if len(tasks) == 0:
                done = True

        # Can do clean up here. As of now just printing the path
        for out in total_outputs:
            if out.exists() and hasattr(out, 'path'):
                print(out.path)

    def output(self):
        return TargetNeverExists()

grauscher commented 6 years ago

@kwilcox: I really like your ForceableTask solution! I use it in our luigi projects where I work.

The only improvement I could suggest is to replace os.remove(out.path) with out.remove(). If the target has this method, it will clean the completion "flag" adequately, whether it is a local file or an entry in a DB. Otherwise, it can raise a NotImplementedError, or skip this part with a warning. @drmikecrowe points out the problem when the target in question is not a LocalTarget. @davidbrochart solves the problem of using out instead of self.output(). @cabhi verifies if the output has a path property.

Another comment is that you propose a loop over all the "outputs" of the given task, but actually only call os.remove() with self.output().path as argument, instead of out.path.

In summary:

outputs = luigi.task.flatten(self.output())
for out in outputs:
    if out.exists():
        # Old line
        # os.remove(self.output().path)
        # What I propose
        try:
            out.remove()
        except AttributeError:
            raise NotImplementedError

stale[bot] commented 6 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

d6tdev commented 5 years ago

d6tflow, which is built on top of luigi, implements this. It has the functionality discussed: cleanup() is invalidate(). It cascades upstream and downstream. It asks you to confirm invalidation unless you override. See details at https://d6tflow.readthedocs.io/en/latest/control.html#manually-forcing-task-reset-and-rerun. Caveat: it only works for d6tflow tasks and targets, not for all luigi targets, but should take you a long way.

meenalp21 commented 4 years ago

We faced this issue in our project too. Our luigi targets are not files but Objects. We declared these targets as the class level variables (static) which were accessible outside the classes using the class name. We wrote a wrapper task which sets all the target values to null after the whole pipeline execution is done. This way a new pipeline can be triggered with a fresh status. A workaround but it did the job..

mjtung commented 4 years ago

Thanks, taking into account all the insights and code here, I found it useful for myself to add a check when removing outputs for upstream dependencies, to only remove the outputs for ForceableTasks. That way, if Task types are not ForceableTasks, they would never have their outputs removed (i.e. they would never re-run). However, the force_upstream flag does not propagate upwards past non-ForceableTasks:

while tasks:  # while tasks is not empty, pop tasks from the list
    currentTask = tasks.pop(0)
    if isinstance(currentTask, ForceableTask):  # only remove outputs for ForceableTasks
        outputs = luigi.task.flatten(currentTask.output())
        [out.remove() for out in outputs if out.exists()]

    if self.force_upstream:
        tasks += luigi.task.flatten(currentTask.requires())

jaklinger commented 3 years ago

For those who are interested in a method that works even for targets without a remove method (e.g. contrib.mysqldb users such as myself), consider the following setup which, rather than deleting any outputs, simply toggles the exists method to return False with some monkey patching:

def toggle_force_to_false(func):
    """Toggle self.force permanently to be False. This is required towards                      
    the end of the task's lifecycle, when we need to know the true value                        
    of Target.exists()"""
    def wrapper(self, *args, **kwargs):
        self.force = False
        return func(self, *args, **kwargs)
    return wrapper

def toggle_exists(output_func):
    """Patch Target.exists() if self.force is True"""
    def wrapper(self):
        outputs = output_func(self)
        for out in luigi.task.flatten(outputs):
            # Patch Target.exists() to return False
            if self.force:
                out.exists = lambda *args, **kwargs: False
            # "Unpatch" Target.exists() to its original form; `out` is bound as a
            # default argument so each lambda keeps its own target
            else:
                out.exists = lambda *args, _out=out, **kwargs: _out.__class__.exists(_out, *args, **kwargs)
        return outputs
    return wrapper

class ForceableTask(luigi.Task):
    """A luigi task which can be forceably rerun"""
    force = luigi.BoolParameter(significant=False, default=False)
    force_upstream = luigi.BoolParameter(significant=False, default=False)

    def __init__(self,  *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Force children to be rerun                                                            
        if self.force_upstream:
            self.force = True
            children = luigi.task.flatten(self.requires())
            for child in children:
                child.force = True

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.output = toggle_exists(cls.output)
        # Later on in the task's lifecycle, 'run' and 'trigger_event' are called, so we can use
        # these as an opportunity to toggle "force = False" to allow Target.exists()
        # to return its true value at the end of the Task
        cls.run = toggle_force_to_false(cls.run)
        cls.trigger_event = toggle_force_to_false(cls.trigger_event)

ireneusz-ptak commented 3 years ago

@jaklinger

def __init__(self,  *args, **kwargs):
    super().__init__(*args, **kwargs)

    def propagate_force_flag(task):
        # Force children to be rerun
        if task.force_upstream:
            task.force = True
            children = luigi.task.flatten(task.requires())
            for child in children:
                child.force = True
                propagate_force_flag(child)

    propagate_force_flag(self)

to be precise ;)