dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 719 forks source link

Memory prioritization on workers #5250

Open mrocklin opened 3 years ago

mrocklin commented 3 years ago

(Some context of this is in https://github.com/dask/distributed/issues/2602)

Summary

Workers should start taking memory generation into local scheduling policies. This affects both task prioritization, and pausing.

Background on Prioritization

Currently task prioritization is based on four factors, in order

  1. User-defined priority, such as defined by the with dask.annotate(priority=...) context manager
  2. Client submission time (first in, first out) for different computations (scheduler generation)
  3. Graph structure, using dask.order
  4. The time at which a task appeared on the worker (worker generation)

In common practice, 1 and 2 don't fluctuate much (users rarely use priorities and most comparative decisions are between tasks in the same .compute call, and so 3/graph structure is the most prominent. 4 is only used today when the user makes lots of submit calls without graph structure.

However sometimes we learn more about the computation at runtime. One critical piece of information that we learn is the size and type of each result. This can tell us useful information like "read_parquet" tasks tend to allocate more memory than they free, which, assuming that we're in a memory constrained environment (common) can be more important than graph structure.

Graph ordering is usually great. Some times it isn't. Mostly this is because of information about memory. When should we override graph ordering prioritization?

Background on pausing

Workers sometimes choose to pause execution. This mostly happens when they are out of memory, and hope that some other process in the cluster is going to save them. This might be a few different causes:

  1. The cluster is planning to scale up
  2. Other workers are collecting data from this worker to free it pu
  3. Other tasks are running which will allow this worker to free some of its own tasks

But oftentimes pausing is also bad, and results in a deadlocked cluster. Sometimes we actually need this worker to run certain tasks so that we can get ourselves out of a jam. In these cases we don't so much want to pause work, as we want to pause all work that will lead to more medium-term memory use.

Background: current data collection

We currently track memory use on a per-TaskState basis on both the scheduler and worker (both have a TaskState class) and on a per TaskGroup basis on the scheduler.

Some decisions we have to make

So, we should probably look at storing more aggregate information (aggregations are good for generalizing and making scheduling decisions). We should also look both at the use of any particular group/prefix but also the consumption of any particular group/prefix.

Scheduler/Worker

Additionally, we also need to determine if we want to make this decision on the scheduler or worker side.

Pro-scheduler:

But then the information is farther away from where the decision is implemented. This can be hard for two reasons:

Pro-worker:

When do we override graph structure priorities?

Do we want to always do this? Only when we find tasks that are particularly memory consuming/producing? Only when we're low on memory?

Proposal: prioritization

To start things off, I'll suggest a relatively simple approach.

We add a worker.py:TaskPrefix class that looks like the following:

class TaskPrefix:
    def __init__(self, key):
        self.key = key
        self.bytes_consumed = 0
        self.bytes_produced = 0
        self.n_processed = 0

It tracks total bytes of all inputs and total bytes of all outputs. Then, when computing worker priorities we do the following:

def add_task(self, key, ..., priority=...):
    ...
         tp = self.prefixes[key_split(key)]

        user, scheduler_generation, graph = priority
        if tp.bytes_produced < 5 * tp.bytes_consumed:
            memory = -1
        elif tp.bytes_produced > 5 * tp.bytes_consumed:
            memory = 1
        priority = (user, scheduler_generation, memory, graph, self.generation)

If the prefix associated to a new task produces/consumes five times as much data as it consumes/produces then we override the graph prioritity. Otherwise this level of the prioritization stays at zero, and we defer to subsequent prioritizations. This strikes a balance between believe in graph prioritization and taking control with memory prioritization. Most of the cases where this is an issue production is high and consumption is zero (like read_parquet) or consumption is high and production is orders of magnitude lower (like x.sum())

Unfortunately for rootish tasks we'll have lots of tasks assigned at once before we know their memory priority. To resolve this we could check the priority as we pop things off of the ready heap. If the priority coming off of the heap is obviously stale then we could re-prioritize the entire heap. This would happen infrequently and is pretty cheap to do anyway, assuming we have less than millions of tasks per worker.

Proposal: pausing

Currently we pause with a periodic callback once memory use gets to something like 80%. I propose that we also pause when pulling a task off of the ready stack if that task is memory producing and our memory is something like 50%. This should be more responsive. We'll want to unpause immediately if we get a non-memory-producing task.

mrocklin commented 3 years ago

I like the proposal above for a couple of reasons:

  1. I think that it is fairly safe. In the common case it doesn't change behavior. It only takes actions in some extreme (but real) situations
  2. It's fairly easy to implement as a first pass on this idea
mrocklin commented 3 years ago

First approach: https://github.com/dask/distributed/pull/5251

This only does prioritization, and doesn't yet handle pausing (I anticipate that pausing will take more time)

mrocklin commented 3 years ago

OK, I've added pausing, although it's ugly and not very future-reader friendly. This will have to be cleaned up, but it is probably simple enough for people to take a look at if they're interested.

gjoseph92 commented 3 years ago

Reprioritizing tasks based on learned memory use seems like a great idea. That's information dask.order can't ever know, but we can find out at runtime and adjust accordingly. I'm a little more hesitant about pausing.

Overall, I like the idea of being proactive and avoiding memory over-use in the first place, rather than only being reactive and spilling, pausing, restarting, rebalancing, etc. once we've already used too much memory.

Graph ordering is usually great. Some times it isn't. Mostly this is because of information about memory. When should we override graph ordering prioritization?

When we see workers producing too much memory, it's usually not because of problems in graph ordering. (I say this based on having looked at ordering in https://github.com/dask/distributed/issues/2602, https://github.com/dask/distributed/pull/4864, shuffle-service tasks, etc.) dask.order already does a good job prioritizing tasks to minimize memory footprint. The problem is that we don't actually follow this ordering: https://github.com/dask/distributed/issues/3974, https://github.com/dask/distributed/issues/5223. The ordering tells us we should go more depth-first, but because of how scheduling currently works, we go too breadth-first and run too many memory-producing root tasks, which allows us to keep too many downstream tasks in memory too, making the graph execution "wider" than it needs to be.

Pausing these memory-producing tasks is a nice direct way to address the symptom of this scheduling problem. But it does feel a little odd that we would start with a good ordering, then try to head in the wrong direction, then add this other mechanism to push us back towards the ordering we already knew we wanted to begin with.

That said, a mechanism for workers to avoid doing things that are likely to produce a lot of memory seems like a useful thing to have in general.

Because https://github.com/dask/distributed/pull/5251 involves pausing, I think we should be careful to consider how it could cause deadlocks. There are certainly some obvious cases (like where every task is memory-producing) that would have previously OOMd which would now deadlock, but I'm particularly interested in cases where the computation would have worked if we'd taken the dask.order path perfectly, but would deadlock with this approach.

gjoseph92 commented 3 years ago

As a basic no-tasks-release-memory example, this works on main but deadlocks on https://github.com/dask/distributed/pull/5251. We're trying to persist an array that fits in memory, but is larger than the 0.5 * memory_limit pause threshold. (Obviously that threshold can change; the point is this situation can occur in general.)

Workers go over 50% memory, but the only tasks available to run are more memory-producing root tasks, so they pause, hoping the scheduler will give them something else to do. Since there aren't any memory-freeing tasks in existence, they'll be paused forever.

In [1]: import distributed

In [2]: import dask.array as da

In [3]: from dask.utils import parse_bytes

In [4]: client = distributed.Client(n_workers=4, memory_limit="1gib", memory_spill_fraction=None, memory_target_fraction=None, memory_pause_fraction=None)

In [5]: a1 = da.random.random(parse_bytes("1.5gib") // 8, chunks='1mib')  # small tasks so it takes longer than `memory_monitor_interval`

In [6]: a2 = da.random.random(parse_bytes("1.5gib") // 8, chunks='1mib')

In [7]: a = da.stack([a1, a2])

In [8]: p = client.persist(a)

In [9]: _ = distributed.wait(p, timeout=4)
distributed.worker - WARNING - Worker is at 60% memory usage. Resuming worker. Process memory: 620.96 MiB -- Worker memory limit: 1.00 GiB
distributed.worker - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 598.85 MiB -- Worker memory limit: 1.00 GiB
...
TimeoutError: 

This is contrived, in that if the array were a little larger, the behavior on main would be bad too (workers would just keep OOMing and restarting until something broke). The ideal behavior would probably be for dask to raise an error saying "Your computation is going to take more memory than you have in the cluster".

But it's interesting to think about how we could discover that error from the worker within this memory-prioritization logic, since that's equivalent to detecting when we're in a deadlock versus when we're reasonably paused waiting for the scheduler to give us a better task to run in the near future.

As things stand right now I don't think it's possible. The worker would have to know something more about the global task graph (do any other tasks exist? do any dependencies of my tasks exist? which ones?), which currently it only learns through being given new tasks. It could:

fjetter commented 3 years ago

pausing

I would propose to exclude the pausing mechanism entirely for the moment. IMHO pausing is broken and we should take a step back and redesign it before we pile logic on top of it. Right now it is simple enough that we can throw it out and replace with something more robust. If we now include complex logic this might not be as easy any more afterwards. From the top of my head, what's wrong with pausing

speculative task assignment (STA)

Once I finish https://github.com/dask/distributed/pull/5046 it should be relatively easy to incorporate STA on worker side. At least we should be able to start off a prototype. I'm wondering how urgent this memory-producing prioritization would be if we had STA.

I'm a bit on the fence regarding the memory producing/consuming reprioritization suggested here. I am worried about the imprecise, local knowledge on worker side especially since the memory producer/consumer toggle would entirely overwrite any topological ordering. I can also see situations which would let us become biased towards already learned tasks even though a topologically better one would be available but we haven't learned it, yet. That would further skew us towards breadth-first execution.

I realize that if our memory producer/consumer heuristic is good enough, the skewed and biased decision might still be a good enought decision. Toughts?

Heap reprioritizing

With the proposed TaskPrefix we could implement a heuristic to trigger a heap reprioritization once a taskprefix memory-producer/consumer toggle flips. That should not happen frequently, should it?

mrocklin commented 3 years ago

This is contrived, in that if the array were a little larger, the behavior on main would be bad too (workers would just keep OOMing and restarting until something broke). The ideal behavior would probably be for dask to raise an error saying "Your computation is going to take more memory than you have in the cluster".

Thank you for the example, this is helpful. In this case the worker would actually write to disk. This is usually not great, but still probably better than just pausing everything.

Once I finish #5046 it should be relatively easy to incorporate STA on worker side. At least we should be able to start off a prototype. I'm wondering how urgent this memory-producing prioritization would be if we had STA.

If speculative task assignment is easy then great, that would definitely solve our short-term pain here. That being said, I would be surprised to learn that STA is cheap, and even more surprised to learn that it remained cheap after we got into the topic. Maybe the state machine rewrite changes things though. I am curious to learn more. Do you have a time estimate on this?

I'm a bit on the fence regarding the memory producing/consuming reprioritization suggested here. I am worried about the imprecise, local knowledge on worker side especially since the memory producer/consumer toggle would entirely overwrite any topological ordering.

I wouldn't worry too much about imprecise and local knowledge. We can also implement this on the scheduler if we want it there. This only overrides topological ordering in significant cases. There are also good reasons to override graph ordering from time to time. Graph ordering doesn't have this information.

mrocklin commented 3 years ago

There have been a couple of situations where we've wanted pausing for memory producing tasks.

  1. The situation we've just recently run into with shuffling, where we're waiting on the worker-scheduler update cycle
  2. The original memory backpressure issue, where we were running lots of memory-consuming tasks while waiting on communication. (we resolved the short-term pain of that issue by reducing the need for lots of that communication, but the general theme still applies).

In both cases we don't actually want to pause the worker, we want to pause the worker for a brief interval, so that the rest of the system can catch up. If we think that STA will take a while, and that we want something else in the meantime, then "pause for a bit" might be an interesting compromise. In labor terms (yesterday was labor day in the US) this is a bit like a work slowdown rather than a general strike.

fjetter commented 3 years ago

Maybe the state machine rewrite changes things though. I am curious to learn more. Do you have a time estimate on this?

I do not have a good handle on how big the changes on scheduler side would need to be to enable this which makes a good estimation currently difficult. If we restrict the speculative tasks to a subclass, i.e. tasks must only be assigned speculatively if all its dependencies are either in memory or computed on the designated worker, I would expect workers in #5046 to be already capable of dealing with STA. The speculative tasks would then be simply "waiting" (as depicted in https://github.com/dask/distributed/pull/5046#issuecomment-895305582) but the worker should work fine regardless and transition the tasks automatically.

I suspect bigger changes are necessary on scheduler side but I haven't spent any time or thought on this, yet. Disabling work stealing, relaxing a few sanity checks and abusing the processing state might get us up and running in a few days with a prototype. Stable operation would take much longer since there are many edge cases we'd need to account for. If we need this fully operational, this will take easily another month (after all, I need to finish the refactoring first)

gjoseph92 commented 3 years ago

In both cases we don't actually want to pause the worker, we want to pause the worker for a brief interval, so that the rest of the system can catch up. If we think that STA will take a while, and that we want something else in the meantime, then "pause for a bit" might be an interesting compromise.

This could work. I think focusing more narrowly on exactly what we want to solve (root task overproduction) like this is a good approach. This might not even have to use pausing per se; it could be a specific condition that ensure_computing could recognize to just pass on the current cycle (discussion in https://github.com/dask/distributed/issues/5223).

Disabling work stealing, relaxing a few sanity checks and abusing the processing state might get us up and running in a few days with a prototype

I'm inclined to try this before the "pause for a bit" option. We could learn a lot from hacking a prototype in a few days. It would be nice understand the shape of the STA problem before we look for a quicker way around it. Depending on how things shake out, I could see "pause for a bit" being useful even with STA, or just an unnecessary temporary fix.

gjoseph92 commented 3 years ago

Also should note that the implementation in https://github.com/dask/distributed/pull/5251 doesn't work that reliably on root tasks either. Possibly because it relies on the memory monitor's RSS, which only updates every 500ms by default, it's easy for root tasks to over-produce faster than we check for memory?

On my machine this just runs out of memory without seeming to try to pause, just like it would on main:

In [1]: import distributed

In [2]: import dask.array as da

In [3]: from dask.utils import parse_bytes

In [4]: client = distributed.Client(n_workers=4, memory_limit="1gib", memory_spill_fraction=None, memory_target_fraction=None, memory_pause_fraction=None)

In [5]: a = da.random.random(parse_bytes("6gib") // 8)

In [6]: p = client.persist(a)

I've noticed this acutely with the shuffle service. Every time I start a job, I have a nail-biting experience watching workers quickly fill up with managed memory, wondering if they'll stop in time or if they'll run out of memory and I'll have to wait 5min to recreate the cluster. For whatever reason, tonight I haven't been able to get a single shuffle to not run out of memory when using TCP comms on small clusters (works fine with TLS).

So even if we went with "pause for a bit", we'd need a different implementation that can determine when to pause effectively instantly. I guess we could look at the amount of data currently stored (though it's an underestimate for RSS).

mrocklin commented 3 years ago

It may be reasonable to update RSS after every task is computed.

mrocklin commented 3 years ago

Disabling work stealing, relaxing a few sanity checks and abusing the processing state might get us up and running in a few days with a prototype

I'm inclined to try this before the "pause for a bit" option. We could learn a lot from hacking a prototype in a few days. It would be nice understand the shape of the STA problem before we look for a quicker way around it.

@fjetter what is your backlog like these days. How long will it be before you have a few days to work on this? I think that we should prioritize clearing out some of your pending work instead of adding to it.

fjetter commented 3 years ago

what is your backlog like these days. How long will it be before you have a few days to work on this?

  1. https://github.com/dask/distributed/pull/5046 is my top prio right now. that's mostly a rather messy merge of main
  2. https://github.com/dask/distributed/pull/5217 / https://github.com/dask/distributed/issues/5184 would be next in line to allow forwarding of exceptions. I added a comment to #5184 with an idea for a less invasive, tornado preserving solution which uses stdlib log handlers which should be doable in an afternoon 🤔

All other side projects of mine are rather low prio and I would be able to pick this up by mid next week, I suppose

jakirkham commented 3 years ago

I've noticed this acutely with the shuffle service. Every time I start a job, I have a nail-biting experience watching workers quickly fill up with managed memory, wondering if they'll stop in time or if they'll run out of memory and I'll have to wait 5min to recreate the cluster. For whatever reason, tonight I haven't been able to get a single shuffle to not run out of memory when using TCP comms on small clusters (works fine with TLS).

Did this include PR ( https://github.com/dask/distributed/pull/5208 )? Wondering if it is as simple as we are doing a bunch of copies that eventually cause things to fall over. Or would that still run into issues for other reasons?

gjoseph92 commented 3 years ago

It did include https://github.com/dask/distributed/pull/5208. I think there's a different issue I need to debug. But it was interesting that TCP seemed to make memory worse.

fjetter commented 3 years ago

For reference, there has been an earlier ticket which, I believe, covers the same problem https://github.com/dask/distributed/issues/4891