spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0

Running a group of ExternalProgramTasks #1961

Closed dletendre closed 7 years ago

dletendre commented 7 years ago

I'm trying to run a group of external tasks as part of the workflow (they are actually incremental sqoop jobs, but I'm using ps as an example), and I keep getting the error "RuntimeError: Unfulfilled dependencies at run time".

import luigi
from luigi.contrib.external_program import ExternalProgramTask
from luigi.task import flatten

class PS1(ExternalProgramTask):
    def complete(self):
        return False
    def program_args(self):
        return ['ps', 'aux']

class PS2(ExternalProgramTask):
    def complete(self):
        return False
    def program_args(self):
        return ['ps', 'aux']

class RunGroup(luigi.WrapperTask):
    def run(self):
        print("Running RunGroup")
    def requires(self):
        yield PS1()
        yield PS2()
    def output(self):
        return luigi.LocalTarget('/tmp/rg')
    def complete(self):
        tasks_complete = all(r.complete() for r in flatten(self.requires()))
        if tasks_complete:
            self.output().open('w').close()
            return True
        else:
            return False

if __name__ == '__main__':
    luigi.run(['RunGroup'])

I run it with the command: PYTHONPATH='' luigi --module RunGroup RunGroup --worker-keep-alive

It runs each ps command twice even though they complete successfully, then fails the WrapperTask.

What am I missing?

erikbern commented 7 years ago

it's because you're using yield to generate dynamic requirements. that will cause the requires() function to run multiple times. in this case there's nothing dynamic about them, so just rewrite it to be

def requires(self):
    return [PS1(), PS2()]

Tarrasch commented 7 years ago

@erikbern, I think you confused yield-in-requires vs yield-in-run. See http://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies

@dletendre, your actual problem (I think) is that your complete() functions for PS1 and PS2 are incorrect stubs. They always return False, so of course luigi will say the dependencies aren't fulfilled yet.

Unrelated sidenote: Remove the if __name__ == '__main__': part :)

dletendre commented 7 years ago

Thank you both! If I am running a large tree of dependent ExternalProgramTasks and WrapperTasks, is this the best way to set it up? I have a bunch of sqoop jobs that have to run and finish, then some hive jobs, then some spark-submit and SparkR jobs that run after them. All of them are incremental jobs, so I run the whole tree every 5 or 10 minutes. Should I just have complete() return False for all these incremental jobs?

Tarrasch commented 7 years ago

Some of the technologies you use already have existing tasks. No need to use the rather low-level ExternalProgramTask. As for setting up complete(), you usually don't set it (you rely on output()). Play around with the more basic examples first and read through some docs so you understand the output-complete relation. Then you're good to go. Good luck.