ocean-transport / coiled_collaboration

Repository to track and link issues across other repositories, which are relevant to the Abernathey Lab and Coiled

Pangeo examples summary #17

Open ncclementi opened 3 years ago

ncclementi commented 3 years ago

This document consolidates the latest results of the Coiled-Pangeo collaboration discussion and answers some remaining questions about speedups and performance.

Let's concentrate on the latest questions posted by @rabernat in https://github.com/dask/distributed/issues/2602#issuecomment-891156357. That comment shows the results of computations run on pangeo-binder with Dask versions from before and after @gjoseph92's PR, where @rabernat can't obtain the speedups previously reported by @gjoseph92 in https://github.com/dask/distributed/issues/2602#issuecomment-870930134.

Let's break down the examples in the notebook that contains the code run by @rabernat on pangeo-binder: https://gist.github.com/rabernat/39d8b6a396e076d168c24167b8871c4b

Side note: I was never able to launch the binder for the Dask version (2021.07.1) so I personally couldn't re-run these cases on pangeo-binder.

Cluster configurations used on pangeo-binder

nworkers = 30
worker_memory = 8
worker_cores = 1
use_MALLOC_TRIM_THRESHOLD = True

and setting {"MALLOC_TRIM_THRESHOLD_": "0"} via Dask gateway.

Synthetic Data Example 1:

import dask.array as dsa
import numpy as np
import xarray as xr

data = dsa.random.random((10000, 1000000), chunks=(1, 1000000))
da = xr.DataArray(data, dims=['time', 'x'],
                  coords={'day': ('time', np.arange(10000) % 100)})
clim = da.groupby('day').mean(dim='time')
anom = da.groupby('day') - clim
anom_std = anom.std(dim='time')
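
The access pattern in this example can be seen in a plain-Python miniature. This is only an illustrative sketch, not how xarray or Dask execute the graph: the tiny `series` list and the sizes `n_time` and `n_days` are made-up stand-ins for one `x` column of the array above. The point it shows is that every original sample is read once to build the climatology and then again to build the anomaly.

```python
from statistics import fmean, pstdev

# Hypothetical miniature sizes (the thread uses 10000 time steps and 100 days).
n_time, n_days = 12, 4
series = [float(t * t) for t in range(n_time)]  # stand-in for one 'x' column
day = [t % n_days for t in range(n_time)]       # same labelling as arange % 100

# Pass 1: climatology = mean of all samples that share a day label.
clim = {
    d: fmean([v for v, label in zip(series, day) if label == d])
    for d in range(n_days)
}

# Pass 2: the anomaly needs every original sample again, plus the finished clim.
anom = [v - clim[label] for v, label in zip(series, day)]

# Population standard deviation (ddof=0), reduced over the time axis.
anom_std = pstdev(anom)
```

Because pass 2 cannot start for a given day until pass 1 has seen all of that day's samples, the input either stays in memory between the two passes or has to be produced a second time.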

For this example @rabernat reports that without use_MALLOC_TRIM_THRESHOLD, workers die, and with use_MALLOC_TRIM_THRESHOLD:

Quoting @rabernat: "I have found that the critical performance issues are largely resolved even in Dask 2020.12.1 when I use options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}....if I don't set MALLOC_TRIM_THRESHOLD_, both clusters crash. This leads me to conclude that, for these workloads, MALLOC_TRIM_THRESHOLD_ is much more important than #4967"

Questions from @rabernat:

Answer/comments

There are a few reasons why, for this case, @gjoseph92's PR might not seem to be kicking in.

  1. As Matt pointed out in this comment, this is a very gnarly graph, and there is not much that can be done here since it requires to hold all the data, since you need it to compute clim and then again to compute anom_std.
  2. If the cluster has enough total memory to hold the array (it's 200GiB, not unreasonable), this should still work. Colocation (Gabe's PR) helps.
  3. Gabe reported a 6x speedup for this case with a different cluster configuration:

    n_workers=20,
    worker_cpu=2,
    worker_memory="10GiB",

    and setting {"MALLOC_TRIM_THRESHOLD_": "0"}

But @gjoseph92 mentioned in an email thread: "I also can't reproduce anom_mean failing to compute with an earlier version. Looking at my original performance report I see 19 workers, though I had wait_for_workers(20). It's possible that a worker died at some point and I didn't realize it. Since the dataset was 200GB, losing even 1 worker would make the cluster memory smaller than the dataset, requiring spilling. Note that I don't think anom_mean was a very good test case for this improvement, since it's inherently transfer-heavy."

So the 6x speedup reported for this case may well have come from that lost worker rather than from the PR.

  4. @rabernat sees almost no difference between the older (Dask 2020.12.0: 2min 17s) and newer (Dask 2021.07.1: 2min 21s) versions of Dask, probably because the older version can actually run this workload with almost no spilling, thanks to the extra memory in his cluster.
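
The memory argument in the points above comes down to simple arithmetic, sketched below. It assumes that `worker_memory = 8` on pangeo-binder means 8 GiB per worker (the thread does not state the unit) and takes the 200 GiB dataset size at face value from the discussion. The helper name `cluster_memory_gib` is made up for this illustration.

```python
def cluster_memory_gib(n_workers: int, worker_memory_gib: int) -> int:
    """Total memory of a cluster, ignoring any per-worker overhead."""
    return n_workers * worker_memory_gib

dataset_gib = 200  # figure quoted in the thread, not re-derived here

# Assumption: worker_memory = 8 on pangeo-binder means 8 GiB.
pangeo_total = cluster_memory_gib(30, 8)
gabe_total = cluster_memory_gib(20, 10)
gabe_one_worker_lost = cluster_memory_gib(19, 10)

for name, total in [
    ("pangeo-binder (30 x 8)", pangeo_total),
    ("gjoseph92 (20 x 10)", gabe_total),
    ("gjoseph92, one worker lost (19 x 10)", gabe_one_worker_lost),
]:
    headroom = total - dataset_gib
    status = "fits" if headroom >= 0 else "must spill"
    print(f"{name}: {total} GiB total, headroom {headroom} GiB -> {status}")
```

Under these assumptions the pangeo-binder cluster has spare capacity, the 20-worker cluster is exactly at the limit, and losing a single worker pushes it below the dataset size.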

Another question from @rabernat:

Is there a downside to setting MALLOC_TRIM_THRESHOLD_=0 on all of our clusters?

Answer: As of Dask 2021.07.2, Dask sets MALLOC_TRIM_THRESHOLD_=65536 by default for workers using a Nanny. The lower the number, the more system calls are made, so setting it to 0 implies more system calls. This could (in theory) make things slower. However, if you are having memory problems, then setting MALLOC_TRIM_THRESHOLD_=0 is probably the right call, since otherwise you could still run out of memory. @jrbourbeau and @gjoseph92, feel free to correct me and/or add any extra comments to this.
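
One practical detail: glibc reads MALLOC_TRIM_THRESHOLD_ when a process starts, so the variable has to be in the worker process's environment before it is launched, which is why it is passed through the cluster options or the Nanny and not set from inside running code. The sketch below shows that mechanism with the standard library only. It launches an ordinary Python child process as a stand-in for a worker and does not use Dask at all.

```python
import os
import subprocess
import sys

# Copy the current environment and add the variable for the child only.
child_env = dict(os.environ, MALLOC_TRIM_THRESHOLD_="0")

# The child just reports what it sees in its own environment at startup.
child_code = "import os; print(os.environ.get('MALLOC_TRIM_THRESHOLD_'))"

completed = subprocess.run(
    [sys.executable, "-c", child_code],
    env=child_env,
    capture_output=True,
    text=True,
    check=True,
)
seen_by_child = completed.stdout.strip()
print("child process sees MALLOC_TRIM_THRESHOLD_ =", seen_by_child)
```

Whether the setting has any effect on memory depends on the platform's allocator. The thread concerns glibc on Linux.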

Synthetic Data Example 2

This example was inspired by this comment: https://github.com/dask/distributed/issues/2602#issuecomment-535009454 In @rabernat's notebook the example is:

import pandas as pd  # needed for pd.date_range below; dsa, np and xr are imported in Example 1

size = (28, 237, 48, 21, 90, 144)
chunks = (1, 1, 48, 21, 90, 144)
arr = dsa.random.random(size, chunks=chunks)
arr

items = dict(
    ensemble = np.arange(size[0]),
    init_date = pd.date_range(start='1960', periods=size[1]),
    lat = np.arange(size[2]).astype(float),
    lead_time = np.arange(size[3]),
    level = np.arange(size[4]).astype(float),
    lon = np.arange(size[5]).astype(float),
)
dims, coords = zip(*list(items.items()))

array = xr.DataArray(arr, coords=coords, dims=dims)
dset = xr.Dataset({'data': array})
display(dset)

result = dset['data'].groupby("init_date.month").mean(dim=["init_date", "ensemble"])
result

where result.compute() is reported as taking:

Question from @rabernat (quoting):

"I ran a slightly modified version of the "dougiesquire's climactic mean example" and added it to the gist. The only real change I made was to also reduce over the ensemble dimension in order to reduce the total size of the final result--otherwise, you end up with a 20GB array that can't fit into the notebook memory. climactic mean example code

Using MALLOC_TRIM_THRESHOLD_=0 and comparing Dask 2020.12.0 vs. 2021.07.1, I found 6min 49s vs. 4min 13s. This is definitely an improvement, but very different from @gjoseph92's results ("main + MALLOC_TRIM_THRESHOLD_=0: gave up after 30min and 1.25TiB spilled to disk (!!)")

So I can definitely see evidence of an incremental improvement, but I feel like I'm still missing something."

Answer/comments

For this case, @rabernat reports not seeing the 6x time improvement that @gjoseph92 reported. But @rabernat's results are not directly comparable with the ones reported by @gjoseph92 in https://github.com/dask/distributed/issues/2602#issuecomment-870930134 since:

  1. @gjoseph92's cluster configuration is different. He used:

    n_workers=20,
    worker_cpu=2,
    worker_memory="20GiB"

    while the cluster configuration used by @rabernat is:

    nworkers = 30
    worker_memory = 8
    worker_cores = 1

  2. The size of the array and the chunks used are different. @gjoseph92 used:

    size = (28, 237, 96, 21, 90, 144)
    chunks = (1, 1, 96, 21, 90, 144)

    while @rabernat used:

    size = (28, 237, 48, 21, 90, 144)
    chunks = (1, 1, 48, 21, 90, 144)
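
To make the difference between the two array definitions concrete, the sketch below works out the chunk count, the size of one chunk, and the total array size for each. It assumes 8-byte float64 elements, which is what `dsa.random.random` produces by default. The helper name `describe` and the dictionary layout are made up for this illustration.

```python
from math import prod

ITEMSIZE = 8  # bytes per element, assuming float64

def describe(size, chunks):
    """Return chunk count, bytes per chunk, and total bytes for a regular chunking."""
    n_chunks = prod(s // c for s, c in zip(size, chunks))
    chunk_bytes = prod(chunks) * ITEMSIZE
    total_bytes = prod(size) * ITEMSIZE
    return {"n_chunks": n_chunks, "chunk_bytes": chunk_bytes, "total_bytes": total_bytes}

rabernat = describe((28, 237, 48, 21, 90, 144), (1, 1, 48, 21, 90, 144))
gjoseph92 = describe((28, 237, 96, 21, 90, 144), (1, 1, 96, 21, 90, 144))

for name, info in [("rabernat", rabernat), ("gjoseph92", gjoseph92)]:
    print(
        f"{name}: {info['n_chunks']} chunks, "
        f"{info['chunk_bytes'] / 2**20:.1f} MiB per chunk, "
        f"{info['total_bytes'] / 2**30:.1f} GiB total"
    )
```

Both versions have the same number of chunks, but each chunk and the whole array are twice as large in @gjoseph92's version, so the same cluster is under considerably more memory pressure there.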

Quoting @gjoseph92 response to this example on an email thread: "Rerunning my exact code for Dougie's case, it still took an hour on earlier versions with the spilling I initially reported, and 2min with the change. I noticed your test both had a smaller chunk size (48 vs 96) and differently sized workers and the number of workers. This could be the reproducer you're looking for."

Summary and a good example to showcase @gjoseph92's PR (https://github.com/dask/distributed/pull/4967)

The second example is very sensitive to cluster resources. Quoting @gjoseph92 "If you want an example to use, you'll need to tune worker memory and count such that it does complete with an old version, but very very slowly and with lots of spilling. If it worked before, it will certainly work after, but possibly faster. If it didn't work before, maybe it will work after, but only if you were on the edge of it working—if the workload just inherently needs 200GB of memory (like your anom_mean) and you're giving it 150, that's never going to be possible. You'll see the most obvious gains with extremely simple things like jskenyon's sum(axis=1) example."

jskenyon's sum(axis=1) refers to the example in this GitHub issue: https://github.com/dask/distributed/pull/4864#issue-658828506 Synthetic Data Example 2, with the proper chunk size and worker memory tuning, could also be a good case.

Another example, along with a good explanation of how @gjoseph92's PR works and what kinds of workloads benefit from it, can be found in this video by Matt Rocklin: https://www.youtube.com/watch?v=9RRP3AESLp0

rabernat commented 3 years ago

Thanks so much @ncclementi for taking the time to summarize our current state of affairs so thoroughly. 🙌

Side note: I was never able to launch the binder for the Dask version (2021.07.1) so I personally couldn't re-run these cases on pangeo-binder.

This may be a problem with the nbgitpuller version on that image. I will try to update it to use a working one.

  1. this is a very gnarly graph, and there is not much that can be done here since it requires to hold all the data, since you need it to compute clim and then again to compute anom_std.

Thanks for the explanation. Let's keep this on our radar of computations we want to optimize. It is ubiquitous in climate science. One thing that ultimately might make it run better would be the proposed Caterva / Zarr integration (https://github.com/zarr-developers/zarr-python/issues/713), which would help each worker slice the underlying Zarr data in a more memory-efficient way.

My takeaway from the other comments is that the failure modes are highly sensitive to the precise details of the cluster. In general, I think we should continue working on memory management with the goal of making things work a little more robustly. For example, if a computation works well with 200 GB of cluster memory, it should not fall completely on its face with 190 GB of memory.

It sounds like there is still plenty of ongoing work with memory management in Dask. I'm going to put the blog post idea on pause for a while (~2 months) and revisit this then.

mrocklin commented 3 years ago

@rabernat for this example

import dask.array as dsa
import numpy as np
import xarray as xr

data = dsa.random.random((10000, 1000000), chunks=(1, 1000000))
da = xr.DataArray(data, dims=['time', 'x'],
                  coords={'day': ('time', np.arange(10000) % 100)})
clim = da.groupby('day').mean(dim='time')
anom = da.groupby('day') - clim
anom_std = anom.std(dim='time')

Would it be helpful to see an example using bind/clone and friends? If so, we can ask Guido to set this up.

gjoseph92 commented 3 years ago

The lead example in https://docs.dask.org/en/latest/graph_manipulation.html is actually exactly this case. I'm guessing it would be like

from dask.graph_manipulation import bind
...
clim = bind(clim, da)
...
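
For reference, here is a minimal runnable sketch of the pattern from the lead example in the Dask graph manipulation docs, on a tiny deterministic array instead of the xarray groupby case. In the documented signature, `bind(children, parents)`, the first argument is the collection to clone and the second is what the clone has to wait for. How this maps onto the `clim`/`anom` example above is not something this sketch verifies, and the variable names are illustrative only.

```python
import dask.array as dsa
from dask.graph_manipulation import bind

# Small deterministic stand-in for the large random array in the thread.
x = dsa.arange(1_000, chunks=100)
x_mean = x.mean()

# bind returns a re-keyed clone of `x` whose tasks only start once `x_mean`
# has been computed. The original chunks of `x` can then be released as soon
# as they have contributed to the mean, at the cost of producing them twice.
x_again = bind(x, x_mean)

result = (x_again - x_mean).max().compute()
print(result)
```

The trade-off is recomputation in exchange for lower peak memory, so it makes most sense when producing the input is cheap compared with holding it all in memory.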