dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Ease memory pressure by deprioritizing root tasks? #6360

Open TomNicholas opened 2 years ago

TomNicholas commented 2 years ago

The need for memory backpressure has been discussed at length, and a full solution is arguably necessary to solve many of the use cases in the pangeo community. Whilst efforts in that direction do seem to be underway, I would like to suggest a simpler stop-gap solution for the shorter term. The basic issue is that workers can overeagerly open new data at a rate faster than other tasks free up memory, which is best understood via the figure under "root task overproduction" here.

All my tasks that consume memory are root tasks, so is there any way to deprioritize root tasks? This should be a lot simpler than solving the general problem, because it doesn't require (a) keeping any records of how much memory each task uses or (b) workers changing their priorities over the course of a computation based on new information. All I need is for workers to be discouraged / prevented from opening new data until the task chains that have already begun are as complete as possible. This will obviously waste time as workers wait, but I would happily accept a considerable slowdown if it means that I definitely won't run out of memory.

This problem applies to any computation where the data is largest when opened and then steadily reduced (which covers most pangeo workflows, with the important exception of the groupby stuff IIUC), but where the opening task and the memory-relieving tasks can't be directly fused into one.

Is there some way of implementing this? I'm happy to provide an example use case if that would help, or to try hacking away at the distributed code, but I wanted to know if this is even possible first.
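For concreteness, here is a minimal dask-array stand-in for the workload shape described above; the sizes, chunking, and the rechunk step are invented purely for illustration. The root tasks produce the largest chunks, a non-blockwise step prevents them from fusing with anything downstream, and the memory-relieving reduction only comes afterwards.

```python
import dask.array as da

# Each root task "opens" one large chunk (a stand-in for one open_dataset per timestep)
opened = da.random.random((365, 2000, 2000), chunks=(1, 2000, 2000))

# A non-blockwise step (rechunking across the time axis) prevents the open tasks
# from being fused with the downstream reduction
rechunked = opened.rechunk((73, 2000, 2000))

# The memory-relieving reduction happens only after the non-blockwise step
reduced = rechunked.mean(axis=(1, 2))
```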

cc @gjoseph92 , and @rabernat @dcherian @TomAugspurger , because this is related to the discussion we had in the pangeo community meeting the other week.

gjoseph92 commented 2 years ago

The figure @TomNicholas was referencing (thanks for the reminder, I forgot about this!):

[figure: root task overproduction]

I mention that because this problem is inherent to the way the current task scheduling algorithm works in distributed; see https://github.com/dask/distributed/issues/5223 and https://github.com/dask/distributed/issues/5555. So approaches like https://github.com/dask/distributed/issues/4891 are not, in my mind, the "full solution"—they'd be layering on more complexity to an already-complex algorithm with two components (normal eager scheduling, and task stealing aka occasional rebalancing). "Speculative task assignment" (https://github.com/dask/distributed/issues/3974) is more what I'd consider the "full solution", though I think there are other approaches that could work as well.

Just for context :)

gjoseph92 commented 2 years ago

so is there any way to deprioritize root tasks?

As a way to hack around this, right now, I'd play with worker resources. For example, something along the lines of (untested):

import dask
import dask.distributed
import coiled
import xarray as xr

NTHREADS = 4
# Give each worker one "ROOT" resource per thread
cluster = coiled.Cluster(..., worker_cpu=NTHREADS, worker_options={"resources": {"ROOT": NTHREADS}})
client = dask.distributed.Client(cluster)

# Annotate the data-opening (root) tasks so each one consumes a ROOT resource
with dask.annotate(resources={"ROOT": 1}):
    ds = xr.open_dataset(...)

result = ds.a - ds.a.mean()
# Disable fusion so the resource annotations aren't lost during graph optimization
with dask.config.set({"optimization.fuse.active": False}):
    result.compute()

A few caveats:

dcherian commented 2 years ago

One way to "solve" this is to batch the computation (e.g. compute the result 100 timesteps at a time) and write to zarr (some ugly code below).

```python
import xarray as xr


def batch_load(obj, factor=2):
    """
    Load xarray object values by calling compute on block subsets
    (that are an integral multiple of chunks along each chunked dimension)

    Parameters
    ----------
    obj: xarray object
    factor: int
        multiple of chunksize to load at a single time.
        Passed on to split_blocks
    """
    if isinstance(obj, xr.DataArray):
        dataset = obj._to_temp_dataset()
    else:
        dataset = obj

    # result = xr.full_like(obj, np.nan).load()
    computed = []
    # split_blocks (not shown here) yields labelled, chunk-aligned subsets of the dataset
    for label, chunk in split_blocks(dataset, factor=factor):
        print(f"computing {label}")
        computed.append(chunk.compute())
    result = xr.combine_by_coords(computed)

    if isinstance(obj, xr.DataArray):
        result = obj._from_temp_dataset(result)

    return result


def batch_to_zarr(ds, file, dim, batch_size, restart=False, **kwargs):
    """
    Batched writing of dask arrays to zarr files.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset to write.
    file : str
        filename
    dim : str
        Dimension along which to split dataset and append.
        Passed to `to_zarr` as `append_dim`
    batch_size : int
        Size of a single batch

    Returns
    -------
    None
    """
    import tqdm

    if not restart:
        ds.isel({dim: [0]}).to_zarr(file, consolidated=True, mode="w", **kwargs)
    else:
        print("Restarting...")
        opened = xr.open_zarr(file, consolidated=True)
        ds = ds.sel(time=slice(opened[dim][-1], None))
        print(
            f"Last index = {opened[dim][-1].values}. Starting from {ds[dim][1].values}"
        )
        opened.close()

    for t in tqdm.tqdm(range(1, ds.sizes[dim], batch_size)):
        if "encoding" in kwargs:
            del kwargs["encoding"]
        ds.isel({dim: slice(t, t + batch_size)}).to_zarr(
            file, consolidated=True, mode="a", append_dim=dim, **kwargs
        )
```

Effectively this prevents the scheduler from ever seeing "too many" root tasks.

This now reminds me of the rechunker approach:

[figure: rechunker-style graph with a dummy intermediate task]

where the dummy task in the middle prevents the scheduler from getting too far ahead, and consequently limits memory usage.

So perhaps there could be a new function in graph_manipulation.py that takes as input an integer N and a task token. It would make tasks N+1 through 2N execute only after the Nth task has completed.
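Something in that spirit can already be approximated with the existing dask.graph_manipulation.bind primitive, by chaining batches of collections so that each batch depends on the previous one finishing. This is only a rough sketch; `throttle_batches` and the batch size are illustrative, not an existing dask API.

```python
import dask
import dask.array as da
from dask.graph_manipulation import bind


def throttle_batches(collections, batch_size):
    """Chain batches so that batch i+1 only starts after batch i has completed."""
    batches = [collections[i:i + batch_size] for i in range(0, len(collections), batch_size)]
    chained = [batches[0]]
    for batch in batches[1:]:
        # bind() rewrites each collection's graph to depend on the previous batch
        chained.append([bind(c, chained[-1]) for c in batch])
    return [c for batch in chained for c in batch]


# e.g. 20 "timesteps": each group of 4 only starts after the previous group finishes
chunks = [da.random.random((1000, 1000), chunks=-1) for _ in range(20)]
sums = throttle_batches([c.sum() for c in chunks], batch_size=4)
print(dask.compute(*sums))
```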

TomNicholas commented 2 years ago

Thanks both for your replies.

@dcherian batching like you suggested is one of my backup plans :sweat_smile:, but thank you for the code example!

@gjoseph92 I tried playing with worker resources as you suggested, but if it made any difference to the behaviour I couldn't tell.

Without setting a limit on resources my cluster starts failing like this:

[dashboard screenshot: cluster failing]

and after I set a limit via

options = g.cluster_options()  # `g` here is presumably a dask-gateway Gateway (Pangeo Cloud)
NTHREADS = 1
options.worker_cores = NTHREADS
options.worker_memory = 50

with dask.config.set({"resources": {"ROOT": NTHREADS}}):
    gc = g.new_cluster(cluster_options=options)

with dask.annotate(resources={"ROOT": 1}):
    ds = xr.open_dataset(...)

omega = vort(ds)  # my calculation, an xarray.apply_ufunc

with dask.config.set({"optimization.fuse.active": False}):
    with gc.get_client():
        # write result to zarr
        st = omega.to_dataset(name="vort").to_zarr(mapper, mode='w', compute=True)

it looks like this

[dashboard screenshot: spilled-to-disk memory still growing]

You can see that in both cases it has managed to save out ~300 chunks (store_map), but the data spilled to disk still keeps steadily growing.


At @jbusecke's suggestion I also tried setting the task priority via

with dask.annotate(priority=-999999):
    ds = xr.open_dataset(...)

with dask.annotate(priority=999999):
    with gc.get_client():
        # write result to zarr
        st = omega.to_dataset(name="vort").to_zarr(mapper, mode='w', compute=True)

which appeared to help the computation proceed somewhat, but after a while my memory usage still kept growing indefinitely.


This is a subset of my optimised (partially fused) task graph, for two timesteps. It's basically two open_dataset calls per timestep, each of which is padded with xarray, then fed into a simple apply_ufunc which returns one chunk, then written to a single zarr chunk.

[image: subset of the optimised task graph for two timesteps]

At full scale I'm trying to compute 9000 timesteps. The difference is that the outermost 4 task chains (2 on each side) are duplicated (there are 18000 opens for the full problem), but the middle 3 chains don't change (those are grid-related variables that don't depend on time), so it's basically 2N+3 open_dataset calls for N time chunks.

The static ordering looks reasonable - dark orange should be written out before yellow and light orange are loaded

[image: static task ordering for the graph above]

But this static ordering seems to be largely irrelevant to what actually happens when you have multiple workers.


Is this a pathological failure case? Whatever I've tried, my 20 workers still race to open ~400 chunks (loading hundreds of GBs into memory and spilling to disk) before they manage to write a single chunk out, even though each output chunk only requires two input chunks.

There might be some way for me to inline/fuse my way to an embarrassingly parallel graph, so that the open and write steps are in one task, but at that point I may as well have just used map_blocks (or a different parallel execution framework).

gjoseph92 commented 2 years ago

But this static ordering seems to be largely irrelevant to what actually happens when you have multiple workers

Yes, the priority ordering doesn't have that much effect (it's just a tie-breaker, not an actual ordering that the scheduler follows).

if it made any difference to the behaviour I couldn't tell

I'm a bit surprised by this (but I agree, it didn't look like anything changed).

I looked into it a little and it turns out the scheduling of tasks that use resources doesn't work the way I expected:

https://github.com/dask/distributed/pull/6468 makes it work the way you'd expect. @TomNicholas, I'd be really curious to see how things work if you keep using resources, but switch to that PR. There's been lots of discussion of root-task overproduction, but we've never actually gotten to see how things would go if it didn't happen, so having this comparison would be very valuable.

TomNicholas commented 2 years ago

Thanks so much for that @gjoseph92 - I'm excited to try it out, but it'll take me a few days as I need to work out how to run on a custom version of distributed first.

gjoseph92 commented 2 years ago

You could use Coiled if that would be easier; you can install git+https://github.com/gjoseph92/distributed.git@hold-back-tasks-without-available-resources in your software environment.

TomNicholas commented 2 years ago

@gjoseph92 @jbusecke and I are trying to test this but our coiled cluster won't actually start :/

This is the environment file we're using, which we tried both locally and in the pangeo cloud

name: hero_calculation
channels:
  - conda-forge
dependencies:
  - ipykernel
  - coiled
  - xgcm
  - numpy
  - zarr
  - fsspec
  - gcsfs
  - pip
  - pip:
    - git+https://github.com/pydata/xarray.git
    - git+https://github.com/gjoseph92/distributed.git@hold-back-tasks-without-available-resources

With the default environment on pangeo cloud we are able to start a cluster, but that doesn't use your updated version of distributed.

This succeeds:

import coiled
coiled.create_software_environment(
    conda="environment.yml",
)

But then cluster = coiled.Cluster() never finished.

When we try to use our altered environment (with your distributed PR and xarray's main branch) the notebook hangs on creating the coiled cluster.

Is there some dask dependency we've forgotten here? Is there a better channel to get help with this coiled cluster issue?

gjoseph92 commented 2 years ago

@TomNicholas why don't you join the Coiled slack; it'll be easier to go back and forth there. I think this link works, but if not it's at the bottom-left of https://cloud.coiled.io: https://join.slack.com/t/coiled-users/shared_invite/zt-hx1fnr7k-In~Q8ui3XkQfvQon0yN5WQ.

I also haven't tested my PR at all. It's entirely possible I messed up something silly. The cluster logs might show more. I also wouldn't be surprised if you need to add git+https://github.com/dask/dask.git@main, though I thought distributed might pull this in automatically.

jbusecke commented 2 years ago

I also wouldn't be surprised if you need to add git+https://github.com/dask/dask.git@main, though I thought distributed might pull this in automatically.

We actually tried this, but it failed due to a dependency pinned to the latest dask release. Just FYI.

gjoseph92 commented 2 years ago

@TomNicholas and I reproduced a variant of his failing workload on Coiled, running on my PR https://github.com/dask/distributed/issues/6467. We used worker resources to annotate root tasks as described in https://github.com/dask/distributed/issues/6360#issuecomment-1129445795. With my PR, this made it so the scheduler would stream root tasks into workers, only submitting a new root task when it heard the previous had completed, instead of submitting all root tasks up front, which the current scheduling algorithm does.

This allowed us to simulate how a cluster would perform if root task overproduction were fixed. The results are compelling:

[plot: memory usage and runtime, withheld root tasks vs standard scheduling]

Withholding root tasks from workers enormously reduced memory use (15x lower peak memory usage) while cutting runtime nearly in half.

_These trials used the same workload on the same cluster, both on https://github.com/dask/distributed/issues/6467. The only difference was that "root tasks withheld" wrapped xr.open_dataset in dask.annotate(resources={"ROOT": 1}), while "standard" did not. All workers were r5.2xlarge, with 1 ROOT resource each._

You can see how different memory usage looked on the dashboard:

[screenshots: worker-memory dashboards for the two runs]

It's important to note that because we used resource restrictions, scheduling didn't use co-assignment logic (https://github.com/dask/distributed/pull/4967). I believe that's the reason you see transfers on the second dashboard (two tasks that contributed to the same one output were scheduled on different workers). Implemented properly to take advantage of this, I expect fixing root task overproduction would give even better runtime (maybe 20-40% faster?) and lower memory usage than what we see here.


Important takeaways:

I estimate that a scheduler-only fix for root task overproduction, which just withheld root tasks from workers while maintaining our co-assignment logic, could be implemented in a week.

cc @fjetter @hayesgb @mrocklin

mrocklin commented 2 years ago

Those are compelling plots.

There's a lot here. Is there a thing that I or someone like @fjetter should read to understand the proposed design?

In principle if we could reliably solve this problem with a week of dev time and not cause other problems then that sounds like time well spent.

gjoseph92 commented 2 years ago

Part of the week includes coming up with the design. But the overall idea is roughly: identify root-ish tasks, withhold them on the scheduler rather than submitting them all to workers up front, and don't have more than SchedulerState.total_nthreads of them in processing at once.

The implementation of this is what needs consideration, particularly how we still get co-assignment (which right now relies on running through all the root tasks at once). Maybe something like:
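As a purely illustrative, self-contained toy (not distributed's actual scheduler code, and not necessarily the design being proposed here), the core mechanic is a scheduler-side queue that keeps at most total_nthreads root-ish tasks in processing and releases the next one whenever one completes:

```python
from collections import deque


class RootTaskQueue:
    """Toy model of withholding root-ish tasks on the scheduler."""

    def __init__(self, total_nthreads):
        self.total_nthreads = total_nthreads
        self.processing = set()   # root tasks currently sent to workers
        self.queued = deque()     # root tasks withheld on the scheduler

    def submit(self, key):
        """Return the key if it can be sent to a worker now, else withhold it."""
        if len(self.processing) < self.total_nthreads:
            self.processing.add(key)
            return key
        self.queued.append(key)
        return None

    def task_done(self, key):
        """Mark a root task complete and release the next withheld one, if any."""
        self.processing.discard(key)
        if self.queued:
            nxt = self.queued.popleft()
            self.processing.add(nxt)
            return nxt
        return None


# With 4 threads, only 4 of 10 root tasks are "in processing" at once;
# finishing one releases the next.
q = RootTaskQueue(total_nthreads=4)
released = [q.submit(f"open-{i}") for i in range(10)]
assert sum(r is not None for r in released) == 4
assert q.task_done("open-0") == "open-4"
```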

The advantage is that it all happens on the scheduler. Nothing has to change with the worker; from the worker's perspective, it just doesn't get lots of tasks queued on it. That makes me more confident in it not causing deadlocks, since the scheduler is generally easier to reason about than the worker state machine.

This obviously introduces a bit more idleness on the worker: when it completes a root task, it doesn't have another root task queued, so it has to wait for a roundtrip to the scheduler for the next instruction. Speculative task assignment would eventually be a nice optimization to avoid this. The experiment here shows that accepting that latency (and a horrendously slow scheduling algorithm; it couldn't even handle 200k tasks) but taking a better path through the graph is still faster overall than quickly taking a bad path through the graph. Slow is smooth, smooth is fast.

mrocklin commented 2 years ago

Historically I would have said "I'd rather not complicate the scheduler further. It makes sense to have the worker handle this given how relatively simple the worker is." With recent changes, though, I'm not sure that that is still the case. It seems like the worker might be a place of more complexity than the scheduler these days. I neither love nor hate this plan.

gjoseph92 commented 2 years ago

The scheduler-only aspect of it is the main appeal to me, for that reason. The worker also can't do this on its own without extra information from the scheduler (it doesn't know enough about overall graph structure). So we either have to touch both scheduler and worker, or only scheduler.

mrocklin commented 2 years ago

It could guess. We could pass rootishness around. It could also see that "hey, I often have tasks that arrive just after these rootish tasks are done. I guess I should maybe wait a bit". I'm not pushing for this though. The worker seems to be a larger source of complexity today than the scheduler.

gjoseph92 commented 2 years ago

I really don't like the idea of guessing. Passing rootishness around was one of my thoughts, but it introduces consistency issues (if downstream tasks are cancelled, you have to inform workers to de-rootify their tasks) without providing any complexity or performance benefit (without STA, the worker would still have to wait for a scheduler roundtrip before it could run a new task).

mrocklin commented 2 years ago

I'm not pushing for my thought above. There's no need to argue against it.

I do think that you should develop some comfort with guessing though. We don't need to be perfect, just good in aggregate.

fjetter commented 2 years ago

I agree with @gjoseph92 here and I wouldn't want to implement any logic on the worker that would introduce artificial waiting times. Its current state is not well suited to handling this kind of logic. I do not see any problems with STA on the worker side. We likely need to adjust a few transition rules but that should be doable after the latest changes.


Don't have more than SchedulerState.total_nthreads of these root-ish tasks in processing at once

The request "I would like to limit the total number of tasks of type X concurrently executing on the cluster" has come up a couple of times before.

I'm wondering if this can/should first be solved generically before we write a specialized version for root tasks. That would obviously only be part of the solution, but it would be a re-usable part.
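For context, one way to approximate a generic "at most N tasks of type X at once" limit today is distributed's Semaphore. The sketch below is illustrative only (the limit, names, and the trivial work function are made up), and it throttles execution on workers rather than scheduling, so it is not a substitute for withholding root tasks on the scheduler.

```python
from dask.distributed import Client, Semaphore

client = Client()  # assumes a running cluster
sem = Semaphore(max_leases=8, name="type-X")


def limited_task(i):
    # At most 8 of these bodies run concurrently across the whole cluster
    with sem:
        return i * 2  # stand-in for the real work


futures = client.map(limited_task, range(1000))
results = client.gather(futures)
```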

The implementation of this is what needs consideration, particularly how we still get co-assignment

Can we first deliver something that does not care about co-assignment? IIUC, your prototype does not consider co-assignment and it still is a major win.

mrocklin commented 2 years ago

I agree with @gjoseph92 here and I wouldn't want to implement any logic on the worker

Just to be perfectly clear, so do I. I brought up the other approach to say "there are other ways we could think about doing this" not to say "we should do it this way instead".

gjoseph92 commented 2 years ago

Update: we're planning on working on this, starting late next week or the week after: https://github.com/dask/distributed/issues/6560. The goal is a draft PR (not necessarily merged) within 2 weeks. When that's up, we'd very much appreciate input from people involved in this thread who could test us out and let us know how it affects performance on real-world cases.

TomNicholas commented 2 years ago

Here's a notebook containing a simple example demonstrating the root task overproduction problem.

This example only uses dask; it doesn't import xarray or xGCM anywhere. It's set up to run on a coiled cluster, using Gabe's PR. The conda environment file is just

name: root_overproduction_test_random
channels:
  - conda-forge
dependencies:
  - ipykernel
  - coiled
  - numpy
  - python-graphviz
  - graphviz
  - pip
  - pip:
    - git+https://github.com/gjoseph92/distributed.git@f05c94e4c986d7e23e27215ce646a12d6008c849

The offending graph looks like this

[image: the offending task graph from the notebook]

I think @gjoseph92 was able to come up with some even simpler examples, but I'm sharing this one because it's the simplest one I could create that still looks somewhat like the original problem I was trying to compute (with xGCM).

gjoseph92 commented 2 years ago

Two even simpler examples (pure dask). These are a + b with just enough complexity to make them not fuse into a single task:

import dask.array as da

a = da.random.random(100, chunks=10)
b = da.random.random(100, chunks=10)
s = da.stack([a, b])
r = s.sum(axis=0)  # reducing over the stacked axis is not blockwise, so it can't fuse with a and b
r.visualize(optimize_graph=True)

[image: task graph for the stack/sum example]

import dask.array as da

a = da.random.random(100, chunks=10)
b = da.random.random(100, chunks=10)
r = a[1:] + b[1:]  # slicing is not blockwise, so the add can't fuse with a and b
r.visualize(optimize_graph=True)

[image: task graph for the slicing example]

sum and slicing aren't blockwise operations. So graph optimization won't fuse a, b, and the operation into a single task per chunk—there will be three tasks per chunk.

The input tasks (random array creation) will be overproduced relative to output tasks (sum/add), leading to excessive memory use.

(@TomNicholas is demonstrating the same thing in his notebook. Instead of slicing or concatenation, he's adding a rechunk step before the a + b, which has the same effect of preventing fusion.)
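For completeness, the rechunk variant mentioned above would look roughly like this (chunk sizes here are made up):

```python
import dask.array as da

a = da.random.random(100, chunks=10)
b = da.random.random(100, chunks=10)
# Rechunking is not blockwise, so the random-creation (root) tasks
# can't fuse with the downstream add
r = a.rechunk(20) + b.rechunk(20)
r.visualize(optimize_graph=True)
```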

gjoseph92 commented 2 years ago

Also, the problem @TomNicholas has been demonstrating here is a simplified form of what he actually needs to solve.

Here, he's computing a + b (which runs out of memory due to root task overproduction).

What he actually needs to compute is broadcast1 * a + broadcast2 * b, where a and b are n-dimensional, the broadcast arrays are (n-1)-dimensional, and they broadcast against many chunks of a and b.

This not only suffers from root task overproduction, but also hits the "widely-shared dependencies dogpile": https://github.com/dask/distributed/issues/6570