dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Mild memory leak in dask workers #8164

Open rabernat opened 1 year ago

rabernat commented 1 year ago

Describe the issue:

I have been observing a mild but consistent increase in memory when storing large arrays to Zarr. I have reproduced this with both s3fs (my real use case) and with a dummy "Dev Null" store of my own devising.

I would expect to be able to store arrays of essentially infinite size in streaming fashion. Instead, this memory leak means that eventually I will run out of memory.

I'm aware that the root of this issue may be hard to diagnose. The ultimate cause may be upstream, in Zarr, where I am a maintainer. Very happy to work with the developers here to isolate and resolve the underlying issue. 🙏

I do not know if this issue occurs with other schedulers, as I don't know how to diagnose memory usage as conveniently as with distributed.

Minimal Complete Verifiable Example:

import dask
import dask.array as da
from dask.distributed import LocalCluster
from distributed.diagnostics import MemorySampler
import zarr

class DevNullStore(dict):
    """Dummy store in which data just vanishes."""

    def __setitem__(self, key, value):
        # only store attributes
        if key == '.zarray':
            super().__setitem__(key, value)

shape = 10_000_000
chunks = 1_000
data = da.zeros(shape, chunks=chunks)

cluster = LocalCluster(
    n_workers=8, threads_per_worker=1, host="*",
)

ms = MemorySampler()
store = zarr.storage.KVStore(DevNullStore())
with cluster.get_client() as client:
    with ms.sample("nullstore-zarr_v2-compute_true"):
        data.to_zarr(store, compressor=None)

ms.plot(align=True)

image

As you can see, memory usage increases steadily over the course of the computation. If you make the array larger, the trend continues.

Anything else we need to know?:

A likely objection to this issue might be "this is an artifact of your funky DevNullStore". However, the same behavior can be reproduced (albeit much more slowly) with s3fs. This example requires write access to S3:

```
import s3fs
import uuid

# replace with a bucket you can write to
s3_url = "s3://earthmover-rechunk-tmp/memory-leak-test"

def make_s3_store():
    url = f'{s3_url}/{uuid.uuid4().hex}'
    store = zarr.storage.FSStore(url=url)
    return store

with cluster.get_client() as client:
    client.restart()
    with ms.sample("s3fs-zarr_v2-compute_true"):
        data.to_zarr(make_s3_store(), compressor=None)

ms.plot(align=True)
```

image

As you can see, the overall magnitude of the memory leak is similar: about 200 MB. This suggests that it is independent of the underlying store.

Environment:

cc @crusaderky, who helped us with an earlier iteration of this problem.

mrocklin commented 1 year ago

Thanks @rabernat for the well crafted example. I suspect that folks will take a look tomorrow.

rabernat commented 1 year ago

This issue would have been so much more difficult to describe without MemorySampler. What a great tool! 🙌

fjetter commented 1 year ago

I strongly suspect this is due to internal state keeping. We keep logs about internal state transitions/events and even store some actual log messages. All of these logs are bound to a maximum size and most (unfortunately not all) are configurable.

In your example, 200 MB across 8 workers, i.e. 25 MB each, sounds about right for this kind of logging.

For instance, we have Worker.log which holds task transition logs. An entry of this log could look like

("zeroes_like-store-map-4567890", "executing", "memory", "memory", {}, "task-finished-34567898765", 1234567890.098765)

and this log deque is bound to 100_000 entries. (@crusaderky the worker transition log should likely also have a configurable transition-log-length)

In [6]: format_bytes(sizeof(("zeroes_like-store-map-4567890", "executing", "memory", "memory", {}, "task-finished-34567898765", 1234567890.098765)) * 100_000)
Out[6]: '48.07 MiB'

so that's about 50MB. Once it reaches this size, we stop storing more stuff in there (or rather throw out the oldest entries). You probably haven't reached this limit, yet.
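
A minimal sketch of the "throw out the oldest entries" behaviour, using a plain collections.deque as a stand-in for the worker's transition log. The entry contents and the loop are made up for illustration; only the 100_000 cap is taken from the description above.

```python
from collections import deque

# Hypothetical stand-in for a worker transition log. Only the cap of
# 100_000 entries comes from the discussion; everything else is invented.
MAXLEN = 100_000
log = deque(maxlen=MAXLEN)

for i in range(250_000):
    # Entry shaped like the example tuple above, with made-up values.
    log.append(
        (f"task-{i}", "executing", "memory", "memory", {}, f"stim-{i}", float(i))
    )

print(len(log))   # number of entries retained, never more than MAXLEN
print(log[0][0])  # key of the oldest entry that survived
```

Once the deque is full, each append silently evicts the oldest entry, so the memory held by the log plateaus instead of growing with the number of tasks.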

This is a known problem for the scheduler (typically it does not matter); see https://github.com/dask/distributed/issues/5570 or https://github.com/dask/distributed/issues/1795

The only config that affects the worker is currently distributed.admin.log-length, which you can set to zero so that ordinary Python logs are not kept in memory.

We'll look into this and if it's indeed the transition counter we'll adjust the log length and make it configurable.

mrocklin commented 1 year ago

@rabernat have you actually run into problems where workers run out of memory, or are you seeing the increase and afraid that it will continue forever?

rabernat commented 1 year ago

> @rabernat have you actually run into problems where workers run out of memory

No I have not. This emerged from a broader investigation into memory usage. By the time we made the graph big enough to really cause problems, we would likely first hit other known scaling limitations due to the number of tasks. It would be perfectly reasonable to say, "yes, there is a small memory leak, but it doesn't affect any real-world workflows in practice," and prioritize this issue accordingly.

mrocklin commented 1 year ago

> we would likely first hit other known scaling limitations due to the number of tasks

Which limitation did you run into, I'm curious? (hearing about these again is useful to help prioritize them).

rabernat commented 1 year ago

This graph had 10_000 tasks and leaked 200 MB of memory. If it had 100_000 tasks, it would leak 2GB of memory. However, at this point, our graph would start becoming large, in violation of the best practices. Symptoms of this would be very slow serialization time and high memory load on the scheduler.
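
The extrapolation above can be written out as a short calculation. This is only a sketch that assumes the leak grows linearly with the number of tasks, which is the assumption behind the 2 GB figure rather than something that has been measured.

```python
# Figures quoted above: 10_000 tasks leaked about 200 MB.
tasks = 10_000
leaked_bytes = 200e6

# Assumption: the leak scales linearly with the task count.
per_task = leaked_bytes / tasks
projected = per_task * 100_000

print(f"{per_task / 1e3:.0f} kB per task")
print(f"{projected / 1e9:.0f} GB for 100_000 tasks")
```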

crusaderky commented 1 year ago

Reproduced.

This is just the core behaviour of dask workers - nothing to do with zarr specifically.

import asyncio
import dask.bag as db
from distributed.diagnostics import MemorySampler
from distributed import Client, wait

WIDTH = 80_000
NITER = 10
data = db.from_sequence(range(WIDTH), npartitions=WIDTH)

client = Client(n_workers=16, threads_per_worker=2)

async def wait_cleanup(dask_worker):
    while dask_worker.state.tasks:
        await asyncio.sleep(0.1)

ms = MemorySampler()
with ms.sample():
    for _ in range(NITER):
        wait(data.persist())
        client.run(wait_cleanup)

ms.plot(grid=True, align=True)

image

After forcing all deques to maxlen=0 - both the configurable ones and the hardcoded ones - the plot changes as follows: image

As you can see, there's around 25~30 MiB worth of deques per worker, plus a genuine memory leak of roughly 400 bytes per task (2.3 GiB to 2.6 GiB over 80k tasks x 10 runs), which is substantial in the long run.
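
For reference, the per-task figure follows from the numbers in the parenthesis. The sketch below only redoes that arithmetic; the 2.3 GiB and 2.6 GiB endpoints are read off the plot and are therefore approximate.

```python
GiB = 2**30

# Approximate cluster-wide memory at the start and end of the test.
start = 2.3 * GiB
end = 2.6 * GiB

# 80k tasks per run, 10 runs.
tasks_run = 80_000 * 10

leak_per_task = (end - start) / tasks_run
print(f"{leak_per_task:.0f} bytes per task")
```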

Pending actions

fjetter commented 1 year ago

> How would people feel if I reduced some deques (namely, transaction logs) from 100k entries to 10k?

Should be fine. I would even think zero is a good default since most users don't care about this. In the rare case where we instruct folks to create dumps, we can also tell them to set these numbers.

crusaderky commented 1 year ago

50 runs, no deques

image

crusaderky commented 1 year ago

I've rerun the test, downstream of #8173 (log-length=0), this time without the MemoryMonitor.

import asyncio
import gc
import dask.bag as db
from dask.utils import format_bytes
from distributed.diagnostics import MemorySampler
from distributed import Client, wait

WIDTH = 80_000
data = db.from_sequence(range(WIDTH), npartitions=WIDTH)

client = Client(n_workers=16, threads_per_worker=2)

async def wait_cleanup(dask_worker):
    while dask_worker.state.tasks:
        await asyncio.sleep(0.1)

    gc.collect()
    await asyncio.sleep(0.25)
    gc.collect()
    await asyncio.sleep(0.25)
    return dask_worker.monitor.get_process_memory()

measures = []
for _ in range(100):
    wait(data.persist())
    mems = client.run(wait_cleanup)
    total = sum(mems.values())
    measures.append(total)
import pandas

df = pandas.DataFrame({"rss": measures})
df["delta"] = df.rss.diff()
df = df.iloc[1:]
df.index *= 80_000 // 16
df["delta"] = df["delta"].astype(int)
df["delta per task"] = df["delta"] // 80_000
df["delta per task"].rolling(10).mean().plot(grid=True)

On the X axis, you have the number of tasks per worker since the start of the test. On the Y axis, you have how many bytes worth of leak each task caused (rolling average).

image

>>> df["delta per task"].loc[200_000:].mean()
28.661290322580644

So while there is still a leak in the long term, it's less than 30 bytes per task, which I don't think we should spend time investigating as the only users impacted are those that keep the same workers online for weeks at a time running flat out.

crusaderky commented 1 year ago

There's a separate aspect to this, which may not be obvious to most:

to_zarr() will keep the run_specs of the final tasks and their output (a bunch of None's) on the workers until the whole computation is completed. So if you call it on a collection with a gigantic number of chunks running on an undersized cluster, you will see the memory usage increasing - very very slowly - over time, until the computation is completed.

This is, theoretically, solvable by wrapping the operation with a dask.graph_manipulation.checkpoint(split_every=64). This would set an upper bound to the number of tasks in memory on the workers without causing too much overhead (I've experimented with the default split_every=4 and the time overhead was substantial). However, it would cause an order-of-magnitude increase in the problem described at #8182, which makes me very hesitant to implement it.

However.

We're talking about 16 KiB per chunk (measured below). To put things in perspective, an array worth eight terabytes, chunked into 128 MiB chunks, will produce 65536 chunks. I've run the initial reproducer on such an array, downstream of #8173, log-length=1000:

image

Note that this is on 8 workers. In real life, most people will use way more than 8 workers if they have to push through 8 terabytes of data. If you ran it on 80 workers, the cluster-wide magnitude of the leak would remain the same; in other words the leak per worker would go down from 128 MiB to 13 MiB.
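
The figures above can be checked with a few lines of arithmetic. This sketch assumes binary units throughout (8 TiB of data, 16 KiB retained per chunk) and an even spread of chunks across workers.

```python
TiB, MiB, KiB = 2**40, 2**20, 2**10

# Number of 128 MiB chunks in an 8 TiB array.
n_chunks = 8 * TiB // (128 * MiB)

# Cluster-wide memory retained at 16 KiB per chunk.
total = n_chunks * 16 * KiB

# Share per worker, assuming chunks are spread evenly.
per_worker_8 = total / 8 / MiB
per_worker_80 = total / 80 / MiB

print(n_chunks)
print(f"{per_worker_8:.0f} MiB per worker on 8 workers")
print(f"{per_worker_80:.1f} MiB per worker on 80 workers")
```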

fjetter commented 1 year ago

> and their output (a bunch of None's)

None is a singleton. If all tasks are returning None, this will not add to the memory blowup. The TaskState objects will, of course.
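
A quick way to see the singleton point in plain Python. The list below is a hypothetical stand-in for a collection of task results, not how the worker actually stores them.

```python
import sys

# Hypothetical collection of task outputs, all of them None.
results = [None for _ in range(1_000)]

# Every element refers to the same object, so no extra None objects exist.
distinct_ids = len({id(r) for r in results})
print(distinct_ids)

# The container itself still needs one pointer-sized slot per element.
list_bytes = sys.getsizeof(results)
print(list_bytes)
```

The values are free, but whatever holds the references is not.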


In other words:

There are two sources for increasing memory

Is this an accurate summary @crusaderky ?


@rabernat is this kind of growth (i.e. order of 16KiB per chunk) negligible/acceptable/concerning for your application?

crusaderky commented 1 year ago

> None won't add to this since it is a singleton

Well actually, 🧐 storing a None in the SpillBuffer costs ~240 bytes.

import psutil
import dask.array as da
from distributed.spill import SpillBuffer

!rm -rf spill
!mkdir spill
buf = SpillBuffer("spill", 2**44, False)
data = da.zeros(2**44)
proc = psutil.Process()

m0 = proc.memory_info().rss

for k in data.__dask_keys__():
    buf[k] = None

m1 = proc.memory_info().rss
(m1 - m0) / len(data.__dask_keys__())
242.921875
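
The same effect shows up, at a smaller scale, with a plain dict, because the keys and the per-entry bookkeeping cost memory even when every value is None. This is a sketch only: the dict is a stand-in and not the SpillBuffer, the key name is hypothetical, and the number it prints is not expected to match the ~240 bytes measured above.

```python
import tracemalloc

N = 100_000

tracemalloc.start()
before, _ = tracemalloc.get_traced_memory()

# Hypothetical keys shaped like dask array keys: (name, chunk index).
store = {("zeros-hypothetical", i): None for i in range(N)}

after, _ = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Bytes of Python allocations per stored None.
per_entry = (after - before) / N
print(f"{per_entry:.0f} bytes per entry")
```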

> Is this an accurate summary @crusaderky ?

  1. on top of the two items you listed, there's a genuine leak of 25~30 bytes per task; more for the first ~200k tasks on the worker.