spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0

Faulty DONE status assignment #3273

Closed by HadiKutabi 8 months ago

HadiKutabi commented 8 months ago

It is possible for a task to be assigned the status DONE even though its outputs don't exist.

Here is an example:

import luigi

class Task1(luigi.Task):

    def run(self):
        with self.output()["foo_TASK_1"].open("w") as f:
            f.write("Hello World")

        with self.output()["bar_TASK_1"].open("w") as f:
            f.write("Hello World")

    def output(self):
        return {
            "foo_TASK_1": luigi.LocalTarget("foo_TASK_1.txt"),
            "bar_TASK_1": luigi.LocalTarget("bar_TASK_1.txt"),
        }

class Task2(luigi.Task):
    def requires(self):
        return Task1()

    def run(self):
        with self.output()["foo_TASK_2"].open("w") as f:
            f.write("Hello World")

    def output(self):
        return {
            "foo_TASK_2": luigi.LocalTarget("foo_TASK_2.txt"),
            "bar_TASK_2": luigi.LocalTarget("bar_TASK_2.txt"),
        }

if __name__ == "__main__":
    luigi.build([Task2()], local_scheduler=True, detailed_summary=True)

If you run this, the detailed summary will show that both tasks are successful. However, Task2 cannot be successful, because its run() creates only one of the two outputs declared in output(): bar_TASK_2.txt is never written.
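One way to confirm the mismatch after the build is to check the declared file names on disk. The following is a minimal sketch that uses only `pathlib`; the helper name `missing_files` and the `base_dir` parameter are hypothetical, and the file names are copied from `Task2.output()` above.

```python
from pathlib import Path

# File names declared in Task2.output() in the example above.
DECLARED_OUTPUTS = ["foo_TASK_2.txt", "bar_TASK_2.txt"]


def missing_files(paths, base_dir="."):
    """Return the declared paths that do not exist under base_dir.

    Hypothetical helper for illustration; it is not part of Luigi.
    """
    base = Path(base_dir)
    return [p for p in paths if not (base / p).exists()]


if __name__ == "__main__":
    # Run this in the same working directory as the Luigi script,
    # after the build has finished.
    print("Missing outputs:", missing_files(DECLARED_OUTPUTS))
```

Assuming no stale files are left over from an earlier run, the missing list should contain the output that run() never writes, even though the summary reported the task as successful.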

I've traced this behavior to worker.py (line 216) and fixed it in a hacky way as follows:

                        # update the cache
                        if self.task_completion_cache is not None:
                            self.task_completion_cache[self.task.task_id] = True
                        status = DONE if self.task.complete() else FAILED
                    elif self.check_complete(self.task):
                        status = DONE
                    else:

Can someone explain whether my solution makes sense, or why Luigi thinks that the task is successful?

Thanks :)

lallea commented 8 months ago

The developer is responsible for ensuring that Task.run creates all outputs. It is documented here: https://github.com/spotify/luigi/blob/master/doc/tasks.rst?plain=1#L158
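As a rough sketch of how a task author could enforce this inside their own code, the helpers below check every declared output and raise if any is missing. The names `flatten_outputs`, `missing_outputs`, and `ensure_outputs_exist` are hypothetical and not part of Luigi; the code assumes only that the task has an `output()` method and that each target has an `exists()` method, as Luigi targets do.

```python
def flatten_outputs(outputs):
    """Flatten a single target, or a dict/list/tuple of targets, into a list.

    Hypothetical helper; Luigi ships its own flatten utility, which is
    deliberately not used here so the sketch stays dependency-free.
    """
    if outputs is None:
        return []
    if isinstance(outputs, dict):
        return [t for value in outputs.values() for t in flatten_outputs(value)]
    if isinstance(outputs, (list, tuple)):
        return [t for value in outputs for t in flatten_outputs(value)]
    return [outputs]


def missing_outputs(task):
    """Return the declared targets of `task` that do not exist."""
    return [t for t in flatten_outputs(task.output()) if not t.exists()]


def ensure_outputs_exist(task):
    """Raise RuntimeError if any declared output of `task` is missing."""
    missing = missing_outputs(task)
    if missing:
        # Fall back to repr() for targets that have no `path` attribute.
        names = ", ".join(str(getattr(t, "path", repr(t))) for t in missing)
        raise RuntimeError(f"{type(task).__name__} did not create: {names}")
```

Calling `ensure_outputs_exist(self)` as the last statement of `Task2.run()` would raise in the example from the issue, and an exception raised in run() makes Luigi report the task as failed rather than done.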

I suggest closing this issue, since Luigi works as documented.

RRap0so commented 8 months ago

Working as intended. Thank you @lallea