RhodiumGroup / rhg_compute_tools

Tools for using compute.rhg.com and compute.impactlab.org
MIT License

add cluster spinup & job management utilities #53

Open delgadom opened 5 years ago

delgadom commented 5 years ago

Cluster spinup

Spin up a cluster and (optionally) wait for workers to appear, with a progress bar. Optionally use as a context manager to spin down the cluster after job execution.

import time

import dask.distributed as dd
import rhg_compute_tools.kubernetes as rhgk

try:
    from tqdm.auto import tqdm
except ImportError:
    from tqdm import tqdm_notebook as tqdm

class setup_cluster(object):
    '''
    Scales up a dask cluster, with the option to block until the workers are available

    If ``setup_cluster`` is used as a context manager, the workers will be spun down
    upon exit. Note that this does not automatically wait for all tasks to be
    completed, so care should be taken to ensure all jobs will block until the context
    is exited. Because the workers will be spun down after job completion,
    debugging tasks can be difficult. Therefore, it's recommended that this not be
    used for prototyping, and that tasks have their own mechanism for error
    handling and possibly reporting when using ``setup_cluster`` this way.

    Parameters
    ----------
    nworkers : int
        Number of workers to create
    cluster_creator : func, optional
        Cluster creation function. Object returned must have ``scale`` and ``close``
        methods. Default is :py:func:`rhg_compute_tools.kubernetes.get_cluster`.
    cluster_kwargs : dict, optional
        Keyword arguments passed to ``cluster_creator``. Default ``{}``.
    block : bool, optional
        Whether to block until all workers have come online. Default ``True``.
    pbar : bool, optional
        Whether to create a tqdm progress bar displaying worker spinup. Ignored
        if ``block`` is ``False``. Default ``True``.
    pbar_kwargs : dict, optional
        Keyword arguments passed to :py:func:`tqdm.auto.tqdm`. Default ``{}``.

    Examples
    --------
    Can be used as a helper to scale up workers:

    .. code-block:: python

        >>> s = setup_cluster(10)
        >>> client, cluster = s.scale_and_wait_for_workers()  # doctest: +SKIP
        100%|██████████| 10/10 [00:12<00:00,  1.20s/it]
        >>> futures = client.map(lambda x: x**2, range(20))  # doctest: +SKIP

    Alternatively, can be used as a context manager:

    .. code-block:: python

        >>> with setup_cluster(10, pbar_kwargs={'desc': 'workers'}) as (client, cluster):
        ...     futures = client.map(lambda x: x**2, range(20))
        ...     wait_for_futures(
        ...         futures,
        ...         pbar_kwargs={'desc': 'jobs'})  # doctest: +SKIP
        ...
        workers: 100%|██████████| 10/10 [00:12<00:00,  1.20s/it]
        jobs: 100%|██████████| 10/10 [00:01<00:00,  9.83it/s]
    '''
    def __init__(
            self,
            nworkers,
            cluster_creator=None,
            cluster_kwargs=None,
            block=True,
            pbar=True,
            pbar_kwargs=None):
        self.nworkers = nworkers
        self.cluster_creator = (
            cluster_creator if cluster_creator is not None else rhgk.get_cluster)
        self.cluster_kwargs = cluster_kwargs if cluster_kwargs is not None else {}
        self.block = block
        self.pbar = pbar
        self.pbar_kwargs = pbar_kwargs if pbar_kwargs is not None else {}

    def scale_and_wait_for_workers(self):
        self.client, self.cluster = self.cluster_creator(**self.cluster_kwargs)
        self.cluster.scale(self.nworkers)

        if self.block:
            pbar = None
            if self.pbar:
                pbar = tqdm(total=self.nworkers, **self.pbar_kwargs)

            while True:
                # count the workers currently attached to the scheduler
                nworkers = len(self.client.ncores())
                if pbar is not None:
                    pbar.n = nworkers
                    pbar.refresh()
                if nworkers >= self.nworkers:
                    break
                time.sleep(0.2)

            if pbar is not None:
                pbar.close()

        return self.client, self.cluster

    def __enter__(self):
        return self.scale_and_wait_for_workers()

    def __exit__(self, *args, **kwargs):
        self.cluster.scale(0)
        self.client.close()
        self.cluster.close()
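
A minimal sketch of non-blocking usage, for reference: with ``block=False``, ``scale_and_wait_for_workers`` returns immediately, and manual teardown mirrors ``__exit__`` above (``wait_for_futures`` is defined under "Task management" below):

# hypothetical non-blocking usage; names follow the class above
s = setup_cluster(10, block=False)
client, cluster = s.scale_and_wait_for_workers()  # returns without waiting

futures = client.map(lambda x: x ** 2, range(20))
wait_for_futures(futures, pbar_kwargs={'desc': 'jobs'})

# manual teardown, mirroring setup_cluster.__exit__
cluster.scale(0)
client.close()
cluster.close()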

Task management

Wait for futures to complete, with a progress bar

def wait_for_futures(futures, pbar_kwargs=None):
    '''
    Blocking progress bar for dask futures

    Provides a progress bar which will block the python interpreter until
    all futures are completed

    Parameters
    ----------
    futures : list or dask collection
        List of dask futures, or a dask collection (e.g. a DataFrame or
        Array) with a ``dask`` attribute
    pbar_kwargs : dict, optional
        Keyword arguments passed to the :py:func:`tqdm.auto.tqdm` constructor
    '''
    if pbar_kwargs is None:
        pbar_kwargs = {}

    if hasattr(futures, 'dask'):
        # dask collections store their (persisted) futures as graph values
        futures = list(futures.dask.values())

    pbar = tqdm(dd.as_completed(futures), total=len(futures), **pbar_kwargs)

    errors = 0
    for f in pbar:
        if f.status == 'error':
            errors += 1
            pbar.set_postfix({'errors': errors})
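
A minimal usage sketch for the collection branch above, assuming a ``client`` from ``setup_cluster`` (the array computation is just an illustration): persisting a dask collection leaves futures in its graph, which ``wait_for_futures`` extracts via the ``dask`` attribute.

import dask.array as da

# hypothetical usage with a persisted dask collection
arr = client.persist(da.ones((1000, 1000), chunks=(100, 100)).sum())
wait_for_futures(arr, pbar_kwargs={'desc': 'tasks'})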

Other useful features

bolliger32 commented 5 years ago

Another useful feature:

[ ] a wrapper that handles the IOErrors we sometimes get with large worker counts and keeps resubmitting the function until it succeeds or hits a max retry number (rough sketch at the end of this comment)

I feel like some of this stuff could be developed in dask_kubernetes, but it's probably easier to get it up and running here and then see if it can be merged upstream.
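
Something like this, maybe (the wrapper name, retry count, and backoff are all made up; nothing implemented yet):

import time

def retry_on_ioerror(func, max_retries=5, delay=1.0):
    # hypothetical helper: re-invoke func until it succeeds or
    # max_retries is reached, retrying only on IOError
    def wrapped(*args, **kwargs):
        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except IOError:
                if attempt == max_retries - 1:
                    raise
                time.sleep(delay * (attempt + 1))  # simple linear backoff
    return wrapped

# e.g. futures = client.map(retry_on_ioerror(my_task), inputs)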

delgadom commented 5 years ago

Yeah, this is just a helper function that handles the errors that frequently pop up in the chaos of crapton-of-workers land and then hammers the jobs until they complete?

I've actually found the cluster to be much more stable, even when running huge numbers of jobs. Have you encountered this recently?

bolliger32 commented 5 years ago

nice! I haven't run a huge number of jobs in a long time (like since the BR1 push). But yeah, that's what I was thinking. I remember the IOError being the main issue. If we start experiencing this again, maybe we can try to build in something like that.