cylc / cylc-flow

Cylc: a workflow engine for cycling systems.
https://cylc.github.io
GNU General Public License v3.0

subprocpool: convert to async #5017

Open oliver-sanders opened 1 year ago

oliver-sanders commented 1 year ago

The subprocpool allows us to run subprocesses asynchronously in a limited pool to avoid overwhelming the scheduler host.

Proposed Change

Rewrite the subprocpool as async code so that we can await subprocess calls directly from the code that issues them.

If required, it would be possible to support both a call/callback and an async/await interface to the new pool.
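
As a rough illustration of how both interfaces could sit on one pool, the sketch below layers a callback-style call on top of an awaitable. The names `call_with_callback` and `fake_run` are hypothetical (they are not part of the existing subprocpool), and `fake_run` stands in for an awaitable `pool.run()`:

```python
import asyncio


def call_with_callback(coro, callback, *callback_args):
    """Schedule ``coro`` and call ``callback(result, *callback_args)`` when done.

    Hypothetical adapter: lets call/callback code use an async pool.
    Must be called while an event loop is running.
    """
    task = asyncio.ensure_future(coro)
    task.add_done_callback(lambda t: callback(t.result(), *callback_args))
    return task


async def fake_run(cmd):
    """Stand-in for an awaitable pool.run(); returns a fake return code."""
    await asyncio.sleep(0)
    return 0


async def demo():
    results = []
    # async/await style
    results.append(('await', await fake_run(['true'])))
    # call/callback style, backed by the same coroutine function
    task = call_with_callback(
        fake_run(['true']),
        lambda ret, label: results.append((label, ret)),
        'callback',
    )
    await task
    return results
```

Running `asyncio.run(demo())` exercises both styles against the same coroutine, so existing callback-based callers could be migrated gradually.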

I considered doing this during Cylc 8 development; however, at the time I figured it wasn't necessary and would bloat the project. In retrospect, it would probably have saved time...

Why

The subprocpool was implemented in Cylc 7 using a call/callback pattern. Code that works with the subprocpool must issue commands and then, in a future main loop iteration, check whether the callback has fired. This is a slightly icky pattern, but it worked OK up until we needed to add remote-init/remote-fileinstall/platform-selection/host-selection into the job submission pipeline.

Conceptually this is simple. Here's an outline of how job submission with intelligent host selection works:

async def submit(itask, bad_hosts):
    platform_string = itask.taskdef.rtconfig['platform']
    for platform in select_platform(platform_string):
        for host in select_host(platform, bad_hosts):
            ret = await remote_init(platform, host)
            if ret.return_code == 255:
                bad_hosts.add(host)
                continue
            ret = await remote_fileinstall(platform, host)
            if ret.return_code == 255:
                bad_hosts.add(host)
                continue
            await check_syntax(itask, platform, host)
            ret = await submit_job(itask, platform, host)
            if ret.return_code == 255:
                bad_hosts.add(host)
                continue
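            # job submitted: stop trying further hosts and platforms
            # (a bare `break` would only leave the inner host loop)
            return
    # every host on every platform failed
    raise PlatformError(f'no hosts available for {platform_string}')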

But when you try to write this using a call/callback pattern it gets messy. The actual code is much, much longer and spread between multiple functions (which issue the calls) and callbacks (which update the shared state).

(Minor side note: it also separates you from the subprocess.Popen object, which would be really handy to have.)
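
To make the problem concrete, here is a toy sketch of the call/callback shape, with no real subprocesses. Everything in it (`CallbackPool`, `put_command`, `process`, the `state` dict) is made up for illustration and is much simpler than the real subprocpool. Even so, the two-step pipeline already needs three functions plus shared state that the main loop has to inspect:

```python
class CallbackPool:
    """Toy stand-in for a call/callback pool (hypothetical)."""

    def __init__(self):
        self._pending = []

    def put_command(self, cmd, callback):
        self._pending.append((cmd, callback))

    def process(self):
        """Run once per main loop iteration: fire callbacks for queued commands."""
        pending, self._pending = self._pending, []
        for cmd, callback in pending:
            callback(cmd, 0)  # pretend every command succeeded


# shared state: written by the callbacks, read by the main loop
state = {}


def submit(pool, task):
    state[task] = 'remote-init'
    pool.put_command(
        f'remote-init {task}',
        lambda cmd, ret: _remote_init_done(pool, task, ret),
    )


def _remote_init_done(pool, task, ret):
    if ret != 0:
        state[task] = 'submit-failed'
        return
    state[task] = 'submitting'
    pool.put_command(
        f'submit {task}',
        lambda cmd, ret: _submit_done(task, ret),
    )


def _submit_done(task, ret):
    state[task] = 'submitted' if ret == 0 else 'submit-failed'


def main_loop(pool, tasks):
    """Issue the calls, then spin until every callback chain has finished."""
    for task in tasks:
        submit(pool, task)
    iterations = 0
    while any(state[t] not in ('submitted', 'submit-failed') for t in tasks):
        pool.process()
        iterations += 1
    return iterations
```

Each extra step in the pipeline (file install, host retry and so on) adds another callback and another intermediate state, which is where the real code gets long.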

How

The async/await interfaces absorb the call/callback and state management aspects by abstracting them away.

This leaves us to write the business logic, which is fairly straightforward. Here's an example implementation:

import asyncio
from subprocess import Popen

class ProcItem:

    def __init__(self, args, kwargs):
        self.args = args
        self.kwargs = kwargs
        self.future = asyncio.Future()
        self.proc = None

class ProcPool:

    def __init__(self, max_size=5):
        self._queued = asyncio.Queue()
        self._running = set()
        self.max_size = max_size
        self._slots = asyncio.Queue()

    async def run(self, *args, **kwargs):
        """Run a new process."""
        item = ProcItem(args, kwargs)
        await self._queued.put(item)
        await item.future
        return item.proc

    async def runner(self):
        """Event driven process submitter."""
        for _ in range(self.max_size):
            await self._slots.put(True)
        while True:
            # claim a slot in the pool
            await self._slots.get()
            # wait for an item to be queued
            item = await self._queued.get()
            print(f'$ {item.args}')
            item.proc = Popen(*item.args, **item.kwargs)
            self._running.add(item)

    async def poller(self):
        """Process poller."""
        while True:
            await asyncio.sleep(1)
            for item in set(self._running):
                if item.proc.poll() is not None:
                    item.future.set_result(item.proc.returncode)
                    self._running.remove(item)
                    await self._slots.put(True)

async def test():
    from subprocess import PIPE

    async def _run(pool, world):
        proc = await pool.run(['echo', 'hello', world], stdout=PIPE)
        print(proc.communicate())

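    pool = ProcPool(2)
    # poller() and runner() loop forever, so run them as background tasks
    services = [
        asyncio.create_task(pool.poller()),
        asyncio.create_task(pool.runner()),
    ]
    await asyncio.gather(
        _run(pool, 'earth'),
        _run(pool, 'mars'),
        _run(pool, 'jupyter'),
        _run(pool, 'saturn'),
        _run(pool, 'pluto'),
    )
    # all commands have completed: stop the background tasks so we can exit
    for task in services:
        task.cancel()

asyncio.run(test())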
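
For comparison, here is a possible alternative (not what this issue proposes) that limits concurrency with `asyncio.Semaphore` and uses asyncio's own subprocess support, so no polling loop is needed. The helper names `run_limited` and `main` are made up for this sketch. Note the trade-off: it hands back data from an `asyncio.subprocess.Process` rather than a `subprocess.Popen` object.

```python
import asyncio
import sys


async def run_limited(sem, *cmd):
    """Run ``cmd`` once a pool slot is free; return (returncode, stdout)."""
    async with sem:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
        )
        # communicate() waits for the process without blocking the event loop
        stdout, _ = await proc.communicate()
        return proc.returncode, stdout.decode().strip()


async def main(worlds, max_size=2):
    # create the semaphore inside the running event loop
    sem = asyncio.Semaphore(max_size)
    # gather() returns results in the order the awaitables were passed in
    return await asyncio.gather(*(
        run_limited(sem, sys.executable, '-c', f'print("hello {world}")')
        for world in worlds
    ))
```

At most `max_size` subprocesses run at once; the remaining callers wait on the semaphore until a slot frees up.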

Long Term Context

This would help us to break the main loop lockstep: https://github.com/cylc/cylc-flow/issues/3498

Submissions would go through faster and the load/complexity of the main loop would be reduced.

Decoupling job submission from the main loop would simplify the logic and also give us a lot more flexibility over how we run job submission. E.g. we could push it out into another process or (using zmq queues) even onto another host (lightweight submission on another host controlled by the remote scheduler, i.e. very interesting cloud possibilities).

Pull requests welcome!

hjoliver commented 1 year ago

:tada: yes this would be a great improvement!

(We did not have the option to use asyncio when the current system was devised, of course).

oliver-sanders commented 1 year ago

I had a look at this today as a training exercise. Conclusions:

Here's how the top-level code would look after this change:

async def submitter(bad_hosts, submission_queue, subprocpool):
    """Job submitter thinggy.

    When you want jobs to be submitted, push the corresponding tasks
    into the submission_queue and lean back, the submitter does the work
    for you.
    """
    cache = SelectCache()
    while True:
        for itask in submission_queue.get():
            try:
                await _submit(cache, itask, bad_hosts, subprocpool)
            except (JobSyntaxError, PlatformError, SubmissionError):
                pass  # TODO: => submit-fail
            else:
                pass  # TODO: => submitted

async def _submit(cache, itask, bad_hosts, subprocpool):
    """The job submission pipeline for a single task."""
    rtconfig = _get_rtconfig(itask, broadcast_mgr)
    select_host = await _get_host_host_selector(cache, itask, rtconfig, bad_hosts, subprocpool)
    for platform, host in select_host:
        try:
            await check_syntax(itask, platform, host)
            await remote_init(cache, platform, host)
            await remote_file_install(cache, platform, host)
            await submit_job(cache, itask, platform, host)
            break
        except SSHError:
            # LOG.warning()
            bad_hosts.add(host)
            continue
    else:
        raise PlatformError(f'no hosts available for {platform}')

We can keep the batching behaviour by maintaining mappings as we currently do. E.g. the remote-init map logic can be handled like this (note that the caching part of the logic has been removed from the actual implementation of remote-init itself):

async def remote_init(cache, platform, host):
    with suppress(KeyError):
        return await cache.remote_init_cache[platform['install target']]
    coro = _remote_init(platform, host)
    cache.remote_init_cache[platform['install target']] = coro
    return await coro
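
One thing to watch with this pattern: a bare coroutine object can only be awaited once, so a second caller awaiting the cached `coro` would hit a `RuntimeError`. A minimal sketch of the same idea that caches an `asyncio.Task` instead (a task can be awaited by any number of callers) might look like this. Here `_remote_init` is a stand-in that just records its calls, and the cache is a plain dict rather than the `SelectCache` object:

```python
import asyncio

calls = []


async def _remote_init(platform, host):
    """Stand-in for the real remote-init command (assumption)."""
    calls.append((platform['install target'], host))
    await asyncio.sleep(0)
    return 0


async def remote_init(cache, platform, host):
    """Run remote-init at most once per install target."""
    key = platform['install target']
    if key not in cache:
        # a Task, unlike a bare coroutine, may be awaited many times
        cache[key] = asyncio.ensure_future(_remote_init(platform, host))
    return await cache[key]


async def demo():
    calls.clear()
    cache = {}
    platform = {'install target': 'hpc'}
    results = await asyncio.gather(
        remote_init(cache, platform, 'login1'),
        remote_init(cache, platform, 'login2'),
        remote_init(cache, platform, 'login1'),
    )
    return results, list(calls)
```

All three callers get the result of the single underlying call. A failed task would stay cached too, so a real implementation would need to decide when to evict entries.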

Batching the job-submit commands is a little funkier, but I think we might be able to do it something like this:

async def submit_job(cache, itask, platform, host):
    """Run the job submission command."""
    key = (platform['name'], host)
    with suppress(KeyError):
        cache.job_submission_queue.append(itask)
        return await cache.job_submission_queue[key]
    cache.job_submission_queue[key] = [itask]

    # wait for other submissions to queue up behind us
    await asyncio.sleep(0)

    # submit the batch
    _submit_jobs(cache.job_submission_queue, platform, host)

This way we can continue to write the code from the perspective of a single job submission, rather than having to define the batching in the top-level code.
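
A runnable sketch of that batching idea, under some assumptions: `_submit_jobs` is a stand-in that returns a `{itask: return_code}` mapping, `_run_batch` is a hypothetical helper, and the queue is a plain dict keyed by `(platform name, host)`. The first submission for a key opens a batch as a task, and later submissions join it until the batch task wakes up and closes it:

```python
import asyncio

submitted_batches = []


async def _submit_jobs(itasks, platform, host):
    """Stand-in for the real batched job-submit command (assumption)."""
    submitted_batches.append((platform['name'], host, list(itasks)))
    await asyncio.sleep(0)
    return {itask: 0 for itask in itasks}


async def _run_batch(queue, key, itasks, platform, host):
    # wait for other submissions to queue up behind us
    await asyncio.sleep(0)
    # close the batch so that later submissions start a new one
    del queue[key]
    return await _submit_jobs(itasks, platform, host)


async def submit_job(queue, itask, platform, host):
    """Submit one job; jobs for the same (platform, host) share a command."""
    key = (platform['name'], host)
    if key not in queue:
        itasks = []
        batch = asyncio.ensure_future(
            _run_batch(queue, key, itasks, platform, host)
        )
        queue[key] = (itasks, batch)
    itasks, batch = queue[key]
    itasks.append(itask)
    # every member of the batch awaits the same task
    return (await batch)[itask]


async def demo():
    submitted_batches.clear()
    queue = {}
    platform = {'name': 'hpc'}
    codes = await asyncio.gather(
        submit_job(queue, 'a', platform, 'login1'),
        submit_job(queue, 'b', platform, 'login1'),
        submit_job(queue, 'c', platform, 'login2'),
    )
    return codes, list(submitted_batches)
```

Note that `asyncio.sleep(0)` only yields to coroutines that are already ready to run, so only submissions issued at about the same time end up in the same batch.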

Code

```python
import asyncio
from contextlib import suppress
import os
import os.path
import re
import typing as ty
from weakref import WeakValueDictionary

from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.parsec.util import (
    pdeepcopy,
    poverride,
)
from cylc.flow.platforms import (
    HOST_REC_COMMAND,
    PLATFORM_REC_COMMAND,
    platform_name_from_job_info2,
)
from cylc.flow.remote import (
    is_remote_host,
)
from cylc.flow.submit.remote import (
    remote_init as _remote_init,
    remote_file_install as _remote_file_install,
)


class SelectCache():

    def __init__(self):
        self.remote_init_cache = {}
        self.remote_file_install_cache = {}
        self.host_selection_batches = WeakValueDictionary()
        self.job_submission_queue = {}
        self.platforms = glbl_cfg().get(['platforms'])


async def remote_init(cache, platform, host):
    with suppress(KeyError):
        return await cache.remote_init_cache[platform['install target']]
    coro = _remote_init(platform, host)
    cache.remote_init_cache[platform['install target']] = coro
    return await coro


async def remote_file_install(cache, platform, host):
    with suppress(KeyError):
        return await cache.remote_file_install_cache[
            platform['install target']
        ]
    coro = _remote_file_install(platform, host)
    cache.remote_file_install_cache[platform['install target']] = coro
    return await coro


async def submit_job(cache, itask, platform, host):
    """Run the job submission command."""
    key = (platform['name'], host)
    with suppress(KeyError):
        cache.job_submission_queue.append(itask)
        return await cache.job_submission_queue[key]
    cache.job_submission_queue[key] = [itask]

    # wait for other submissions to queue up behind us
    await asyncio.sleep(0)

    # submit the batch
    _submit_jobs(cache.job_submission_queue, platform, host)


def _get_rtconfig(itask, broadcast_mgr):
    """Return the runtime config for a task."""
    # Handle broadcasts
    overrides = broadcast_mgr.get_broadcast(
        itask.tokens
    )
    if overrides:
        rtconfig = pdeepcopy(itask.tdef.rtconfig)
        poverride(rtconfig, overrides, prepend=True)
    else:
        rtconfig = itask.tdef.rtconfig
    return rtconfig


def _host_selector(platform_string, bad_hosts):
    """Yields hosts for a platform string.

    Note that platform_string could be a platform_group.
    """
    for platform in get_platform(platform_string):
        for host in get_host(platform, bad_hosts):
            yield (platform, host)


async def eval_host(host_str: str, subprocpool) -> ty.Optional[str]:
    """Evaluate a host from a possible subshell string.

    Args:
        host_str: An explicit host name, a command in back-tick or
            $(command) format, or an environment variable holding
            a hostname.

    Returns 'localhost' if evaluated name is equivalent
    (e.g. localhost4.localdomain4).
    """
    host = await subshell_eval(host_str, HOST_REC_COMMAND, subprocpool)
    if host is not None and not is_remote_host(host):
        return 'localhost'
    return host


async def eval_platform(platform_str: str, subprocpool) -> ty.Optional[str]:
    """Evaluate a platform from a possible subshell string.

    Args:
        platform_str: An explicit platform name, a command in $(command)
            format, or an environment variable holding a platform name.
    """
    return await subshell_eval(platform_str, PLATFORM_REC_COMMAND, subprocpool)


async def subshell_eval(
    eval_str: str,
    command_pattern: re.Pattern,
    subprocpool,
) -> ty.Optional[str]:
    """Evaluate a platform or host from a possible subshell string.

    Arguments:
        eval_str:
            An explicit host/platform name, a command, or an environment
            variable holding a host/platform name.
        command_pattern:
            A compiled regex pattern designed to match subshell strings.

    Return:
        - None if evaluation of command is still taking place.
        - 'localhost' if string is empty/not defined.
        - Otherwise, return the evaluated host/platform name on success.

    Raise PlatformError on error.

    """
    if not eval_str:
        return 'localhost'

    # Host selection command: $(command) or `command`
    match_ = command_pattern.match(eval_str)
    if match_:
        eval_str = await subprocpool.run(
            ['bash', '-c', match_.groups()[1]],
            env=dict(os.environ)
        )

    # Environment variable substitution
    return os.path.expandvars(eval_str)


async def _get_host_host_selector(cache, itask, rtconfig, bad_hosts, subprocpool):
    """Return a generator which picks hosts a task could submit to."""
    # get the platform expression
    platform_expr = rtconfig['platform']
    host_expr = rtconfig['remote']['host']
    if platform_expr and host_expr:
        raise WorkflowConfigError(
            "A mixture of Cylc 7 (host) and Cylc 8 (platform)"
            " logic should not be used. In this case for the task "
            f"\"{itask.identity}\" the following are not compatible:\n"
        )

    # check whether we are currently submitting other jobs for this expression
    key = (platform_expr, host_expr)
    with suppress(KeyError):
        return cache.host_selection_batches[key]

    # evaluate the platform/host expression
    if host_expr:
        host_string = await eval_host(host_expr, subprocpool)
        platform_string = platform_name_from_job_info2(
            cache.platforms,
            host_string,
            rtconfig,
        )
    else:
        platform_string = await eval_platform(platform_expr, subprocpool)

    # return the host selector
    sel = _host_selector(platform_string, bad_hosts)
    # TODO: cache the PlatformError
    cache.host_selection_batches[key] = sel
    return sel


async def submitter(bad_hosts, submission_queue, subprocpool):
    """Job submitter thinggy.

    When you want jobs to be submitted, push the corresponding tasks
    into the submission_queue and lean back, the submitter does the work
    for you.
    """
    cache = SelectCache()
    while True:
        for itask in submission_queue.get():
            try:
                await _submit(cache, itask, bad_hosts, subprocpool)
            except JobSyntaxError:
                pass  # submit-fail
            except PlatformError:
                pass  # submit-fail
            except SubmissionError:
                pass  # submit-fail
            else:
                pass  # submitted


async def _submit(cache, itask, bad_hosts, subprocpool):
    """The job submission pipeline for a single task."""
    rtconfig = _get_rtconfig(itask, broadcast_mgr)
    select_host = await _get_host_host_selector(cache, itask, rtconfig, bad_hosts, subprocpool)
    for platform, host in select_host:
        try:
            await check_syntax(itask, platform, host)
            await remote_init(cache, platform, host)
            await remote_file_install(cache, platform, host)
            await submit_job(cache, itask, platform, host)
            break
        except SSHError:
            # LOG.warning()
            bad_hosts.add(host)
            continue
    else:
        raise PlatformError(f'no hosts available for {platform}')
```

https://github.com/oliver-sanders/cylc-flow/pull/new/async-subproc-pool-early-experiment

hjoliver commented 1 year ago

Very nice!