spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, etc. It also comes with Hadoop support built in.
Apache License 2.0

Fix execution summary for optional parameters #3286

Closed (starhel closed this 4 months ago)

starhel commented 4 months ago

Description

If an optional parameter class has its own next_in_enumeration method, generating the execution summary fails with an error when the same task has been run both with the parameter set to None and with an actual value.
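The failure can be reproduced in miniature without Luigi. In this sketch, `IntParam`, `OptionalIntParam` and `next_values` are hypothetical stand-ins, not Luigi's API: they mimic only the `return value + 1` behaviour visible in the traceback below. Because the optional class inherits that method unchanged, a recorded `None` value makes it raise.

```python
# Hypothetical stand-ins for an int parameter and its optional variant;
# they only mimic the next_in_enumeration behaviour seen in the traceback.
class IntParam:
    @staticmethod
    def next_in_enumeration(value):
        # Same operation as the failing line in the traceback: value + 1
        return value + 1


class OptionalIntParam(IntParam):
    # Inherits next_in_enumeration unchanged, so None is not handled.
    pass


def next_values(param_class, values):
    """Apply next_in_enumeration to every recorded value, as the summary does."""
    return {param_class.next_in_enumeration(v) for v in values}


# RecB ran with step=None (the default) and then with step=2, 1 and 0.
recorded_steps = {None, 0, 1, 2}

try:
    next_values(OptionalIntParam, recorded_steps)
except TypeError as exc:
    # prints: summary would fail: unsupported operand type(s) for +: 'NoneType' and 'int'
    print("summary would fail:", exc)
```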

Motivation and Context

import luigi

class A(luigi.ExternalTask):
    def output(self):
        return luigi.LocalTarget("a.txt")

class RecB(luigi.Task):
    step = luigi.OptionalIntParameter(default=None)

    def requires(self):
        step = 3 if self.step is None else self.step
        if step == 0:
            yield self.clone(A)
        else:
            yield self.clone(step=step-1)

    def run(self):
        with self.output().open("w") as f:
            f.write("1")

    def output(self):
        return luigi.LocalTarget(f"b_{self.step}.txt")

luigi.build([RecB()])

Running this script fails while the execution summary is being generated:
Traceback (most recent call last):
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/bug.py", line 28, in <module>
    luigi.build([RecB()])
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/interface.py", line 243, in build
    luigi_run_result = _schedule_and_run(tasks, worker_scheduler_factory, override_defaults=env_params)
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/interface.py", line 178, in _schedule_and_run
    luigi_run_result = LuigiRunResult(worker, success)
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/execution_summary.py", line 79, in __init__
    self.summary_text = _summary_wrap(_summary_format(summary_dict, worker))
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/execution_summary.py", line 414, in _summary_format
    str_output += '{0}\n'.format(_get_str(group_tasks[status], status in _PENDING_SUB_STATUSES))
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/execution_summary.py", line 196, in _get_str
    first, last = _ranging_attributes(attributes, param_class)
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/execution_summary.py", line 261, in _ranging_attributes
    next_attributes = {param_class.next_in_enumeration(attribute) for attribute in attributes}
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/execution_summary.py", line 261, in <setcomp>
    next_attributes = {param_class.next_in_enumeration(attribute) for attribute in attributes}
  File "/home/astachlewski/tmp/luigi_bug_recursive_graph/venv/lib/python3.10/site-packages/luigi/parameter.py", line 686, in next_in_enumeration
    return value + 1
TypeError: unsupported operand type(s) for +: 'NoneType' and 'int'
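One way to make the range detection tolerant of `None` is to drop `None` values before computing successors. The following is a minimal sketch of that idea under stated assumptions, not the change made in this PR: `ranging_attributes` here is a hypothetical, simplified function that only borrows the shape of the set comprehension from the traceback, and it assumes integer parameter values.

```python
from typing import Optional, Set, Tuple


def next_in_enumeration(value: int) -> int:
    # Successor function for integer parameters (the same value + 1 idea).
    return value + 1


def ranging_attributes(values: Set[Optional[int]]) -> Tuple[Optional[int], Optional[int]]:
    """Return (first, last) if the non-None values form one contiguous run, else (None, None).

    Hypothetical None-safe variant: None is filtered out before
    next_in_enumeration is applied, so it can no longer raise TypeError.
    """
    ints = {v for v in values if v is not None}
    if not ints:
        return None, None
    successors = {next_in_enumeration(v) for v in ints}
    # The start of a run has no predecessor in the set;
    # the value just past the end of a run is a successor not in the set.
    starts = ints - successors
    past_ends = successors - ints
    if len(starts) == 1 and len(past_ends) == 1:
        return next(iter(starts)), next(iter(past_ends)) - 1
    return None, None
```

For the steps recorded by `RecB` above, `ranging_attributes({None, 0, 1, 2})` returns `(0, 2)`, so the integer run can still be collapsed while the `None` case is left to be reported separately.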

Have you tested this? If so, how?

Yes, with the automated tests and by running the code above.

starhel commented 4 months ago

I've also fixed some broken jsonschema tests (the order of validation changed in the newest version).