pyinvoke / invoke

Pythonic task management & command execution.
http://pyinvoke.org
BSD 2-Clause "Simplified" License
4.41k stars 369 forks source link

Repeated post tasks aren't correctly ordered #1006

Open NotizMe opened 2 months ago

NotizMe commented 2 months ago

Description

When I set a post task on multiple tasks Invoke correctly only executes it once. But it is executed after the first task that has it as a post task, instead of waiting until the last task is done which has the task as post task.

Code

import invoke

@invoke.task
def post_task(_: invoke.Context) -> None:
    print('The last task')

@invoke.task(post=[post_task])
def pre_task_1(_: invoke.Context) -> None:
    print('The first pre task')

@invoke.task(post=[post_task])
def pre_task_2(_: invoke.Context) -> None:
    print('The second pre task')

@invoke.task(default=True, pre=[pre_task_1, pre_task_2], post=[post_task])
def main(ctx: invoke.Context) -> None:
    print('The main task')

Expected output

The first pre task
The second pre task
The main task
The last task

Output result

The first pre task
The last task
The second pre task
The main task

Additional information

yolabingo commented 2 months ago

maybe modifying Invoke along these lines would work?


class Executor:
    def __init__(self, collection, config=None):
        self.collection = collection
        self.config = config
        self.post_tasks = set()  # Set to collect post-tasks
        self.max_recursion_depth = 4  # Set the maximum recursion depth

    def _execute(self, task, context, args, kwargs, depth=0):
        # Check recursion depth
        if depth > self.max_recursion_depth:
            raise RecursionError("Maximum recursion depth reached")

        # Handle pre-tasks (if any) recursively
        if task.pre:
            for pre_task_name in task.pre:
                pre_task = self.collection[pre_task_name]
                self._execute(pre_task, context, args, kwargs, depth=depth+1)

        # Run the main task
        print(f"Executing task: {task.name}")
        task(context, *args, **kwargs)

        # Collect post-tasks to run later, ensuring no duplicates
        if task.post:
            for post_task_name in task.post:
                if post_task_name not in self.post_tasks:
                    self.post_tasks.add(post_task_name)

    def execute(self, task_name, *args, **kwargs):
        task = self.collection[task_name]
        context = kwargs.pop('ctx', None)

        # Execute the task and its pre-tasks recursively
        self._execute(task, context, args, kwargs)

        # Now run all collected post-tasks
        for post_task_name in self.post_tasks:
            post_task = self.collection[post_task_name]
            print(f"Executing post-task: {post_task.name}")
            self._execute(post_task, context, args, kwargs, depth=0)