dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

[DRAFT] Move adaptive scaling functionality into dedicated process #6907

Open hendrikmakait opened 2 years ago

hendrikmakait commented 2 years ago

Problem: With adaptive scaling being executed on the client side, it's easy to end up with multiple (possibly mutually exclusive) adaptive scaling policies applied to a single distributed cluster through the use of multiple Cluster objects.
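For concreteness, a minimal sketch of the failure mode; FooCluster and its from_name() constructor are placeholders for a deployment-specific cluster manager (the FooCluster name is borrowed from the discussion below), not a real distributed API:

```python
# Two user processes, each with its own Cluster object pointing at the same
# deployed scheduler/workers. FooCluster and from_name() are hypothetical
# placeholders for a deployment-specific cluster manager.

# process A
cluster_a = FooCluster.from_name("shared-cluster")
cluster_a.adapt(minimum=2, maximum=10)     # policy A: keep at least 2 workers

# process B
cluster_b = FooCluster.from_name("shared-cluster")  # same underlying cluster
cluster_b.adapt(minimum=0, maximum=100)    # policy B: allow scale-to-zero

# Each adapt() call runs its own Adaptive instance in the calling process, so
# two independent policies now poll the same scheduler and can issue
# conflicting scale-up/scale-down requests.
```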

Goal: At any given moment, there should be one, and only one, adaptive scaling policy applied to the distributed cluster (as opposed to any individual Cluster object).

Additional benefits:

  • simplifies the client-side

fjetter commented 2 years ago

Just to be very precise, adaptivity is not implemented in the distributed.Client but as part of distributed.Cluster. That's of course still "client side" and indeed Cluster.adapt spawns multiple Adaptive instances. I agree that should be fixed.

Regarding the issue title, I am very skeptical that we can or want to put this kind of functionality into the Scheduler. The scheduler does not have any means of starting a worker. It's a valid request to somehow ensure that there is only one driver out there, but the scheduler is not the correct target for this. Deployment and resource management are done using the Cluster object.

@jacobtomlinson has been looking into a couple of cases where there are potentially multiple cluster instances around for the same compute cluster and might have an opinion about this.

jacobtomlinson commented 2 years ago

Having the adaptive code be client-side has always been slightly awkward, so I'm definitely in favour of moving it somehow. But in many cluster managers, workers can only be created from the client side, as the scheduler has no credentials to create VMs/containers/etc. This is why the architecture is the way it is.

It could be interesting to move to a model where a singleton process handles this somewhere adjacent to the scheduler, but it would be a big change and require some specific logic to be implemented in each of the downstream projects.

In dask-kubernetes we are moving to having the operator controller handle this, so we will bypass Adaptive altogether. Calling cluster.adapt() will create a DaskAutoscaler resource, and the controller will see that and periodically communicate with the scheduler to get the numbers to scale up/down to. However, I know some folks like to implement custom instances of Adaptive, which we won't support, so we either need to find a way to support that later or rethink allowing custom adaptive behaviour like that.
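For illustration only, a hedged sketch of what the resource created by cluster.adapt() might look like if written out by hand via the official kubernetes Python client; the kubernetes.dask.org/v1 group/version, the DaskAutoscaler spec fields, and the plural name are my assumptions about the operator's schema, not a confirmed API:

```python
from kubernetes import client, config

config.load_kube_config()

# Roughly what cluster.adapt() would translate to under the operator model:
# a DaskAutoscaler custom resource that the controller watches and acts on.
autoscaler = {
    "apiVersion": "kubernetes.dask.org/v1",   # assumed group/version
    "kind": "DaskAutoscaler",
    "metadata": {"name": "my-cluster"},       # hypothetical cluster name
    "spec": {
        "cluster": "my-cluster",              # assumed spec fields
        "minimum": 1,
        "maximum": 10,
    },
}

client.CustomObjectsApi().create_namespaced_custom_object(
    group="kubernetes.dask.org",
    version="v1",
    namespace="default",
    plural="daskautoscalers",                 # assumed plural
    body=autoscaler,
)
```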

I'm very much up for changing this, but I want to communicate that this is a very large effort touching many projects.

hendrikmakait commented 2 years ago

Thanks for the feedback; I admit moving the functionality into the Scheduler was a misguided shortcut for moving it into some dedicated entity with the authority to make scaling decisions on the cluster. I've marked the ticket as a draft to flesh out a more thought-out solution. What I would like to have in the end is some singleton process that effectively acts as a cluster manager with which the Cluster objects would talk, reducing them to something closer to a ClusterState object.

hendrikmakait commented 2 years ago

As a side-note, would it make sense to add a draft label?

jacobtomlinson commented 2 years ago

Yeah I think the core of the issue is that there can be more than one FooCluster object representing a cluster at any given moment, and if two of them call cluster.adapt() then they could in theory be competing to scale up and down. A lock that is held by the scheduler would be a quick and dirty solution to that.
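As a rough illustration of that quick-and-dirty idea (not something distributed does today): the existing distributed.Lock primitive is already coordinated by the scheduler, so the process that wants to drive adaptivity could be required to hold a well-known lock first. The lock name below is arbitrary.

```python
from distributed import Client, Lock

def maybe_start_adaptive(cluster):
    """Only install an adaptive policy if no other process holds the lock."""
    client = Client(cluster)
    lock = Lock("adaptive-controller", client=client)  # arbitrary, well-known name
    if lock.acquire(blocking=False):
        # We are the single adaptivity driver for this cluster.
        cluster.adapt(minimum=0, maximum=10)
        return lock  # keep a reference; releasing it gives up the role
    client.close()
    raise RuntimeError("another process is already driving adaptive scaling")
```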

It's interesting that @hendrikmakait talks about the Client a lot in the first issue, but as @fjetter says this logic lives in Cluster. That said, if the logic were moved somewhere central and near the scheduler, the entry point could be moved to the Client, which might make it more accessible to folks.

Moving the adaptive logic to a single location would be great, which is what we've done in dask-kubernetes by moving this logic into the controller. But the Kubernetes operator pattern makes this pretty trivial, as the controller already has credentials to add/remove workers and communicate with the schedulers; other cluster managers will not be as straightforward.

I think dask-gateway also handles this centrally, but I'm not sure about the details of how that works. Maybe @jcrist, @martindurant, @douglasdavis would know more.

It would be interesting to know how the Coiled platform handles this. My assumption is that it is similar to the dask-kubernetes approach of having a central autoscaler that is managed by Coiled, and that Coiled doesn't actually use Adaptive, in the same way dask-kubernetes doesn't.

For libraries like dask-jobqueue, dask-cloudprovider, etc it is less clear where this should go.

In jobqueue it is common for the scheduler to also be colocated with the FooCluster and today there is no way of having multiple cluster managers represent one cluster, so it's a non-issue there.

In cloudprovider we are limited by the cloud platforms. None of them have the same kind of cluster-wide controller concept that Kubernetes does, and there is no central SaaS API like Coiled has, so there is no central place to put this logic. Colocating it with each scheduler therefore makes more sense; whether it is a separate process or just a thread created by a plugin, I have no strong feelings either way. The main challenge here would be ensuring that the VM/container that holds the scheduler has credentials with the right permissions to spin up these resources. We could solve this by having the Cluster object generate some temporary credentials via STS (or equivalent) and pass them to the scheduler, which would then need to refresh the token indefinitely while it is running. That would require the scheduler to have some knowledge about how it was deployed and how it can interact with that platform, which isn't true today but could be changed.
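A hedged sketch of the credential hand-off part only, assuming AWS STS via boto3 and using Client.run_on_scheduler to drop the temporary credentials into the scheduler process; the role ARN is a placeholder, and the refresh loop and per-platform permissions are the open design questions described above:

```python
import os
import boto3
from distributed import Client

def push_temporary_credentials(client: Client, role_arn: str) -> None:
    """Mint short-lived AWS credentials and hand them to the scheduler process."""
    creds = boto3.client("sts").assume_role(
        RoleArn=role_arn,                  # placeholder, e.g. a worker-provisioning role
        RoleSessionName="dask-adaptive",
        DurationSeconds=900,
    )["Credentials"]

    def _set_env(creds=creds):
        # Runs inside the scheduler process; code running there could later use
        # these to create/destroy VMs or containers. Refreshing before expiry
        # is deliberately left out of this sketch.
        os.environ["AWS_ACCESS_KEY_ID"] = creds["AccessKeyId"]
        os.environ["AWS_SECRET_ACCESS_KEY"] = creds["SecretAccessKey"]
        os.environ["AWS_SESSION_TOKEN"] = creds["SessionToken"]

    client.run_on_scheduler(_set_env)
```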

I think my point overall is that maybe we should deprecate/remove Adaptive altogether and just maintain the adaptive_target/workers_to_close/retire_workers/etc methods in the scheduler for querying what the scheduler wants to happen. Then adaptive logic should be implemented on a case-by-case basis in each deployment project rather than trying to design some one-size-fits-all solution because they are all so different.
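As an illustration of what a per-project implementation could look like under that model, here is a minimal sketch of a polling loop driven purely by the scheduler's existing query methods (identity, adaptive_target, workers_to_close, retire_workers) over the scheduler RPC; start_workers and stop_workers are hypothetical deployment-specific callables, and the polling interval is arbitrary:

```python
import asyncio

async def autoscale_loop(scheduler_comm, start_workers, stop_workers, interval=5):
    """Drive scaling from the scheduler's own query methods.

    ``scheduler_comm`` is an RPC handle to the scheduler (e.g. ``cluster.scheduler_comm``).
    ``start_workers(n)`` and ``stop_workers(names)`` are hypothetical callables supplied
    by the deployment project -- that is the part each project would implement itself.
    """
    while True:
        # How many workers exist right now, and how many does the scheduler want?
        info = await scheduler_comm.identity()
        current = len(info["workers"])
        target = await scheduler_comm.adaptive_target()

        if target > current:
            await start_workers(target - current)
        elif target < current:
            # Ask the scheduler which workers are safest to remove, retire them
            # gracefully, then tear down the underlying resources.
            names = await scheduler_comm.workers_to_close(target=target, attribute="name")
            if names:
                await scheduler_comm.retire_workers(names=names, remove=True, close_workers=True)
                await stop_workers(names)

        await asyncio.sleep(interval)
```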

jacobtomlinson commented 2 years ago

> As a side-note, would it make sense to add a draft label?

Went with discussion as that seemed more fitting.

hendrikmakait commented 2 years ago

> It's interesting that @hendrikmakait talks about the Client a lot in the first issue, but as @fjetter says this logic lives in Cluster.

To clarify, I am saying that the execution of the adaptive scaling logic lives on the client-side, i.e., a user process (or multiple ones). @fjetter is correct in that the called methods are attached to the Cluster class.

> • simplifies the client

I've adjusted that to the client-side.

I think the issue here is that there is some distributed cluster, i.e., a collection of workers and a scheduler, and then there is a Cluster object that merely represents it; the adaptive policy should be tied to the former, not to any individual Cluster object.