dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 719 forks source link

Resource Manager Interface #128

Open mrocklin opened 8 years ago

mrocklin commented 8 years ago

Resource Manager Interface

The first step is hard, but standing up is harder.

Deploying distributed on a remote cluster is a large hurdle between the curious user and serious adoption. Efforts to improve deployment can significantly improve community uptake and value.

Unfortunately, there are a large number of deployment solutions so there is no single piece of code we can write to satisfy most potential users. We have direct relations with people who employ the following deployment technologies:

  1. YARN
  2. Mesos
  3. SLURM
  4. Torque
  5. SGE
  6. SSH
  7. Local processes

Supporting each of these is doable in isolation, but when we consider supporting all in aggregate we desire a common framework.

Additionally, near-future versions of the distributed scheduler may want to dynamically request and release worker resources depending on load. Providing a single resource management interface to the scheduler would allow integration with several technologies in a sane and maintainable manner.

Question: What does such a resource management interface look like?

Example Interface

Let us consider the following signals from scheduler to resource manager (RM) and back.

We could implement this as a Python ResourceManger object that has fuctions for "Please give me more workers" and "I no longer need these workers" that the scheduler can call as well as callbacks that the scheduler provides on ResourceManager creation for "I plan to take back this worker soon" and "this worker has unexpectedly died".

class ResourceManager(object):
    def __init__(self, on_reclaiming_worker, on_worker_died, **kwargs):
        ...

    def request_workers(self, nworkers, **kwargs):
        ...

    def release_workers(self, worker_ids):
        ...

We might then subclass this interface for processes, SSH, Mesos, Yarn, etc..

Questions

Different resource management systems employ different information. Yarn allows the specification of CPUs and Memory and the use of containers, HPC job schedulers have fixed job durations, Mesos operates a bit differently from all.

The example operations provided above may not fit all resource managers. Some may provide only a subset of this functionality while some may have a superset. How do we balance the needs to specialize to a particular system while also maintaining compatibility with many systems and keeping code simple.

cc @hussainsultan @broxtronix @quasiben @danielfrg

mrocklin commented 8 years ago

So questions all, are the four signals sufficient? Are there things that we're missing? What inputs do they need to take? How do individual systems differ?

ellisonbg commented 8 years ago

Subscribing to the discussion...

ellisonbg commented 8 years ago

What about Docker swarm?

mrocklin commented 8 years ago

Additionally we probably also want signals like

ogrisel commented 8 years ago

+1 for docker swarm & kubernetes. Both can be supported with the same code via the Apache Libcloud Container API along with Amazon ECS as well.

ogrisel commented 8 years ago

Support for SLURM and SGE can be implemented at once by reusing the lightweigth abstraction provided by https://github.com/clusterlib/clusterlib.

mrocklin commented 8 years ago

I'm slowly experimenting trying to find a good API and implementations to launch and monitor jobs over local processes, SSH, and Yarn. I should clean up this work and share it. I would also be quite happy for anyone else to take this on.

ogrisel commented 8 years ago

Once this RM API is implemented, it could be leveraged to implement dynamic / elastic scheduling where heuristics based on the length of the scheduler queue and mean task completion time could be used to automatically provision new workers when there is a lot of pending work and automatically release workers when they are idle for some time.

mrocklin commented 8 years ago

@ogrisel, @quasiben here is a rough and trivial implementation for local processs spawning for high-level API review.

https://github.com/mrocklin/spawner/blob/master/spawner/subprocess.py https://github.com/mrocklin/spawner/blob/master/spawner/tests/test_subprocess.py

I recommend starting with the LocalProcessSpawner docstring, followed by tests. Please ignore other files in this repository. They're from older experiments.

I hope none of the Jupyter folks mind me stealing the term spawner. Happy to change if there is conflict concern.

ogrisel commented 8 years ago

Alright, thanks.

ogrisel commented 8 years ago

So this spawner API would be called by the Resource Manager to launch new dworker process (that in turn can launch their own process children) is this right?

You would like one generic ResourceManager class that would delegate the spawning of the workers to platform specific Spawner implementations (for local process, ssh process, YARN, docker and SLURM/SGE) via composition?

ogrisel commented 8 years ago

BTW the shell=True stuff is unix specific. I think this part should be refactored to work on windows too.

mrocklin commented 8 years ago

Yes. The scheduler (or something closely attached to the scheduler) will contain logic to start and stop workers. I think that before this logic can exist we need a unified way of stopping/starting workers on various systems. If we can agree on an API that we can to implement over processes/ssh/sge/dockerspawner, then we can start using that API within the scheduler (or something closely attached) and start thinking about when we should start new jobs.

Alternatively, maybe this isn't the right place to generalize? Maybe the logic of when to start/stop new jobs also needs to be specific to the job-starting system in use. Thoughts?

ogrisel commented 8 years ago

Maybe the logic of when to start/stop new jobs also needs to be specific to the job-starting system in use.

I don't know yet. We have to try. Let's start by implementing the spawning API for a couple of backends and see how what are the constraints. I will start with SGE / SLURM.

SEJeff commented 8 years ago

May I suggest using drmaa for interacting with things such as SGE/UGE? My employer is a large HPC shop and everyone uses drmaa to submit compute jobs.

It works with:

For big HPC envs, drmaa (and its excellent python) are an industry standard.

@ogrisel ^^

pelson commented 7 years ago

Because this issue comes up a lot for me, it is worth noting that the state of play currently is that a distributed-drmaa interface exists at https://github.com/dask/dask-drmaa.

mrocklin commented 7 years ago

I would be curious to hear about your experience with dask-drmaa if any

On Mon, Apr 3, 2017 at 5:24 AM, Phil Elson notifications@github.com wrote:

Because this issue comes up a lot for me, it is worth noting that the state of play currently is that a distributed-drmaa interface exists at https://github.com/dask/dask-drmaa.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/128#issuecomment-291091686, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszPHymfL723Xdq7S2doMIvK5oVC7dks5rsLqwgaJpZM4HYd-V .