dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

WARNING - Memory use is high but worker has no data to store to disk #4193

Open · Hoeze opened this issue 3 years ago

Hoeze commented 3 years ago

I end up getting a lot of error messages like this:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 6.15 GB -- Worker memory limit: 8.45 GB
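For context, this warning comes from the worker's memory monitor, and the thresholds involved are the standard distributed.worker.memory.* fractions of the per-worker memory limit. A minimal sketch of tuning them (the values shown are the usual defaults, and the config has to be in place before the workers start):

import dask

# fractions of the per-worker memory limit (illustrative; these are the usual defaults)
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.70,      # spill based on process memory
    "distributed.worker.memory.pause": 0.80,      # pause task execution on this worker
    "distributed.worker.memory.terminate": 0.95,  # let the nanny kill and restart the worker
})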

I'm relatively sure that this warning is accurate. Also, the workers that hit this warning end up idling all the time.

Is there a way to automatically retire + restart the workers that hit this error message?

Anything else we need to know?: I'm using Dask v2.30

Environment:

Hoeze commented 3 years ago

A related issue I hit a couple of times when resizing workers: https://github.com/dask/distributed/issues/3018

At some point, these high-memory workers are just stuck and do not do anything anymore.

quasiben commented 3 years ago

I don't think there is a method to auto-restart. You can manually call client.restart(), which will restart all the workers. Do you have a concise reproducer?
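For reference, the manual restart mentioned above looks roughly like this (the scheduler address is a placeholder):

from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder scheduler address

# restarts every worker process; all in-memory results and task state are lost
client.restart()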

Hoeze commented 3 years ago

Thanks for your answer @quasiben. Unfortunately, this does not help in my case. I want to run my jobs unattended, but to finish the last task I often have to manually restart a hanging worker (await client.cluster.workers[<id>].restart()). Also, client.restart() would restart the whole cluster and throw away all progress, so I would need to re-run all the computations.
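Roughly, that per-worker restart is something like the sketch below, assuming a LocalCluster where cluster.workers maps worker names to Nanny objects (the key 0 is a placeholder for the hanging worker):

# inside an async context, e.g. a coroutine running on the client's event loop
nanny = client.cluster.workers[0]  # placeholder key for the hanging worker
await nanny.restart()              # kill the worker process and start a fresh one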

quasiben commented 3 years ago

Hmm, could there in fact be a leak? Can you run with more memory? Can you describe the kind of operation(s) the worker is doing during this time?

Hoeze commented 3 years ago

My function fetches data from a SQL server and reads some data from TileDB. Afterwards it uses Pandas to do some aggregations and returns a numpy array. Then I use dask.array.from_delayed() to map this function across a range of 2500 inputs.
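Roughly, that pattern looks like the sketch below; the loader function, shape, and dtype are placeholders rather than the actual code:

import dask
import dask.array as da
import numpy as np

@dask.delayed
def load_one(i):
    # placeholder for: query the SQL server, read from TileDB, aggregate with pandas
    return np.zeros(100, dtype="float64")

# one lazy block per input, stacked into a single dask array
blocks = [
    da.from_delayed(load_one(i), shape=(100,), dtype="float64")
    for i in range(2500)
]
arr = da.stack(blocks)
result = arr.compute()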

I made sure that all connections are one-shot and therefore get closed after loading the data. However, I cannot completely rule out that one of the libraries is leaking memory; with memory-profiler I could not find any hint as to what might be leaking.

More memory does help, but only by keeping the workers alive longer.

mrocklin commented 3 years ago

Is there a way to automatically retire + restart the workers that hit this error message?

I like this idea.

In practice Dask is unable to control memory leaks. These often come from other libraries that weren't designed to run in a multi-threaded context (indeed, glibc malloc itself has some issues here). I think that smoothly retiring workers in this situation would be a good way for us to add some robustness.

One challenge to this is what to do with the memory that is currently stored in the worker. It would be nice to transfer it to other workers before restarting (this will happen naturally as part of the worker retire process), but it might be hard to bring it back from disk, given the memory leak that has occurred.

As a first step, we could call await self.close_gracefully() alongside issuing that warning. @Hoeze would you be comfortable testing this out in your situation and reporting back? You will probably also have to add a keyword argument to close_gracefully that gets passed into the nanny= keyword of close, maybe something like this:


diff --git a/distributed/worker.py b/distributed/worker.py
index 00d69329..06013718 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1178,12 +1178,15 @@ class Worker(ServerNode):
             setproctitle("dask-worker [closed]")
         return "OK"

-    async def close_gracefully(self):
+    async def close_gracefully(self, nanny=None):
         """ Gracefully shut down a worker

         This first informs the scheduler that we're shutting down, and asks it
         to move our data elsewhere.  Afterwards, we close as normal
         """
+        if nanny is None:
+            nanny = not self.lifetime_restart
+
         if self.status in (Status.closing, Status.closing_gracefully):
             await self.finished()

@@ -1193,7 +1196,7 @@ class Worker(ServerNode):
         logger.info("Closing worker gracefully: %s", self.address)
         self.status = Status.closing_gracefully
         await self.scheduler.retire_workers(workers=[self.address], remove=False)
-        await self.close(safe=True, nanny=not self.lifetime_restart)
+        await self.close(safe=True, nanny=nanny)

     async def terminate(self, comm=None, report=True, **kwargs):
         await self.close(report=report, **kwargs)
@@ -2683,6 +2686,7 @@ class Worker(ServerNode):
                         if self.memory_limit is not None
                         else "None",
                     )
+                    await self.close_gracefully(nanny=False)
                     break
                 k, v, weight = self.data.fast.evict()
                 del k, v

Hoeze commented 3 years ago

Thanks a lot @mrocklin, I will test your patch in the next week.

mrocklin commented 3 years ago

Thanks! To be clear I haven't tested that, or even verified that it's valid Python. Some work on your end may still be necessary.

Hoeze commented 3 years ago

OK sure, I'm happy to debug :) Nonetheless, if you have tips on how to get dask workers into debug mode (with PyCharm), I'd be even more happy :D

mrocklin commented 3 years ago

The testing infrastructure is pretty good for this, especially if you can write async code: https://distributed.dask.org/en/latest/develop.html#writing-tests

Dask can run itself entirely in one thread, which really helps with debugging.
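Following the linked developer docs, an async test runs the client, scheduler, and workers in a single event loop, so you can set breakpoints anywhere. A minimal sketch (the test body is a placeholder):

from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_placeholder(c, s, a, b):
    # c: client, s: scheduler, a/b: two in-process workers, all in one event loop
    future = c.submit(lambda x: x + 1, 41)
    assert await future == 42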

Hoeze commented 3 years ago

Hi @mrocklin, I implemented your solution in #4221. It solves the problem of memory-leaking workers very effectively.

However, there are still workers that end up freezing. I created a minimal example with a memory-leaking function that reproduces the freezing workers:

import dask
import dask.distributed
import numpy as np
import time

cluster = dask.distributed.LocalCluster(n_workers=2, threads_per_worker=1, memory_limit="512M")
client = dask.distributed.Client(cluster)

# module-level dict that keeps a reference to every generated array (intentional memory leak)
x = {}
def memory_leaking_fn(data, sleep=0):
    x[data] = np.random.randint(100, size=12 * 1024**2 // 8)  # ~12 MB of int64 per call

    time.sleep(sleep)
    return data

# 1000 tasks, each sleeping 0.1 s and leaking memory on whichever worker runs it
futures = client.map(memory_leaking_fn, range(1000), np.repeat(0.1, 1000))

for f in futures:
    print(f.result())

mrocklin commented 3 years ago

However, there are still workers that end up freezing.

I'm curious, what happens? Are there any logs or signs about what might be wrong?

Hoeze commented 3 years ago

@mrocklin it's the last remaining worker that never gets restarted.

Basically, I can reproduce two scenarios:

This happens a bit randomly, depending on the time delay between the two workers' restarts. One can watch this nicely on the dashboard, though.

(I edited the test script in my last post slightly: changed the sleep time and used only two workers.)

KrishanBhasin commented 3 years ago

Hey @Hoeze, the errors in your traceback look a lot like the kinds of problems that 2.30.1 was supposed to address. Have you tried cherry-picking your commits over the top of that release to see if you still encounter those problems?

Hoeze commented 3 years ago

@KrishanBhasin I am currently running with a fork of d7f532caa1564ef09d456d60125c03200fa60fef (see #4221). So, yes, I am already using the fixes from 2.30.1, but they do not seem to solve the issue.

The workers still keep freezing and I cannot get my calculations to finish.

Hoeze commented 3 years ago

Hi all, is there any news on this topic? I tried the current master, but it somehow only makes things worse; I am seeing several kinds of instability.

I imagine all of those instabilities could be fixed by in-place worker restarts plus having the nanny hold all results, since that would make the results robust to worker failures.

Is there a way to push progress on this issue or get professional support on it?

yishairasowsky commented 2 years ago

@mrocklin @Hoeze @quasiben @KrishanBhasin @jedbrown I would love to know what I can do to solve the issue, because I am suffering from it as well.