spotify / luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
Apache License 2.0
17.87k stars, 2.4k forks

Priority sometimes not correctly set #3075

Open smarwei opened 3 years ago

smarwei commented 3 years ago

In the following code, every ChildTask's priority is set to its tId, but this only takes effect for the ChildTasks between the ParentTasks with taskNr 0 and 1. All ChildTasks between ParentTask 1 and 2 have a priority of 9, although their tId is set correctly.

The scheduler shows the following:

| Status | Name | Details | Priority | Time |
|---|---|---|---|---|
| DONE | ChildTask | tId=9, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:43:41 AM |
| DONE | ChildTask | tId=8, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:43:51 AM |
| DONE | ChildTask | tId=2, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:44:01 AM |
| DONE | ChildTask | tId=5, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:44:11 AM |
| DONE | ChildTask | tId=3, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:44:21 AM |
| DONE | ChildTask | tId=6, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:44:31 AM |
| DONE | ChildTask | tId=4, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:44:41 AM |
| DONE | ChildTask | tId=0, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:44:52 AM |
| DONE | ChildTask | tId=1, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:45:02 AM |
| DONE | ChildTask | tId=7, requiresTask=ParentTask2 | 9 | 4/16/2021, 8:45:12 AM |
| DONE | ChildTask | tId=9, requiresTask=ParentTask1 | 9 | 4/16/2021, 8:45:32 AM |
| DONE | ChildTask | tId=8, requiresTask=ParentTask1 | 8 | 4/16/2021, 8:45:42 AM |
| DONE | ChildTask | tId=7, requiresTask=ParentTask1 | 7 | 4/16/2021, 8:45:52 AM |
| DONE | ChildTask | tId=6, requiresTask=ParentTask1 | 6 | 4/16/2021, 8:46:02 AM |
| DONE | ChildTask | tId=5, requiresTask=ParentTask1 | 5 | 4/16/2021, 8:46:13 AM |
| DONE | ChildTask | tId=4, requiresTask=ParentTask1 | 4 | 4/16/2021, 8:46:23 AM |
| DONE | ChildTask | tId=3, requiresTask=ParentTask1 | 3 | 4/16/2021, 8:46:33 AM |
| DONE | ChildTask | tId=2, requiresTask=ParentTask1 | 2 | 4/16/2021, 8:46:43 AM |
| DONE | ChildTask | tId=1, requiresTask=ParentTask1 | 1 | 4/16/2021, 8:46:53 AM |
| DONE | ChildTask | tId=0, requiresTask=ParentTask1 | 0 | 4/16/2021, 8:47:03 AM |


```python
import time

import luigi

# In-memory stand-in for a metadata table: maps target id -> state
metadataTable = {}


class MetadataTableTarget(luigi.Target):
    """Target whose existence is tracked in the in-memory metadataTable."""

    def __init__(self, taskType, tId):
        self.metadata_id = f'({taskType}, {tId})'

    def exists(self):
        # The dict is only mutated, never rebound, so no `global` is needed
        return metadataTable.get(self.metadata_id) == "finished"

    def set_waiting(self):
        metadataTable[self.metadata_id] = "waiting"

    def set_running(self):
        metadataTable[self.metadata_id] = "running"

    def set_finished(self):
        metadataTable[self.metadata_id] = "finished"


class ChildTask(luigi.Task):
    tId = luigi.IntParameter()
    requiresTask = luigi.Parameter()

    @property
    def priority(self):
        # Intended: each ChildTask is scheduled according to its own tId
        return self.tId

    def requires(self):
        self.output().set_waiting()
        if self.requiresTask == "ParentTask1":
            return ParentTask(1)
        else:
            return ParentTask(2)

    def run(self):
        self.output().set_running()
        time.sleep(10)
        print("SUCCESS")
        self.output().set_finished()

    def output(self):
        # Include requiresTask so ChildTasks with the same tId under
        # different parents do not share one target
        return MetadataTableTarget(f"ChildTask-{self.requiresTask}", self.tId)


class ParentTask(luigi.Task):
    taskNr = luigi.IntParameter()

    def requires(self):
        # ParentTask(0) requires ChildTask(*, "ParentTask1"), which require
        # ParentTask(1), which requires ChildTask(*, "ParentTask2"), which
        # require ParentTask(2); ParentTask(2) has no requirements
        self.output().set_waiting()
        tasks = []

        for i in range(0, 10):
            if self.taskNr == 0:
                tasks.append(ChildTask(i, "ParentTask1"))
            elif self.taskNr == 1:
                tasks.append(ChildTask(i, "ParentTask2"))

        return tasks

    def run(self):
        self.output().set_running()
        time.sleep(10)
        print(f"ParentTask{self.taskNr} finished")
        self.output().set_finished()

    def output(self):
        # One target per taskNr (previously every ParentTask used id 0)
        return MetadataTableTarget("ParentTask", self.taskNr)


if __name__ == '__main__':
    r = luigi.build(
        [ParentTask(0)],
        log_level='WARNING',
        local_scheduler=False,
        workers=1,
        detailed_summary=True,
    )
```

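
The numbers in the table are consistent with a priority-propagation rule. If the scheduler raises each dependency's priority to at least the priority of any task that requires it (an assumption here, not something stated in this issue), then ParentTask(1) inherits 9 from ChildTask tId=9 (requiresTask=ParentTask1) and passes 9 on to all ten ChildTasks it requires. The sketch below models the reproducer's task graph in plain Python under that assumption; `build_graph` and `effective_priorities` are hypothetical helpers, not Luigi APIs.

```python
from collections import defaultdict


def build_graph():
    """Model the task graph from the reproducer (hypothetical helper).

    Returns (deps, declared): deps maps a task name to the names of the
    tasks it requires; declared maps a task name to its own priority.
    """
    deps = defaultdict(list)
    declared = {f"ParentTask({nr})": 0 for nr in (0, 1, 2)}  # default priority 0
    for parent_nr, label in ((0, "ParentTask1"), (1, "ParentTask2")):
        for tid in range(10):
            child = f"ChildTask({tid}, {label})"
            declared[child] = tid  # the `priority` property returns tId
            deps[f"ParentTask({parent_nr})"].append(child)
            deps[child].append(f"ParentTask({parent_nr + 1})")
    return deps, declared


def effective_priorities(deps, declared):
    """Assumed rule: a dependency's priority is raised to at least the
    priority of every task that (transitively) requires it."""
    prio = dict(declared)

    def raise_priority(task, p):
        prio[task] = max(prio[task], p)
        for dep in deps[task]:
            if prio[task] > prio[dep]:
                raise_priority(dep, prio[task])

    for task in declared:
        raise_priority(task, declared[task])
    return prio


if __name__ == "__main__":
    deps, declared = build_graph()
    prio = effective_priorities(deps, declared)
    for label in ("ParentTask1", "ParentTask2"):
        print(label, [prio[f"ChildTask({t}, {label})"] for t in range(10)])
```

Running it prints the priorities 0 through 9 for the ParentTask1 children and ten 9s for the ParentTask2 children, the same pattern as the scheduler table.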
stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If closed, you may revisit when your time allows and reopen! Thank you for your contributions.

indeedael commented 2 years ago

I think I'm having the same problem. Is this really a bug that won't be fixed, or are priorities intended to be used differently?
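
On the question of intended use: Luigi's task documentation describes `priority` as choosing which of the tasks whose dependencies are already met runs next, with dependencies always taking precedence over priority. If, in addition, the scheduler pushes a task's priority down to its dependencies (an assumption, not confirmed in this thread), the values in this issue would be a side effect of that propagation rather than a lost setting. The hypothetical `run_order` helper below is a plain-Python model of a single worker under those assumptions, not Luigi's scheduler: task A (priority 5) requires B (priority 0), and C (priority 1) is independent.

```python
def run_order(deps, declared, propagate=True):
    """Order in which one worker would run the tasks (hypothetical model).

    At each step the worker picks, among tasks whose dependencies are all
    done, the one with the highest priority; ties go to the task listed
    first in `declared`. With propagate=True, a dependency's priority is
    first raised to at least that of every task requiring it (assumed rule).
    """
    prio = dict(declared)

    def raise_priority(task, p):
        prio[task] = max(prio[task], p)
        for dep in deps.get(task, []):
            if prio[task] > prio[dep]:
                raise_priority(dep, prio[task])

    if propagate:
        for task in declared:
            raise_priority(task, declared[task])

    done, order = set(), []
    while len(done) < len(declared):
        runnable = [t for t in declared
                    if t not in done and all(d in done for d in deps.get(t, []))]
        if not runnable:
            raise ValueError("dependency cycle or unknown dependency")
        nxt = max(runnable, key=lambda t: prio[t])  # first maximum wins ties
        done.add(nxt)
        order.append(nxt)
    return order


if __name__ == "__main__":
    deps = {"A": ["B"]}  # A requires B; C is independent
    declared = {"A": 5, "B": 0, "C": 1}
    print(run_order(deps, declared))                   # ['B', 'A', 'C']
    print(run_order(deps, declared, propagate=False))  # ['C', 'B', 'A']
```

With propagation, the prerequisite B of the high-priority task A runs before the unrelated task C; without it, C would run first because B's own priority (0) is lower than C's (1). The flip side, as in this issue, is that under this rule every dependency of a priority-9 task is scheduled at 9 regardless of its own `priority` value.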