dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Restrict task/actor placement based on free worker memory #2843

Open dericdesta opened 5 years ago

dericdesta commented 5 years ago

First off, thank you all for this great framework!

I ran into the following issue: I want to distribute actors which consume a significant amount of RAM. For instance, each actor holds a large NumPy array of ~1 GB in memory, so taking available worker memory into account is crucial for actor placement. Furthermore, let's assume the following workers:
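A minimal sketch of such a setup (the 1 GB / 3 GB memory limits are assumptions inferred from the actor counts below):

```python
# A minimal sketch, assuming two workers whose memory limits match the
# 1-actor / 3-actor capacities described next.
import asyncio
from distributed import Scheduler, Worker

async def main():
    async with Scheduler(port=8786) as scheduler:
        async with Worker(scheduler.address, name="worker-1", memory_limit="1GB"), \
                   Worker(scheduler.address, name="worker-2", memory_limit="3GB"):
            await asyncio.sleep(3600)  # keep the cluster alive for the demo

asyncio.run(main())
```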

Hence, worker 1 can hold at most 1 actor and worker 2 at most 3 actors (ignoring spill/pause/terminate thresholds for now).

Unfortunately, resources={...} are only considered while a task or actor is actually processing; once the creation task finishes, the resources are released even though the actor stays in memory. Therefore, submitting 4 actors results in an invalid placement like the following:
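A sketch of the submission pattern (the actor class and addresses are illustrative) and the kind of placement it can produce:

```python
# Hedged illustration: 4 memory-hungry actors. Without memory-aware
# placement, nothing prevents two of them from landing on the 1 GB worker.
import numpy as np
from dask.distributed import Client

class BigActor:
    """Hypothetical actor holding roughly 1 GB of state."""
    def __init__(self):
        self.data = np.ones(125_000_000)  # ~1 GB of float64

client = Client("tcp://scheduler:8786")  # assumed scheduler address
futures = [client.submit(BigActor, actor=True, pure=False) for _ in range(4)]
actors = [f.result() for f in futures]

# One placement this can produce (reconstructed):
#   worker 1 (1 GB): 2 actors   <- exceeds the worker's memory limit
#   worker 2 (3 GB): 2 actors
```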

As a result, worker 1 will die because it exceeds its memory limit. So I asked myself whether there is any way to take available worker memory into account when assigning an actor to a worker. If there is no solution yet, I'd be happy to open a PR.

Thanks!

mrocklin commented 5 years ago

The proposed semantic change, that Actors wouldn't release resources for as long as they're in memory, seems reasonable to me.

We'll have to change this in both the scheduler and the worker. In both cases we'll need to skip the normal resource release if the task is an actor (see ts.actor on the scheduler and key in self.actors on the worker), and instead release the resources when the task is released from memory.

https://github.com/dask/distributed/blob/e5ec8daab0d2b30702cfce4acb6259c10aef8e05/distributed/scheduler.py#L3443

We would probably want to instead release the resources here, when the task goes from memory to released:

https://github.com/dask/distributed/blob/e5ec8daab0d2b30702cfce4acb6259c10aef8e05/distributed/scheduler.py#L3803
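A hedged sketch of what that change could look like, reusing the release_resources helper from that revision (transition bodies abbreviated; this is a sketch, not a patch):

```python
# Sketch of Scheduler methods (abbreviated): skip the normal release when
# the finished task is an actor, and release once it leaves memory instead.

def transition_processing_memory(self, key, **kwargs):
    ts = self.tasks[key]
    ws = ts.processing_on
    if not ts.actor:                     # plain tasks free resources here
        self.release_resources(ts, ws)
    # ... rest of the transition unchanged ...

def transition_memory_released(self, key, **kwargs):
    ts = self.tasks[key]
    if ts.actor:                         # actors free resources only now
        for ws in ts.who_has:
            self.release_resources(ts, ws)
    # ... rest of the transition unchanged ...
```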

We'll have to handle this in the worker as well, which tracks resources separately:

https://github.com/dask/distributed/blob/e5ec8daab0d2b30702cfce4acb6259c10aef8e05/distributed/worker.py#L1550-L1552

https://github.com/dask/distributed/blob/e5ec8daab0d2b30702cfce4acb6259c10aef8e05/distributed/worker.py#L1189
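And a corresponding sketch on the worker side, where resource restrictions are tracked per key (again under assumed structure, not a patch):

```python
# Sketch of Worker methods (abbreviated): defer returning an actor's
# resources to available_resources until its key is released from memory.

def transition_executing_done(self, key, **kwargs):
    if key not in self.actors:           # actors keep holding their resources
        for resource, quantity in self.resource_restrictions.get(key, {}).items():
            self.available_resources[resource] += quantity
    # ... rest of the transition unchanged ...

def release_key(self, key, **kwargs):
    if key in self.actors:               # actor leaving memory: free them now
        for resource, quantity in self.resource_restrictions.get(key, {}).items():
            self.available_resources[resource] += quantity
    # ... rest of the cleanup unchanged ...
```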

dericdesta commented 5 years ago

Thanks for your prompt response and those helpful code fragments above. I've added the corresponding checks for whether a task state belongs to an actor, and it's looking good so far. But one issue remains, I think:

Right before submitting the actors, valid workers are fetched via https://github.com/dask/distributed/blob/e5ec8daab0d2b30702cfce4acb6259c10aef8e05/distributed/scheduler.py#L4398-L4445 Here, self.resources[resource].items() yields the maximum, not the currently available, resources (called supplied here). That's why actors will be evenly distributed: supplied >= required holds true for all workers regardless of the resources actually available. So, sticking to the initial example, let's set some worker resources:
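A sketch of the assumed setup, extending the earlier one with per-worker resource counts:

```python
# Hedged sketch: the same two workers, now supplying an "n_actors" resource
# matching their capacities (values are assumptions from the initial example).
import asyncio
from distributed import Scheduler, Worker

async def main():
    async with Scheduler(port=8786) as scheduler:
        async with Worker(scheduler.address, memory_limit="1GB",
                          resources={"n_actors": 1}), \
                   Worker(scheduler.address, memory_limit="3GB",
                          resources={"n_actors": 3}):
            await asyncio.sleep(3600)  # keep the cluster alive for the demo

asyncio.run(main())
```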

Submitting 4 actors with resources={'n_actors': 1} will distribute them evenly among worker 1 and worker 2 (2 and 2 actors). Since, after the changes you suggested, actor resources are not freed until an actor is removed from memory, some actors (here: one actor on worker 1) will hang in pending state forever, as no resources are or become available.

Correct me if I'm wrong, but I think the maximum available resources are obtained once by the scheduler via self.add_resources(worker=address, resources=resources) https://github.com/dask/distributed/blob/e5ec8daab0d2b30702cfce4acb6259c10aef8e05/distributed/scheduler.py#L1341-L1395 and the currently available resources are not tracked by the scheduler at all. In order to schedule tasks/actors according to the currently available worker resources, we need to pass them from the workers to the scheduler.
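To illustrate the shape of that bookkeeping (the map layout below is my reading of the linked code), the validity check only ever compares against the supplied totals:

```python
# Hedged illustration of self.resources as I read it:
# {resource_name: {worker_address: supplied_total}}
resources = {"n_actors": {"tcp://worker1:1234": 1, "tcp://worker2:1234": 3}}
required = 1

# The supplied totals never shrink as actors consume resources, so every
# worker keeps passing this test regardless of actual availability.
valid = {w for w, supplied in resources["n_actors"].items() if supplied >= required}
print(valid)  # both workers, no matter how many actors they already run
```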

mrocklin commented 5 years ago

Ah indeed, you have a good point. I don't have an obvious solution here. Maybe workers assume that actors will be around for a while, and inform the scheduler about their changed resources?

https://github.com/dask/distributed/blob/27e8e6548e7a8401c9199a55c7dea2fa7331cb04/distributed/worker.py#L1208-L1219

In this situation they would also check the actors they currently have available to run, and if those don't fit, send them back to the scheduler for re-scheduling?

dericdesta commented 5 years ago

This sounds like a decent approach. Let me try to implement this.

dericdesta commented 5 years ago

I think the easiest solution would be to include the available worker resources in the worker's heartbeat. Combined with the re-scheduling you mentioned earlier, it's a wrap. What do you think? After digging into this, I also think we should rename the resource-related variables, as the current naming seems quite inconsistent to me:

WorkerState       Worker
resources         total_resources
used_resources    available_resources

mrocklin commented 5 years ago

That would work. The problem with the worker heartbeat is that it can be somewhat infrequent, up to several seconds if there are a lot of workers around. In some scheduling situations that's a long time. I'm more inclined to tell the scheduler immediately on changed resource availability if that's alright.

Of course, this requires whoever does this work to be comfortable with message passing. Do you understand how to pass messages back and forth between the scheduler and workers?

Function calls like the following

https://github.com/dask/distributed/blob/e02cc4409e352e40e4128fc542b8aaed51b5a01f/distributed/worker.py#L1247-L1249

are looked up in this dictionary of method names

https://github.com/dask/distributed/blob/e02cc4409e352e40e4128fc542b8aaed51b5a01f/distributed/scheduler.py#L1070

and are then called on the scheduler side

https://github.com/dask/distributed/blob/e02cc4409e352e40e4128fc542b8aaed51b5a01f/distributed/scheduler.py#L4483-L4491
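For example, a hedged sketch of the round trip (the op name set_resources and the scheduler-side handler are hypothetical; only the batched-stream send and the handler-dictionary dispatch follow the linked code):

```python
# Worker side: push the updated availability to the scheduler immediately
# over the existing batched stream (op name "set_resources" is hypothetical).
self.batched_stream.send({
    "op": "set_resources",
    "worker": self.address,
    "resources": dict(self.available_resources),
})

# Scheduler side: an entry in the dictionary of handlers (see the linked
# lines) would route that op to a method like this hypothetical one:
def handle_set_resources(self, worker=None, resources=None):
    ws = self.workers[worker]
    ws.available_resources = resources   # hypothetical new bookkeeping
    # ...and possibly re-schedule queued actors that no longer fit...
```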

dericdesta commented 5 years ago

Yeah, you are right; I just found the heartbeat interval function. Then immediate notification is probably the cleanest solution. I think I understand how to pass messages; otherwise I'll ping you here, if that's OK.

mrocklin commented 5 years ago

Sounds good
