Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
It is possible for a task's outputs not to exist while the task is still assigned the status DONE.
Here is an example:
```python
import luigi


class Task1(luigi.Task):
    def run(self):
        with self.output()["foo_TASK_1"].open("w") as f:
            f.write("Hello World")
        with self.output()["bar_TASK_1"].open("w") as f:
            f.write("Hello World")

    def output(self):
        return {
            "foo_TASK_1": luigi.LocalTarget("foo_TASK_1.txt"),
            "bar_TASK_1": luigi.LocalTarget("bar_TASK_1.txt"),
        }


class Task2(luigi.Task):
    def requires(self):
        return Task1()

    def run(self):
        # Only foo_TASK_2 is written; bar_TASK_2 is declared in output()
        # below but never created.
        with self.output()["foo_TASK_2"].open("w") as f:
            f.write("Hello World")

    def output(self):
        return {
            "foo_TASK_2": luigi.LocalTarget("foo_TASK_2.txt"),
            "bar_TASK_2": luigi.LocalTarget("bar_TASK_2.txt"),
        }


if __name__ == "__main__":
    luigi.build([Task2()], local_scheduler=True, detailed_summary=True)
```
If you run this, the detailed summary reports both tasks as successful. However, Task2 cannot actually be successful: its run() creates only foo_TASK_2.txt, while its output() also declares bar_TASK_2.txt.
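You can confirm the mismatch independently of the scheduler's reported status by checking the declared output files on disk after the build. This is a minimal sketch, not Luigi API: the `flatten_outputs` helper and the plain-path representation of the targets are assumptions for illustration.

```python
import os


def flatten_outputs(struct):
    # Recursively flatten a dict/list of output paths into a flat list.
    if isinstance(struct, dict):
        return [p for value in struct.values() for p in flatten_outputs(value)]
    if isinstance(struct, (list, tuple)):
        return [p for value in struct for p in flatten_outputs(value)]
    return [struct]


def all_outputs_exist(outputs):
    # A task should only count as complete if every declared output exists.
    return all(os.path.exists(path) for path in flatten_outputs(outputs))
```

Applied to Task2's declared output paths after the build above, this check returns False, because bar_TASK_2.txt is never written.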
I've traced this error to worker.py and fixed it in a hacky way.
Fixing the faulty DONE status assignment
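The idea behind the fix is to re-check the task's completeness after run() returns, instead of unconditionally assigning DONE. The sketch below is only an illustration of that logic, not Luigi's actual worker.py code; the `status_after_run` function and the string statuses are assumptions made for the example.

```python
DONE, FAILED = "DONE", "FAILED"


def status_after_run(task):
    # Sketch of the idea: after run() finishes, only report DONE if the
    # task's outputs actually exist (i.e. complete() is True afterwards).
    task.run()
    return DONE if task.complete() else FAILED
```

With this check in place, a task like Task2 above, whose complete() is False because one declared output is missing, would be reported as FAILED rather than DONE.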