dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Have workers query service for scheduler address #4715

Open mrocklin opened 3 years ago

mrocklin commented 3 years ago

Currently a worker needs a scheduler address in order to start. This can cause issues in two situations:

  1. If the scheduler and workers are started at the same time, then it can take twice as long to start a meaningful Dask cluster as it takes to start a process/vm/pod/... (see https://github.com/dask/distributed/pull/4710#issuecomment-822374645)
  2. If the scheduler goes down and comes up someplace else then the workers need to be redirected

One exception to this situation is the scheduler_file option, which uses a shared file system as a coordination point between a not-yet-created scheduler and a pool of not-yet-created workers. They all check that file periodically, and once the scheduler arrives and writes to it, everyone knows where to connect.
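Concretely, that usage looks roughly like this today (the path is a placeholder):

dask-scheduler --scheduler-file /path/to/scheduler.json   # writes its address into the file once it is up
dask-worker    --scheduler-file /path/to/scheduler.json   # watches the file and connects once an address appears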

We might consider something similar with a web service, where the workers probe a service to find out where they should connect:

dask-worker --scheduler-service https://where-is-my-scheduler.com/cluster-id/1234

This would require some sort of basic protocol to be established (probably simpler than Dask comms; maybe a normal web request/response). It would also require us to modify the current logic in the Worker and Nanny for reconnection to the scheduler. I imagine that this conversation would probably look something like the following:
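Roughly, and only as a sketch (the endpoint shape and payloads here are illustrative, nothing is settled):

scheduler, on startup:      PUT https://where-is-my-scheduler.com/cluster-id/1234   {"address": "tls://10.0.0.1:8786"}
worker, in a loop:          GET https://where-is-my-scheduler.com/cluster-id/1234
                            -> 404 while no scheduler is registered: sleep and retry
                            -> 200 {"address": "tls://10.0.0.1:8786"}: connect to that address
worker, on scheduler loss:  resume polling the same URL until a new address appears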

This would probably be useful for systems like Dask-Gateway and certainly Coiled. I'm curious if it could be made useful for other systems like dask-cloudprovider. cc @jacobtomlinson @selshowk

This also looks like a reinvention of zookeeper, I think.

mrocklin commented 3 years ago

I'm curious if there is some generic thing that we could do here that would easily plug into other systems, like Zookeeper, or other cloud native solutions. A web route is the simplest to invent I think, but maybe there are more tailored solutions that already exist that we would want to support in a more pluggable way.

jacobtomlinson commented 3 years ago

I am very much in support of this.

Tools like zookeeper, etcd and consul do this today and are popular in many other projects. I think Spark folks tend to use zookeeper. Kubernetes nodes commonly use etcd.

jacobtomlinson commented 3 years ago

I am hesitant to invent yet-another-raft-election-tool.

mrocklin commented 3 years ago

I am hesitant to invent yet-another-raft-election-tool.

Yeah, agreed. This should be an interface to rely on existing tools. Do you have thoughts on what that interface should look like?

fjetter commented 3 years ago

For reference, the kazoo (zookeeper Python client) interface for leader election is very simple: https://kazoo.readthedocs.io/en/latest/api/recipe/election.html, and a more thorough description of how this works in the background using ephemeral zookeeper nodes can be found here.

afaik, etcd doesn't offer this as a ready-to-use recipe, but the concepts are similar enough that the kazoo implementation could be used as a reference, I guess.
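To illustrate, the recipe boils down to roughly this (the hosts, path, and callback below are placeholders, nothing Dask-specific):

from kazoo.client import KazooClient

zk = KazooClient(hosts="zookeeper:2181")
zk.start()

election = zk.Election("/dask/scheduler-election", "candidate-1")

def act_as_scheduler():
    # only the elected leader reaches this function; other candidates
    # block inside run() until the current leader releases or disconnects
    ...

election.run(act_as_scheduler)  # blocks until this candidate is elected, then calls the function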

mrocklin commented 3 years ago

My sense here is that we would first want to establish an interface class (similar to Cluster) and then leave it to others to implement that interface for various technical options that exist. Coiled would love something like this FWIW. The biggest pain point in our stack right now that this would help to solve is startup time. The interface that I would want here is something relatively simple like the following:

class Coordinator:
    def __init__(self, address):
        ...
    async def scheduler(self) -> str:
        """ returns the address of a running scheduler once available """

When we think of things like leadership election though then this gets more complicated. I'm curious if there is a way to design this incrementally. I'd be inclined to propose something like the above as a PR and then ask folks like @jacobtomlinson to make sure that it doesn't block us off from other more exciting paths in the future.
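For concreteness, one implementation of that interface against a plain HTTP endpoint might look roughly like this (the class name, endpoint, and response format are all placeholders, not a proposal for an actual API):

import asyncio
from tornado.httpclient import AsyncHTTPClient, HTTPClientError

class HTTPCoordinator:
    """Poll a web service until it knows where the scheduler is."""

    def __init__(self, address, poll_interval=1.0):
        self.address = address          # e.g. "https://where-is-my-scheduler.com/cluster-id/1234"
        self.poll_interval = poll_interval

    async def scheduler(self) -> str:
        """Return the address of a running scheduler once available."""
        client = AsyncHTTPClient()
        while True:
            try:
                response = await client.fetch(self.address)
                address = response.body.decode().strip()
                if address:
                    return address      # e.g. "tls://10.0.0.1:8786"
            except (OSError, HTTPClientError):
                pass                    # service not reachable yet, or no scheduler registered yet
            await asyncio.sleep(self.poll_interval)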

jacobtomlinson commented 3 years ago

My sense here is that we would first want to establish an interface class (similar to Cluster) and then leave it to others to implement that interface for various technical options that exist.

This is what I was trying to do with Runner in #4710

mrocklin commented 3 years ago

I see Runner as a combination between stuff-currently-in-Cluster and things-that-look-like-zookeeper.

I'm curious if we can keep these two abstractions separate, and so perhaps more composable. That may or may not make sense though. I look forward to discussion.

mrocklin commented 3 years ago

In a conversation with @jacobtomlinson one thing that came up was that the scheduler_file= functionality already works like this, and could be another example of such an abstraction.

We could also extend that functionality to have workers select a scheduler among themselves if we wanted to move toward high availability. This would probably require using lockfiles (file locking is not always reliable on NFS systems) and teaching workers to become schedulers.
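As a rough sketch only, assuming a shared filesystem with working atomic file creation (the file names and the "address" key are illustrative):

import json
import os
import time

def elect_with_lockfile(lock_path="scheduler.lock", scheduler_file="scheduler.json"):
    """Race to create a lock file; the winner becomes the scheduler."""
    try:
        # O_EXCL makes creation atomic on local filesystems (not guaranteed on NFS)
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        return "scheduler"                     # this process won the race
    except FileExistsError:
        # someone else won; wait for them to write the scheduler file, then connect
        while not os.path.exists(scheduler_file):
            time.sleep(0.5)
        with open(scheduler_file) as f:
            return json.load(f)["address"]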

jacobtomlinson commented 3 years ago

I think @fjetter put it nicely in https://github.com/dask/distributed/pull/4710#discussion_r617724084.

Conceptually I see the Cluster as the entity which is allowed, or even required, to talk to the hardware or resource manager (trying to avoid the "cluster" term here). It will talk to this hardware manager via some kind of API to spawn new ServerNode instances of type {Scheduler|Worker} somewhere else (different process, different VM, ...) while the Runner will spawn one ServerNode next to it, similar to what the Nanny does with the Worker.

The runner does behave much like Nanny except that it will interface with some service like zookeeper to figure out which server process it should be launching.

I like that it looks very much like Cluster; it feels familiar and works nicely within context managers. As @fjetter pointed out, though, the similarity also added some confusion.

scheduler_file= functionality already works like this, and could be another example of such an abstraction.

I've been thinking more on this. Scheduler file sort of works like this. You start the scheduler and it writes its info to a file. You start workers and point them to a file and they block until that file contains the info they are looking for.

However you are still explicitly telling each process what server type it should be.

As you say we could launch all processes with the same command and whichever process grabs the lock on the file first becomes the scheduler. But relying on filesystem locks does make me a little nervous.

jacobtomlinson commented 3 years ago

We could also explore having each process work out among themselves who is in charge. Perhaps when processes start they run an RPC server and open comms in order to negotiate who is leader.

It would rely on all processes being able to discover a list of the other processes.
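The very simplest version of that, once every process can see the full peer list, might be something like this (purely illustrative, with no fault tolerance):

def should_become_scheduler(my_address: str, peers: list[str]) -> bool:
    """Deterministic toy election: the lexicographically smallest address wins."""
    return my_address == min(peers + [my_address])

# every process runs the same code:
#   if should_become_scheduler(me, others): start a Scheduler
#   else: wait for the winner to come up and connect to it as a Worker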

Unless we fully implement some RAFT system, this is likely not to scale well, but it is a simpler case than having to set up something like zookeeper. Perhaps that is acceptable, and once folks want to scale beyond a certain point it is recommended they use a third-party distributed key-value store.

@quasiben had some thoughts on this too.

fjetter commented 3 years ago

Unless we fully implement some RAFT system

Please don't XD

mrocklin commented 3 years ago

However you are still explicitly telling each process what server type it should be.

As you say we could launch all processes with the same command and whichever process grabs the lock on the file first becomes the scheduler. But relying on filesystem locks does make me a little nervous.

Yeah, I agree that I'm jumping ahead a bit here. Right now I want an agreed-upon abstraction in Dask which allows workers to query a thing and get the address of the scheduler. I think that that abstraction should be able to grow to support things like figuring out who should be a scheduler as well. The scheduler_file= feature is an example of the first part today and could be an example of the second part in the future, at least on systems with filesystem locks.

mrocklin commented 3 years ago

So, I'm still interested in this topic; however, it appears that we went down a classic fail path of ...

I'd like to reconsider doing this in isolation. This would be a simple fix for a while that we might generalize in the future. Thoughts or pushback @jacobtomlinson ?

jacobtomlinson commented 3 years ago

I have been continuing to think about this. This, combined with a few other thought processes, has led me down the road of thinking about a Dask Agent.

I was considering experimenting with a contrib project (similar to dask-ctl) which builds an agent which acts as a nanny for all processes. It would be able to handle election and discovery along with other useful things like log exporting and other tasks. The agent would call the subprocesses like dask-scheduler and dask-worker (or dask-spec) directly.

I had started to write up a proposal document for this last week to try and get my thoughts in a row, but was sidelined onto other tasks.

mrocklin commented 3 years ago

Do you object to a similar experiment which adds an ability for current Worker/Nanny/Client objects to query a service for a scheduler address, similar to what we do for scheduler_file?

My sense is that this requires a bit less creativity, and so has a higher chance of early success. It might then evolve into something more sophisticated over time.


jacobtomlinson commented 3 years ago

My main objection here would be to implementing some custom where-is-my-scheduler.com service with a bespoke API. If Worker/Nanny/Client are to be extended to support scheduler discovery my preference would be to implement support for some standard tooling, like zookeeper or etcd.

My preference would be for this kind of feature to be put into some kind of extended nanny, though. Having somewhere outside of distributed to implement scheduler election, scheduler discovery, log exporting, prometheus endpoint aggregation, metric exporting, etc. feels like a better separation of concerns.

mrocklin commented 3 years ago

My main objection here would be to implementing some custom where-is-my-scheduler.com service with a bespoke API

What if it was a very very simple API?

like zookeeper or etcd

Standing up something like Zookeeper seems heavyweight to me here.

I view this as similar to the scheduler_file= logic. We did something custom here, yes, but it was super-simple and very pragmatic in the end.

jacobtomlinson commented 3 years ago

What if it was a very very simple API?

My concerns here are:

Standing up something like Zookeeper seems heavyweight to me here.

For users outside of a managed platform like Coiled I imagine this would be as complex as running whatever bespoke solution we put together.

I expect for most users something like docker run -d -p 2181:2181 zookeeper would be sufficient. But even a high-availability cluster with three replicas looks pretty straightforward.

I view this as similar to the scheduler_file= logic. We did something custom here, yes, but it was super-simple and very pragmatic in the end.

Scheduler file is great because it is simple. My concern here is that building (and, more importantly, deploying) a bespoke service-discovery service crosses the line in terms of simplicity. Even if the API is trivial, there is a lot more complexity here than a file on a shared filesystem.