dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 717 forks source link

Scheduler ignores current resource consumption levels while scheduling #6467

Open gjoseph92 opened 2 years ago

gjoseph92 commented 2 years ago

Currently on the scheduler, when a task is assigned to a worker and consumes resources, that's set in one place. When deciding whether a task can be assigned to a worker, that's checked in a different place. Therefore, current resource consumption levels are not considered in task scheduling.

The current scheduling appears to just consider which workers can run a task in theory: do they have enough of the resource to be able to run this task ever (even if none of it is available right now)?

Considering resources like GPUs, I suppose this makes sense: queuing extra tasks onto workers is beneficial so there's no idleness. Still, it's a little surprising. And the fact that worker_objective doesn't take current resource consumption into account seems likely to cause bad scheduling, since we could easily assign a task to a worker whose resource is currently used up, when there are other workers with the resource available.


When a task gets assigned to a worker, consume_resources only adjusts the count in WorkerState.used_resources: https://github.com/dask/distributed/blob/e0ea5df5bc56b130f66d2741d9bc24bfa1eb0121/distributed/scheduler.py#L2674-L2675

But SchedulerState.valid_workers looks for which workers can run a task, it only checks self.resouces[resource][address], and never looks at WorkerState.used_resources: https://github.com/dask/distributed/blob/e0ea5df5bc56b130f66d2741d9bc24bfa1eb0121/distributed/scheduler.py#L2644-L2652

So tasks will not enter the no-worker state just because all resources in the cluster are currently used up.

Instead, as usual, more tasks will get queued onto workers than they can run at once. Each worker will manage only running the correct number of tasks at once.


Note that changing this behavior would likely provide a viable temporary solution for https://github.com/dask/distributed/issues/6360, a very common pain point for many users.

cc @mrocklin @fjetter

elementace commented 4 months ago

FWIW, I'm running into a problem where XGBoost does some weird thread management per node in my cluster. If the sum of the 'njobs' of all assigned xgboost training tasks to a node is > # of vCPUs, then it uses only 1 vCPU in total for all tasks.

Hence I was looking to worker resource management to solve this problem (by making each require the whole resource of the worker).

The problem then stemming from this ticket is it successfully completes the first task by itself using the full compute capacity (all 32 cores), and then tries to run all the remaining tasks at the same time (without assessing the resources available and queuing 1 at a time accordingly). Resulting in the cpu utilisation dropping to 1 / 32 cores.

Has there been any progress on this in discussions @gjoseph92 ? @mrocklin ?