pyinvoke / invoke

Pythonic task management & command execution.
http://pyinvoke.org
BSD 2-Clause "Simplified" License

A lightweight recipe for running tasks asynchronously (suggestion) #531

Open honnibal opened 6 years ago

honnibal commented 6 years ago

Invoke is super awesome! Congrats :tada:

I see you've been debating designs for parallel execution and DAG structure pretty extensively. I'd like to suggest a fairly simple "userspace" strategy that I think can help until a more full-featured solution lands in the library.

inv parallel sleep 2 sleep 1 block

Inside the parallel task, we set a runner that simply collects the commands to be run, and doesn't run anything. Then during block, we start them all, and wait until they're all complete.

For my purposes, I think this will usually be better than the make -j style. I'll usually want to parametrise the jobs I run in parallel, so I'd like them to be tasks. I'm also not that excited about having the library decide which parts should be parallel: sure, that's good in theory, but it's super difficult to get right. It's nice to have synchronous be the default.

Example implementation below. I don't think I understand how to pass state through the Context object properly yet. I'm sure there's a solution that's less hacky. It should also be simple to support nested parallelism.

'''Lightweight recipe for running tasks in parallel. 

Example:

  $ time inv parallel sleep 2 sleep 1 block
  inv parallel sleep 2 sleep 1 block  0.22s user 0.02s system 10% cpu 2.229 total  

All we do is replace the runner with a collection operation. Then during the "block" step, we
start everything at once and put the original runner back. We need a separate gathering step
because each task normally waits for its own commands to finish before it exits, which makes sense.
'''

from invoke import task


class CollectTasks(object):
    '''Stands in for a Runner: records run()/sudo() calls instead of running them.'''
    def __init__(self, pending):
        self.pending = pending

    def run(self, *args, **kwargs):
        self.pending.append(('run', args, kwargs))
        return self

    def sudo(self, *args, **kwargs):
        self.pending.append(('sudo', args, kwargs))
        return self

@task
def parallel(c):
    pending = []
    # Keep the real runner class around so block() can restore it, then swap
    # in a factory that returns the collector instead of a real Runner.
    c.config.runners.backup = c.config.runners.local
    def gather_task(context):
        return CollectTasks(pending)
    c.config.runners.local = gather_task
    gather_task.pending = pending
    return c

@task
def block(c):
    # Retrieve the collected commands and restore the real runner class.
    pending = list(c.config.runners.local.pending)
    c.config.runners.local = c.config.runners.backup
    running = []
    # Start every collected command without waiting on it...
    for _kind, args, kwargs in pending:  # _kind ('run' vs 'sudo') is ignored here
        runner = c.config.runners.local(c)
        runner.using_pty = True
        runner.threads = {}
        runner.start(*args, shell='/bin/bash', env={}, **kwargs)
        running.append(runner)
    # ...then wait for all of them to finish.
    for runner in running:
        runner.wait()

@task
def sleep(c, n):
    c.run(f'sleep {n}')
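
Aside: if all you want is for a single task to fan several commands out at once (rather than composing separate tasks on the command line, which is the main point here), a plain thread pool around c.run is probably enough. Sketch only; the task name and signature are made up, and I haven't checked how invoke's runners behave when driven from several threads on one Context:

from concurrent.futures import ThreadPoolExecutor

from invoke import task

@task
def sleep_many(c, times='2,1'):
    # Hypothetical task: fan several commands out to threads and wait
    # for all of them before the task returns.
    cmds = [f'sleep {t}' for t in times.split(',')]
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(c.run, cmd) for cmd in cmds]
        for future in futures:
            future.result()  # blocks until finished, re-raises any failure

Invoked as something like inv sleep-many --times 2,1, it should finish in about two seconds rather than three.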
bitprophet commented 6 years ago

At a high level I think this falls under another issue, allowing tasks to eagerly "eat" the entire rest of the parser token list, which maps most closely to #378 at the moment (specifically the *args sub-case). That enables things like what you're discussing here - having a single up-front task basically redefine parsing so it can do "meta" things with the rest of the command line.
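
To make "eat the rest of the token list" a bit more concrete, here's a toy illustration with plain argparse (not our parser, just the parsing behavior such a task would need): everything after the up-front task name gets handed over verbatim instead of being parsed as further tasks or flags.

import argparse

parser = argparse.ArgumentParser(prog='inv')
parser.add_argument('task')
# REMAINDER swallows every following token verbatim, flags included.
parser.add_argument('rest', nargs=argparse.REMAINDER)

args = parser.parse_args(['parallel', 'sleep', '2', 'sleep', '1', 'block'])
print(args.task)  # 'parallel'
print(args.rest)  # ['sleep', '2', 'sleep', '1', 'block']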

The other component concerns improvements around Executor and Context, so that this type of userland code has an easier time recreating "most of" what the default execution loop does currently. Executor is a publicly documented/designed API, but it still needs some work to make it easier to truly reuse in ways the internals had no need for when it was written.

Finally, I wonder how this might dovetail with another recent ticket, #526, since the entire process of "set up things to run, then run them async" seems to map super closely to asyncio's "generate coroutines, create a loop, feed coroutines to loop, tell loop to run somehow" workflow.
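
Very roughly, and purely as a sketch using stdlib asyncio subprocesses (nothing invoke offers today), the recipe above maps onto that workflow like so: the gathered commands become coroutines, and one gather call runs them concurrently and waits for all of them.

import asyncio

async def run_all(commands):
    async def run_one(cmd):
        proc = await asyncio.create_subprocess_shell(cmd)
        return await proc.wait()
    # Feed every coroutine to the loop and block until all of them have exited.
    return await asyncio.gather(*(run_one(cmd) for cmd in commands))

if __name__ == '__main__':
    # e.g. the commands the "parallel" step collected
    asyncio.run(run_all(['sleep 2', 'sleep 1']))  # ~2s total, not ~3s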

Don't have time right now to get deeper in this besides those thoughts, unfortunately!