dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Client.map() keys as a list breaks Dask queue system #8671

Open arneyjfs opened 6 months ago

arneyjfs commented 6 months ago

Describe the issue: This may not be a bug, but if it isn't, it is unclear to me from the docs why dask behaves like this. When passing a list of values to client.map() to use as keys, the queueing system seems to break.

Minimal Complete Verifiable Example:

from time import sleep

from distributed import Client, progress

def simple_task_wait_60s(id: int, logger=None):
    # Each task just sleeps so the cluster stays saturated for a while.
    sleep(10)
    return 0

if __name__ == '__main__':
    iterabs = [str(i) for i in range(1000)]

    c = Client(address='tcp://xxx.xxx.xxx.xxx:8786')

    data = c.map(simple_task_wait_60s,
                 iterabs,
                 pure=False,
                 # key=iterabs  # uncommenting this breaks queuing (see below)
                 )
    progress(data)
    results = c.gather(data)

The above produces the following output on the dashboard. Notice the number of jobs queued vs. processing: [dashboard screenshot]

Uncommenting the line key=iterabs produces the following instead. Notice queued=0: [dashboard screenshot]


fjetter commented 6 months ago

Well, I think this is neither a bug nor a feature; it's a bit of both, I would say. Dask internally relies on the structure of the keys, and if you change that structure by providing your own keys, some assumptions are broken. You can see this on the dashboard: not only are the tasks not queued, but there is now a progress bar for each task individually. Dask essentially no longer notices that all of those tasks are similar and treats them as individual groups. Internally these are TaskPrefixes: in a nutshell, tasks are grouped based on the string prefixes of their keys (keys can also be tuples, which is why this definition is a little more complex in reality).
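To see that grouping in action, here is a quick sketch; it assumes dask.utils.key_split, the helper used for this prefix extraction (its exact location has moved between modules across versions, so treat the import path as an assumption):

from dask.utils import key_split  # assumed import path; may vary by version

# Keys that Dask generates share a function-name prefix, so they form one group:
key_split('simple_task_wait_60s-0')  # -> 'simple_task_wait_60s'
key_split('simple_task_wait_60s-1')  # -> 'simple_task_wait_60s'

# Bare numeric strings share no alphabetic prefix, so each key is its own group:
key_split('0')  # -> '0'
key_split('1')  # -> '1'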

If you do something like iterabs = ["foo-" + str(i) for i in range(1000)] instead, it will just work.
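Applied to the example above, that might look like the following sketch (the prefix "foo-" is arbitrary; any shared alphabetic prefix should do):

# Custom keys with a shared prefix, so all tasks land in one TaskPrefix group:
iterabs = ['foo-' + str(i) for i in range(1000)]

data = c.map(simple_task_wait_60s,
             iterabs,
             pure=False,
             key=iterabs)  # keys 'foo-0', 'foo-1', ... are grouped together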

The task queuing unfortunately relies on task groups, which is why this isn't working.

arneyjfs commented 6 months ago

This is a great bit of information, thank you! I like your solution to use iterabs = ["foo-" + str(i) for i in range(1000)], but doesn't this also mean that if users are submitting jobs separately, then unless they all choose the same prefix, their tasks will be queued separately?

I.e. if I have 1 worker with 200 cores

Then the worker will still be trying to process 200 from each queue and will be overloaded?

What I was hoping for was for the 2 sets of tasks to have different priorities but be queued together; then all the highest-priority tasks would be done first (from whichever set).

fjetter commented 6 months ago

Queuing is not exactly what you are imagining, I think. Queuing refers to when we submit a task to a worker, not to when that work is actually executed. To influence the order of execution, there is the priority keyword. If no priority is provided, we process tasks in the order in which they were submitted. We consider tasks that are submitted within a short time window to be of equal priority; see fifo_timeout. This would also happen if two clients submitted things simultaneously.
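For illustration, a minimal sketch of the priority keyword, reusing simple_task_wait_60s and the client c from the example above (higher numbers take precedence; zero is the default):

# Two batches submitted back to back; despite being submitted second,
# the high-priority batch is executed first.
low = c.map(simple_task_wait_60s, [str(i) for i in range(1000)],
            pure=False, priority=0)
high = c.map(simple_task_wait_60s, [str(i) for i in range(1000, 2000)],
             pure=False, priority=10)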

arneyjfs commented 6 months ago

Ah, I see, I think I have misunderstood that then. This part of the docs makes it sound like the job will stay on the scheduler until there is space on a worker and then be assigned to that worker, in which case having one queue and making sure a worker doesn't pick up more work than it can handle would be optimal.

From what you are saying, though, it sounds like the work is immediately scheduled to the least busy worker, even if there is technically no available resource on that worker. Is that right? If so, is there any workaround to make it behave like the above? Essentially I need a prioritised queue, where in/out order doesn't really matter but priority does, and, crucially, a higher-priority task should be picked up regardless of the time of submission or whether it has a different TaskPrefix.

If there is no way to do that, would ensuring all tasks have the same prefix achieve this behaviour?

fjetter commented 6 months ago

From what you are saying, though, it sounds like the work is immediately scheduled to the least busy worker, even if there is technically no available resource on that worker. Is that right?

That is the default behaviour, yes. For most applications this should not concern you. This entire queuing mechanism was implemented to manage memory pressure for some very specific array workloads; in most situations a user will not care about it. There is also some logic that rebalances tasks between workers if some have too many tasks assigned while others idle.
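For reference, this rebalancing is Dask's "work stealing". A sketch of toggling it via configuration, assuming the distributed.scheduler.work-stealing config key (set on the machine running the scheduler, before the scheduler starts):

import dask

# Work stealing is enabled by default; idle workers can take queued tasks
# from saturated ones. To disable it, e.g. for debugging:
dask.config.set({"distributed.scheduler.work-stealing": False})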

Essentially I need a prioritised queue, where in/out order doesn't really matter but priority does, and crucially, a higher priority task should be picked up regardless of the time of submission or if it has a different TaskPrefix.

Forget all about "task queuing". It is an internal mechanism that users should rarely bother with, and it is not what you are looking for. You are looking for the priority keyword in Client.submit, Client.map, etc., and you can even use a context manager to set priorities using annotations, see https://docs.dask.org/en/stable/api.html#dask.annotate
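A sketch of the annotation route, assuming tasks are built as delayed objects and computed through the client; note that graph optimization can drop annotations for some collections, so treat this as illustrative:

import dask

# Annotations attach to whatever work is built inside the context manager.
with dask.annotate(priority=10):
    urgent = dask.delayed(simple_task_wait_60s)('urgent-0')

with dask.annotate(priority=0):
    background = dask.delayed(simple_task_wait_60s)('background-0')

# Higher-priority work is executed first, regardless of submission order.
futures = c.compute([urgent, background])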

arneyjfs commented 6 months ago

There is also some logic that rebalances tasks between workers if some have too many tasks assigned while others idle.

I think this was a key piece of information, thank you. So even if a worker has 'picked up' a task, that does not mean it will be the one to run it (it can still be rebalanced). It also does not mean the task will run imminently (it can still be affected by other, higher-priority tasks).

This is a very useful clarification; I appreciate you taking the time to explain it.