dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

[Idea] Handle large graphs by spilling tasks to disk on Scheduler #5630

Open gjoseph92 opened 2 years ago

gjoseph92 commented 2 years ago

As clusters and larger compute jobs become more accessible, large task counts become more common. The distributed scheduler currently struggles when task counts get high (xref https://docs.dask.org/en/stable/best-practices.html#avoid-very-large-graphs, https://github.com/dask/distributed/issues/3898, others). Anecdotally, things often feel rough in the 100K range, and may struggle to function beyond 1M tasks. There are likely many reasons for this, and we don't understand them well (one of my personal theories is that the sheer number of Python objects slows down GC https://github.com/dask/distributed/issues/4987), but when there aren't lots of tasks in memory, things usually seem fine—so what if rather than carefully profiling and understanding this problem, we just avoid ever having many tasks in memory?

At any given moment, usually only a small fraction of tasks are actually relevant to scheduling decisions or state updates. The scheduler is designed around O(1) scheduling—we don't look at the whole graph, just the immediate next step. In principle, that should mean we could strategically swap most TaskStates to disk without affecting scheduling performance, since we wouldn't look at most of them right now anyway.

The goal here would be for the number of tasks in memory to only depend on the number of tasks actually running/processable at the moment (closer to number of workers, or nthreads), not the total size of the graph. And to therefore support running nearly-arbitrarily-large graphs (billion tasks anyone?), swapping a small subset of tasks in and out of disk in a rolling fashion. (Obviously, actually doing a billion tasks would require streaming serialization of graphs and other such changes; I'm just talking high-level for now. We could implement this progressively, and it would still add a lot of value at the million-task scale with HLGs but no serialization changes.)

Additionally, moving the primary "database" of TaskStates out of Python memory could open doors for interesting (and tricky) work around scheduler restarting or fault-tolerance.

Also just to note, I'm using "spilling" as a general term—I don't think this should happen transparently through a zict buffer like Worker.data. Scheduler performance is too critical for implicit behavior. The loading/unloading from disk should happen explicitly, so we can reason about and guarantee behavior.
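To make that concrete, here is a minimal sketch of what an explicit store could look like. Everything in it is hypothetical (there is no `SpilledTaskStore` in distributed), and it glosses over the hard part, namely that real `TaskState` objects reference each other and their `WorkerState`s, so they can't simply be pickled one at a time:

```python
# Hypothetical sketch only: none of these names exist in distributed today.
# The point is that spill/load are explicit calls the scheduler makes at
# well-defined moments, not a transparent zict-style mapping.
import itertools
import pickle
from pathlib import Path


class SpilledTaskStore:
    """Keep 'hot' task records in memory; park everything else on disk."""

    def __init__(self, directory: str):
        self._dir = Path(directory)
        self._dir.mkdir(parents=True, exist_ok=True)
        self._counter = itertools.count()
        self._on_disk: dict[str, Path] = {}   # key -> file holding its pickled record
        self.hot: dict[str, object] = {}      # tasks relevant to scheduling right now

    def spill(self, keys):
        """Explicitly move records we won't look at soon onto disk."""
        for key in keys:
            record = self.hot.pop(key)
            path = self._dir / f"{next(self._counter)}.pkl"
            path.write_bytes(pickle.dumps(record))
            self._on_disk[key] = path

    def load(self, keys):
        """Explicitly bring records back before the scheduler needs them."""
        for key in keys:
            if key not in self.hot:
                path = self._on_disk.pop(key)
                self.hot[key] = pickle.loads(path.read_bytes())
                path.unlink()
        return [self.hot[key] for key in keys]
```

The shape matters more than the details: `spill()` and `load()` are calls the scheduler would make deliberately, so behavior stays easy to reason about.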

cc @fjetter @crusaderky @jcrist @jakirkham @jrbourbeau

jakirkham commented 2 years ago

cc @madsbk @rjzamora @quasiben

crusaderky commented 2 years ago

I like the general idea, but I think that spilling to disk is unnecessarily expensive. If the target is hiding the objects from the GC, then converting the TaskState objects to pickled blobs in memory would be a lot faster - as long as RAM usage on the scheduler is not an issue (and I doubt it is for most people).
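To illustrate the shape of that idea (names and structure are mine, not an existing API): cold task records become plain `bytes`, which the cyclic GC never traverses, and are unpickled on demand. Real TaskStates reference each other, so doing this for real would be considerably harder than this toy:

```python
# Toy illustration of the "pickled blobs in memory" idea (names are hypothetical).
import pickle


class BlobbedTasks:
    def __init__(self):
        self._hot: dict[str, object] = {}   # live objects the scheduler is using
        self._cold: dict[str, bytes] = {}   # pickled blobs, invisible to the GC

    def freeze(self, key):
        self._cold[key] = pickle.dumps(self._hot.pop(key))

    def thaw(self, key):
        if key in self._cold:
            self._hot[key] = pickle.loads(self._cold.pop(key))
        return self._hot[key]
```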

crusaderky commented 2 years ago

...which brings the key point into focus: if the tasks are not being accessed by the scheduler to begin with, then the burden falls exclusively on the GC. That means we can get a good best-case estimate of the benefit of your proposal without touching any code: if you disable the GC, how much faster does a scheduler with 1m+ tasks on it run?
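For what it's worth, one rough way such an experiment could be run, assuming a local cluster. `Client.run_on_scheduler` is an existing distributed method, but the graph shape, task count, and timing approach here are arbitrary choices, not a vetted methodology:

```python
import time

import dask.array as da
from distributed import Client


def set_gc(enabled: bool) -> bool:
    # Runs in the scheduler process; toggles the cyclic GC there only.
    import gc
    (gc.enable if enabled else gc.disable)()
    return gc.isenabled()


if __name__ == "__main__":
    client = Client(n_workers=4, threads_per_worker=1)

    for label, enabled in [("gc enabled", True), ("gc disabled", False)]:
        client.run_on_scheduler(set_gc, enabled)
        # Roughly a million scheduler-side tasks once the zeros, map_blocks,
        # and reduction layers are materialized.
        x = da.zeros(500_000, chunks=1).map_blocks(lambda b: b)
        start = time.perf_counter()
        x.sum().compute()
        print(label, f"{time.perf_counter() - start:.1f}s")
```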

gjoseph92 commented 2 years ago

> if you disable the GC, how much faster does a scheduler with 1m+ tasks on it run?

Great question. I'll try to test this at some point. We've experimented with ~100k tasks in https://github.com/dask/distributed/issues/4987#issuecomment-877473670 and seen a speedup, but I haven't tried in the millions. I'm also particularly curious about qualitatively how responsive the dashboard feels. The lagginess of the dashboard is something you can really feel going from the 10k task range to the 100k/1m task range.

> I think that spilling to disk is unnecessarily expensive. If the target is hiding the objects from the GC, then converting the TaskState objects to pickled blobs in memory would be a lot faster

I like this idea too; I was trying to think of some way to hide objects from GC in memory, but this is much simpler than anything I came up with. I agree that actual scheduler RAM usage from tasks is usually not the issue, but part of that is probably because it falls over for other reasons if you give it a lot of tasks, so nobody is able to push that RAM limit right now. If these changes made the scheduler easily able to handle millions of tasks, then we might see RAM actually start to become a limiting factor. The good thing is that switching from a model of pickled TaskStates in memory to pickled TaskStates on disk seems like a pretty natural progression.

jcrist commented 2 years ago

Not all objects are tracked by the GC for cycle detection; only objects that are capable of creating reference cycles increase the load on the GC. Builtin objects have the following optimizations to reduce the number of items tracked by the GC:

- Atomic types (ints, strings, floats, bytes, ...) are never tracked.
- Tuples start out tracked, but are untracked during a GC pass if they turn out to contain only untracked objects.
- Dicts start out untracked, and only become tracked once a tracked key or value is inserted.

Also note that the CPython GC is counter-based: every time a tracked object is created the counter increases, and every time a tracked object is destroyed it decreases. So creating and destroying a few tracked objects in a hot loop won't trigger the GC, but suddenly creating a lot of tracked objects (during e.g. deserialization) will.
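A small demonstration of that trigger using only the standard library (`gc.get_threshold` and the `gc.callbacks` hook are real APIs; the class name and allocation count are arbitrary):

```python
# A burst of tracked allocations sets off several generation-0 collections,
# which we can observe via gc.callbacks.
import gc

collections = []

def observe(phase, info):
    if phase == "start":
        collections.append(info["generation"])

gc.callbacks.append(observe)
print(gc.get_threshold())   # e.g. (700, 10, 10) on most CPython versions

class Node:                 # plain class instances are tracked by the GC
    def __init__(self):
        self.ref = None

objs = [Node() for _ in range(10_000)]   # allocation burst -> many gen-0 collections
print("collections triggered during the burst:", len(collections))
gc.callbacks.remove(observe)
```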

With C extensions/Cython, you can also disable GC tracking for certain objects (if you're sure no reference cycles can ever be created by those objects alone). For example, in msgspec I added an optimization so that msgspec structs are untracked at creation time if none of their attributes contains a tracked object. This has a measurable impact on deserialization speed, as it avoids triggering additional unneeded GC cycles.
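For reference, `gc.is_tracked` makes it easy to check which objects the cyclic GC actually tracks (CPython behavior; exact results can vary a little between versions):

```python
# Atomic objects, and containers that hold only atomic objects, can be left
# untracked; ordinary class instances and lists are tracked.
import gc

class PlainTask:          # a stand-in for a TaskState-like object
    pass

for obj in (1, "key-name", (), {}, {"a": 1}, {"a": []}, [], PlainTask()):
    print(type(obj).__name__, gc.is_tracked(obj))
```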

So a few points we might take from this:

gjoseph92 commented 2 years ago

Yeah, there's been a bit of discussion about these GC strategies in https://github.com/dask/distributed/issues/4987, especially from @jakirkham. My intent with this issue was to present a totally alternative approach:

However, we also notice:

jcrist commented 2 years ago

> Making any of these GC-related tweaks (@cython.no_gc, changing thresholds, tuple/dict optimizations, etc.) probably requires understanding what the problem is in order to not be shooting in the dark.

Sure, and I don't want to derail that discussion. But if you already have a benchmark setup, I'd think that annotating TaskState with @cython.no_gc and recompiling would be a lot simpler to benchmark than writing all the code to store excess task states as in-memory bytes. If it makes no noticeable difference then I wouldn't keep delving down this rabbit hole, but it might be a nice easy win in this case.
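For illustration, a `.pyx` stub of what that annotation could look like. This is not distributed's real `TaskState`, and `@cython.no_gc` is only safe if instances can never participate in a reference cycle, which would have to be verified first:

```cython
# taskstate_nogc.pyx -- illustrative stub, not distributed's real TaskState.
# @cython.no_gc removes instances from GC tracking entirely.
import cython


@cython.no_gc
cdef class TaskState:
    cdef public str key
    cdef public str state
    cdef public set dependencies

    def __init__(self, key):
        self.key = key
        self.state = "released"
        self.dependencies = set()
```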

chrisroat commented 2 years ago

FWIW, in the issue mentioned at the start (#3898), the memory of the scheduler is not released when the graph is complete -- even after client restart. So after several large graphs, the memory grows quite large and I see degradation of performance. It's not clear to me if this is task-related, or potentially some I/O buffer.

Here is an example showing the memory growth on the scheduler, which I check by watching the time-series graph on the dashboard's "System" tab.

```python
import dask
import dask.array as da
import distributed

with dask.config.set({"distributed.scheduler.transition-log-length": 100}):
    client = distributed.Client(threads_per_worker=1)

_ = da.zeros(100_000, chunks=1).map_blocks(lambda x: x).compute()
```

In my application, I simply monitor the scheduler memory usage. When it gets large (a couple GB), I kill the cluster and then restart.
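In case it helps anyone else, here is roughly how that workaround could be automated. `Client.run_on_scheduler` and `psutil` are real APIs, but the helper names and the 2 GB threshold are just placeholders:

```python
from distributed import Client, LocalCluster

THRESHOLD = 2 * 1024**3          # ~2 GB, matching "a couple GB" above; arbitrary


def scheduler_rss() -> int:
    # Runs in the scheduler process and reports its resident memory.
    import psutil
    return psutil.Process().memory_info().rss


def make_client() -> Client:
    return Client(LocalCluster(threads_per_worker=1))


def maybe_restart(client: Client) -> Client:
    # Tear the whole cluster down and build a fresh one once the scheduler is too big.
    if client.run_on_scheduler(scheduler_rss) > THRESHOLD:
        cluster = client.cluster
        client.close()
        cluster.close()
        client = make_client()
    return client


# Usage between large graphs:
# client = make_client()
# ... run a big graph ...
# client = maybe_restart(client)
```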