dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Unmanaged (Old) memory hanging #6232

Open josuemtzmo opened 2 years ago

josuemtzmo commented 2 years ago

What happened:

I am using Xarray and Dask to analyse large datasets (hundreds of GB), reducing their dimensions from f(x,y,z,t) to f(t) by computing averages or sums. Several times I've found that Unmanaged (Old) memory hangs around in the cluster memory until it kills the client.

Here is the MemorySampler output for one of my analyses, a spatial average. As shown in the graph, the cluster memory never decreases towards its pre-run state (it looks like a memory leak, but the computation is only dataset.mean()). image

I've tried setting "MALLOC_TRIM_THRESHOLD_" to 16284 and 0 and the issue persists.

I've also tried manually trimming memory:

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)

However, this usually frees less than 1 GB, and the cluster's unmanaged memory remains on the order of tens of GB.

In order to better report this issue, I've attached an example that leaves around 1GB of unmanaged (old) memory (I'm not sure if this is related to issue https://github.com/dask/dask/issues/3530)

https://gist.github.com/josuemtzmo/3620e01c9caf88c18109809c52d77180

After lazy loading a test dataset, the process memory is ~159.01 MiB according to psutil. After running the computation, a mean over space (i.e. f(t,x,y).mean(('x','y'))), the process memory is ~256.83 MiB according to psutil. This increase in memory is expected, however the cluster memory remains in 1.27GiB, where each of the workers has around 180MiB of Unmanaged (Old) memory hanging.

Screenshot 2022-04-28 at 16 48 57

Running the manual memory trim only decreases the cluster memory from 1.26GiB to 1.23GiB. If the same code is run several times, the unmanaged memory continues to increase.

What you expected to happen:

Environment:

Thanks!

hayesgb commented 2 years ago

Have you tried using the new Active Memory Manager? When I do:

client = Client()
client.amm.start()

with your gist notebook on my local machine, I get the following plot.

I'm using dask and distributed 2022.01.0

FYI: cc @crusaderky

Screen Shot 2022-04-28 at 11 33 42 AM

crusaderky commented 2 years ago

@josuemtzmo your notebook gist doesn't seem to reproduce the issue. With the same number of workers, threads per worker, and memory limit, for me it finishes in 3.5s instead of your 2m5s (reading from local SSD). Does the generator code for test.nc in the gist produce the full sized data? Or did you make it smaller before you published it? Also, in both the gist and in your second plot here the peak memory usage is 1 GiB cluster-wide and the shape doesn't look like the first plot you displayed.

crusaderky commented 2 years ago

This increase in memory is expected, however the cluster memory remains in 1.27GiB, where each of the workers has around 180MiB of Unmanaged (Old) memory hanging.

This is barely more than just xarray and all its dependencies.

$ python
>>> import xarray, psutil
>>> psutil.Process().memory_info().rss / 2**20
126.58203125

the cluster memory never decreases towards its pre-run state

What magnitude of leak are we talking about? Each worker holds its logs in a fixed-size, fairly long deque, so it's normal to have a bit of increase towards the beginning. Please try rerunning for ~1 hour nonstop without restarting the workers; you should see it plateau to a fairly reasonable level.
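The effect of a fixed-size log buffer can be illustrated with a plain `collections.deque`. This is only a minimal sketch, not distributed's actual logging code; the `maxlen` of 1000, the checkpoints, and the message text are made-up values for illustration. The buffer grows while it fills up and then stays at a constant length, which is why memory rises early on and then levels off.

```python
from collections import deque

# Hypothetical bounded log buffer; maxlen=1000 is an arbitrary illustrative size,
# not the value used by distributed.
log = deque(maxlen=1000)

checkpoints = (250, 500, 1000, 2000, 5000)
sizes = {}
for i in range(1, 5001):
    log.append(f"event {i}")  # once full, the oldest entry is dropped on each append
    if i in checkpoints:
        sizes[i] = len(log)

# The length grows up to maxlen and then stops growing.
print(sizes)
```

Once the buffer is full, every new entry evicts the oldest one, so the number of retained entries stays constant no matter how long the worker keeps running.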

josuemtzmo commented 2 years ago

Thanks for the suggestions.

@hayesgb I had no idea of the Active Memory Manager. Although it improves a bit by reducing the memory usage of the cluster, the issue persists (see figures below).

@crusaderky I ran the notebook on both my local computer (MacOS) and the server referred to in the issue, and the results are the same. Furthermore, I ran only the code provided in #6241 and the issue is still there on both my local and server machines.

Run on SLES Linux (server) using dask 2022.4.1 on Python 3.10.4 (I created a new env from scratch to test this)

Without AMM: Screenshot 2022-04-29 at 11 42 24

With AMM: Screenshot 2022-04-29 at 11 47 38

On my personal computer (MacOS ARM chip) using dask 2022.4.1 on python 3.9.12 (new env too)

Without AMM: Screenshot 2022-04-29 at 11 55 27

With AMM: Screenshot 2022-04-29 at 11 55 34

Furthermore, if I rerun the same code in the same cluster, the unmanaged (old) memory continues to increase... (two runs with a 15 second pause in between them)

image

crusaderky commented 2 years ago

The substantial growth of unmanaged memory on MacOSX is a well known issue: https://github.com/dask/distributed/issues/5840 It's not a leak, it's just that it doesn't shrink after use. Run 10-20 times and you'll see the same memory reused.

On SLES, what your plots are showing me is that, in the middle of a computation, the unmanaged memory is ~10GiB, or ~1.4GiB per worker. Considering that it includes the heap of the tasks and the network buffers, this is perfectly normal and healthy. If it's too much for you, you need to reduce your chunk size.

Furthermore, if I rerun the same code in the same cluster, the unmanaged (old) memory continues to increase... (two runs with a 15 second pause in between them)

Run it 10 times and I expect it to plateau.
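One way to tell a plateau from a leak is to compare the memory readings of the last few runs. The helper below is a hypothetical sketch, not part of dask or distributed; the `window` and `tolerance` defaults and both series of readings are made-up values for illustration only.

```python
def has_plateaued(peaks, window=3, tolerance=0.05):
    """Return True if the last `window` readings differ by at most
    `tolerance` (relative to the largest of them)."""
    if len(peaks) < window:
        return False
    recent = peaks[-window:]
    return (max(recent) - min(recent)) <= tolerance * max(recent)


# Made-up per-run memory readings in GiB, for illustration only.
plateauing = [1.0, 1.6, 1.9, 2.0, 2.02, 2.03]  # growth shrinks towards zero
leaking = [1.0, 1.6, 2.2, 2.8, 3.4, 4.0]       # grows by the same amount every run

print(has_plateaued(plateauing), has_plateaued(leaking))
```

A series that keeps growing by a similar amount on every run is the pattern that would point to a genuine leak; one whose growth shrinks towards zero matches the plateau described here.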

To recap:

crusaderky commented 2 years ago

P.S. I notice that your SLES notebook lost the env={"MALLOC_TRIM_THRESHOLD_":16284}. That was important to work around https://github.com/dask/distributed/issues/5971.
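glibc reads MALLOC_TRIM_THRESHOLD_ when a process starts, so the variable has to be present in the environment of the worker process at launch. The sketch below uses only the standard library and a plain child Python process to show that idea; it is not how distributed spawns workers, and `run_with_trim_threshold` is a hypothetical helper name.

```python
import os
import subprocess
import sys


def run_with_trim_threshold(threshold: int) -> str:
    """Start a child Python process with MALLOC_TRIM_THRESHOLD_ set
    and return the value the child sees in its own environment."""
    env = dict(os.environ, MALLOC_TRIM_THRESHOLD_=str(threshold))
    result = subprocess.run(
        [sys.executable, "-c",
         "import os; print(os.environ['MALLOC_TRIM_THRESHOLD_'])"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# 16284 is the value used earlier in this thread.
print(run_with_trim_threshold(16284))
```

Setting the variable from inside an already running process does not change that process's allocator settings; it only affects processes started afterwards that inherit the environment.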

josuemtzmo commented 2 years ago

After running the .compute() several times, with a delay of 15 seconds between runs, the worker memory raises the warning:

distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information.

Here is the memory usage graph for 10 runs; it does indeed plateau:

image

The drop at the end occurs when the manual trim is performed. I guess I expected the memory to be released more often. Setting "MALLOC_TRIM_THRESHOLD_":16284 before spinning up the client, as in #5971, doesn't seem to make a big difference, and neither does decreasing the chunk size. In fact, from what I've tested, the smaller the chunks, the larger the memory usage... (I will post the memory usage of smaller chunks later)

crusaderky commented 2 years ago

setting it up before spinning the client as https://github.com/dask/distributed/issues/5971 doesn't seem to make a big difference

To clarify: are you doing this in Linux? On MacOS that variable won't do anything.
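Because both the environment variable and `malloc_trim` are specific to glibc, a manual trim helper can be guarded so that it does nothing on other platforms. This is a minimal sketch; the function name and the `-1` sentinel for "not supported" are arbitrary choices made for illustration.

```python
import ctypes
import sys


def trim_memory_if_supported() -> int:
    """Call glibc's malloc_trim(0) where available.

    Returns malloc_trim's result (1 if memory was released, 0 otherwise),
    or -1 (a hypothetical sentinel) when glibc is not available.
    """
    if not sys.platform.startswith("linux"):
        return -1  # e.g. MacOS or Windows: nothing to trim this way
    try:
        libc = ctypes.CDLL("libc.so.6")
        return libc.malloc_trim(0)
    except (OSError, AttributeError):
        # Linux with a non-glibc libc, where the library or symbol is missing
        return -1


print(trim_memory_if_supported())
```

Passed to `client.run`, a helper like this would return one value per worker, which makes it easy to see whether any worker actually released memory.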

josuemtzmo commented 2 years ago

To clarify, I'm testing MALLOC_TRIM_THRESHOLD_ on Linux. So far I get some strange results in which the memory just hangs and kills the kernel. I'll keep exploring this...

fmaussion commented 2 years ago

[EDIT]: false alarm. See https://github.com/pangeo-data/rechunker/issues/100#issuecomment-1116189019 for solution to my problem. Previous message below for reference.

Just to add my usecase here. On Linux and trying all the solutions I could gather here, I have exactly the same problem with huge memory leaks in even a "fit in memory" dask problem: https://nbviewer.ipython.org/gist/fmaussion/5212e3155256e84e53d033e61085ca30

dcherian commented 2 years ago

@crusaderky's healthy/not healthy list would be a pretty great addition to the docs!

josuemtzmo commented 2 years ago

I've investigated a bit more in detail the memory for different chunk sizes, and what I found seems a bit unexpected; the code I'm using to test this is:

import dask
import os
import dask.array as da
import distributed
import time
from distributed.diagnostics import MemorySampler

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

os.environ["MALLOC_TRIM_THRESHOLD_"] = str(dask.config.get("distributed.nanny.environ.MALLOC_TRIM_THRESHOLD_"))

c = distributed.Client(n_workers=7, threads_per_worker=2, memory_limit="2 GiB")
c.amm.start()

a = da.random.random((10_000, 10_000), chunks=(2_000, 2_000))
b = (a @ a.T).mean()

ms = MemorySampler()
with ms.sample("With AMM"):
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(60)
    b.compute()
    time.sleep(15)
    b.compute()
    time.sleep(15)
    b.compute()
    c.run(trim_memory)
    time.sleep(60)
ms.plot(figsize=(15, 10), grid=True)

The best performance and best memory handling occurs when the chunk size is 2_000.

image

However as I decrease the chunk size to 1_500 or 1_000, the cluster memory seems to flush less frequently.

image

image

If I decrease the chunk size to 800, the tcp transmissions to the workers fails and the execution hangs.

Is this expected?

crusaderky commented 2 years ago

However as I decrease the chunk size to 1_500 or 1_000, the cluster memory seems to flush less frequently.

1500 * 1500 * 8 bytes = 17 MiB, which is much larger than the MALLOC_TRIM_THRESHOLD_ of 16 KiB, so, at face value, this is not expected. However, your graphs don't have a scale, so I don't know how much difference you're talking about. (a @ a.T).mean() includes a bunch of reduction tasks, which are small in size. With a memory_limit=2GiB, they matter.
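The chunk-size arithmetic can be checked for each chunk size tried in this thread. The sketch below assumes square float64 chunks (8 bytes per element, which is what da.random.random produces); `chunk_mib` is a hypothetical helper name.

```python
import math


def chunk_mib(chunk_shape, itemsize=8):
    """Size of one chunk in MiB, assuming `itemsize` bytes per element."""
    return math.prod(chunk_shape) * itemsize / 2**20


trim_threshold_bytes = 16284  # the MALLOC_TRIM_THRESHOLD_ value used in this thread

for side in (2_000, 1_500, 1_000, 800):
    size = chunk_mib((side, side))
    above = size * 2**20 > trim_threshold_bytes
    print(f"{side} x {side}: {size:.2f} MiB, above trim threshold: {above}")
```

Every chunk size tried here is several MiB, far above a threshold of roughly 16 KiB, so the chunks themselves should not be what keeps memory from being trimmed.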

If I decrease the chunk size to 800, the tcp transmissions to the workers fails and the execution hangs.

This definitely must not happen, but it's unrelated to memory management. Could you open a reproducer, with logs and scheduler dump, in a different ticket?

n3rV3 commented 1 year ago

we are facing this issue, tried the snippet shared earlier:

from dask.distributed import get_client
import dask.array as da
from distributed.diagnostics import MemorySampler

client = get_client('10.10.10.1:8786')

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.amm.start()
a = da.random.random((20_000, 20_000), chunks=(2_000, 2_000))
b = (a @ a.T).mean()

ms = MemorySampler()
with ms.sample("With AMM"):
    b.compute()

client.run(trim_memory)
ms.plot(figsize=(15,10), grid=True)

The memory on the workers increases from around 20 GB to 50 GB and keeps growing steadily. It grows significantly more if I make the following change:

a = da.random.random((200_000, 200_000), chunks=(20_000, 20_000))

With the above change, the memory usage crossed 80 GB and stayed there, even with client.run(trim_memory) and other tricks.

I have tried setting MALLOC_TRIM_THRESHOLD_ to 0 for all dask workers, but that doesn't help either.

This is an issue for us because we run DAGs which do significantly more processing than the above snippet, and the memory utilization of the cluster crosses 250 GB. We have a cluster capacity of around 2 TB of memory. After some time the memory utilization becomes an issue and the workers get killed in between task executions, thereby delaying our pipelines.

I tried the suggestions in this issue and also tried basic jemalloc tuning but nothing has helped so far.

I would like to understand if we can quickly address this.

crusaderky commented 1 year ago

@n3rV3 does the unmanaged memory stay there when there are no tasks running at all? does it stay there after you release all your futures and persisted collections (so that your managed memory becomes 0)? what's your dask version? you say 2tb cluster capacity; what's the capacity per worker? what kind of cluster does your client connect to? (dask-kubernetes, coiled, in-house...?) on what OS do the workers run?

n3rV3 commented 1 year ago

Q: Does the unmanaged memory stay there when there are no tasks running at all?
Ans: Yes, the unmanaged memory stays there even with no tasks running at all.

Q: Does it stay there after you release all your futures and persisted collections (so that your managed memory becomes 0)?
Ans: Futures are not used; I'm not sure about removing persisted collections. But in the snippet I shared earlier, I wasn't persisting any collections. Still, the memory kept growing with each run.

Q: What's your dask version?
Ans: The dask version is 2022.12.1.

Q: What's the capacity per worker?
Ans: 14 instances are running; each instance has 12 workers, and each worker is single-threaded with 14GB RAM. The instances themselves have 16 CPU cores and 123GB RAM. The processes are allocated more memory than is available on each instance. But the overall memory utilization of dask never crosses 60% on any node, which allows us to run the cluster without OOMs (so far).

Q: What kind of cluster does your client connect to? (dask-kubernetes, coiled, in-house...?)
Ans: The cluster is running on EC2, with dask-workers (running on many EC2 instances) connecting to a dask-scheduler instance at startup. A systemd file manages the dask service; the relevant execution line is:

ExecStart=/usr/local/bin/dask-worker --nthreads 1 --nworkers 12 --memory-limit 14000MB --death-timeout 300 --pid-file /usr/local/dask/run/dask-worker.pid tcp://10.10.10.10:8786
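The overcommit described above follows directly from the numbers in the answer and the ExecStart line. This is a rough arithmetic sketch that assumes MB and GB are both decimal units; the variable names are illustrative.

```python
workers_per_instance = 12   # --nworkers 12
memory_limit_mb = 14_000    # --memory-limit 14000MB
instance_ram_gb = 123       # RAM per EC2 instance, as reported above
instances = 14

# Sum of the per-worker memory limits on one instance, in GB (assuming 1 GB = 1000 MB).
allocated_gb = workers_per_instance * memory_limit_mb / 1000
# Ratio above 1 means the limits add up to more than the physical RAM.
overcommit = allocated_gb / instance_ram_gb
# Physical RAM across the whole cluster, in GB.
cluster_ram_gb = instances * instance_ram_gb

print(f"limits per instance: {allocated_gb:.0f} GB, overcommit: {overcommit:.2f}x")
print(f"physical RAM across the cluster: {cluster_ram_gb} GB")
```

With these figures the worker limits on one instance add up to more than its physical RAM, so an instance can run out of memory before any single worker reaches its own limit.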

Q: On what OS do the workers run?
Ans: Ubuntu 22.04; the Linux kernel should be 5.15.0-1027-aws on all instances.

crusaderky commented 1 year ago

Q: Does it stay there after you release all your futures and persisted collections (so that your managed memory becomes 0)? Ans: Futures are not used, not sure about removing persisted collections. But, in the snippet I shared earlier, I wasn't persisting any collections. Still the memory kept growing with each run.

The dashboard must show managed memory = 0 in the top left corner. Alternatively, you can run client.run(lambda dask_worker: len(dask_worker.data)) to learn how many keys you have in memory.
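`client.run` returns a dict that maps each worker address to the function's return value, so the per-worker key counts can be summarised with a few lines. The helper below is a hypothetical sketch, and the worker addresses and counts are made-up stand-ins for what a real cluster would return.

```python
def summarize_keys(per_worker):
    """Return the total number of keys and the workers still holding any."""
    total = sum(per_worker.values())
    holding = sorted(addr for addr, n in per_worker.items() if n > 0)
    return total, holding


# On a live cluster this dict would come from:
#   per_worker = client.run(lambda dask_worker: len(dask_worker.data))
# The addresses and counts below are made up for illustration.
per_worker = {
    "tcp://10.0.0.1:40001": 0,
    "tcp://10.0.0.1:40002": 3,
    "tcp://10.0.0.2:40001": 0,
}

total, holding = summarize_keys(per_worker)
print(total, holding)
```

A total of zero means that no worker holds any keys, which is the state in which the at-rest memory should be measured.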

Once you have no keys in memory: how much memory per node are you talking about, at rest?

If it's significant (>2 GiB), you are likely suffering from a memory leak, potentially something that dask is not responsible for. What libraries are you calling from your tasks?