dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.56k stars 715 forks source link

memory leak when using distributed.Client with delayed #2068

Open iljau opened 6 years ago

iljau commented 6 years ago

I have used dask.delayed to wire together some classes and when using dask.threaded.get everything works properly. When same code is run using distributed.Client memory used by process keeps growing.

Dummy code to reproduce issue is below.

import gc
import os

import psutil
from dask import delayed

# generate random strings: https://stackoverflow.com/a/16310739
class Data():
    def __init__(self):
        self.tbl = bytes.maketrans(bytearray(range(256)),
                              bytearray([ord(b'a') + b % 26 for b in range(256)]))

    @staticmethod
    def split_len(seq, length):
        return [seq[i:i + length] for i in range(0, len(seq), length)]

    def get_data(self):
        l = self.split_len(os.urandom(1000000).translate(self.tbl), 1000)
        return l

class Calc():
    def __init__(self, l):
        self.l = l

    def nth_nth_item(self, n):
        return self.l[n][n]

class Combiner():
    def __init__(self):
        self.delayed_data = delayed(Data())

    def get_calc(self):
        d_l = self.delayed_data.get_data(pure=True)
        return delayed(Calc, pure=True)(d_l)

    def mem_usage_mb(self):
        process = psutil.Process(os.getpid())
        return "%.2f" % (process.memory_info().rss * 1e-6)

    def results(self):
        return {
            '0': self.get_calc().nth_nth_item(0),
            '1': self.get_calc().nth_nth_item(1),
            '2': self.get_calc().nth_nth_item(2),
            'mem_usage_mb': self.mem_usage_mb()
        }

    def delayed_results(self):
        return delayed(self.results())

def main_threaded_get():
    from dask.threaded import get as threaded_get
    from dask import compute

    for i in range(300):
        delayed_obj = Combiner().delayed_results()
        res = compute(delayed_obj, key=threaded_get)[0]
        #print(res)
        print("#%d, mem: %s mb" % (i, res['mem_usage_mb']))
        gc.collect()

def main_distributed_client():
    from distributed import Client
    client = Client(processes=True, n_workers=1, threads_per_worker=1)

    for i in range(1000):
        delayed_obj = Combiner().delayed_results()
        future = client.compute(delayed_obj)
        res = future.result()
        print("#%d, mem: %s mb" % (i, res['mem_usage_mb']))

        collect_res = client.run(lambda: gc.collect()) # doesn't help
        # print(collect_res)

if __name__ == "__main__":
    main_threaded_get()
    main_distributed_client()

Results:

main_threaded_get():
100, mem: 33.64 mb
200, mem: 33.64 mb
299, mem: 33.64 mb

main_distributed_client()
100, mem: 94.02 mb
200, mem: 96.02 mb
300, mem: 97.95 mb
400, mem: 100.11 mb
500, mem: 102.29 mb
600, mem: 104.48 mb
700, mem: 106.72 mb
800, mem: 108.20 mb
900, mem: 110.02 mb
999, mem: 112.22 mb
And also "distributed.utils_perf - WARNING - full garbage collections took 60% CPU time recently (threshold: 10%)" messages starting with i=30
Python 3.6.5
>>> dask.__version__
'0.18.0'
>>> distributed.__version__
'1.22.0'
mpeleshenko commented 6 years ago

I am seeing the same memory behavior with Python 3.5.5 and the above sample.

Python 3.5.5
>>> dask.__version__
'0.18.0'
>>> distributed.__version__
'1.22.0'
mrocklin commented 6 years ago

Interesting. When I run this I get something similar. Memory use climbs slowly in steps. I also get a number of warnings about garbage collection time taking a long while.

distributed.utils_perf - WARNING - full garbage collections took 62% CPU time recently (threshold: 10%)
#855, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 62% CPU time recently (threshold: 10%)
#856, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 62% CPU time recently (threshold: 10%)
#857, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#858, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#859, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#860, mem: 125.76 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#861, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#862, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#863, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#864, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#865, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 63% CPU time recently (threshold: 10%)
#866, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 64% CPU time recently (threshold: 10%)
#867, mem: 126.02 mb
distributed.utils_perf - WARNING - full garbage collections took 64% CPU time recently (threshold: 10%)
#868, mem: 126.02 mb

I'm curious how people generally debug this sort of issue. I might start with the following:

  1. Use some project, maybe like pympler, to look at what objects might be left over: https://pythonhosted.org/Pympler/muppy.html#the-summary-module
  2. Look at what objects are being GCed. Unfortunately I don't have much experience finding out what takes up GC time

If anyone has any experience here and has the time to investigate this further I would appreciate it.

iljau commented 6 years ago

tracemalloc may be an option: https://docs.python.org/3/library/tracemalloc.html#compute-differences

mpeleshenko commented 6 years ago

There is also objgraph which is useful for generating reference graphs of objects: https://mg.pov.lt/objgraph/

mrocklin commented 6 years ago

It would be great if people could try these tools and report back what they find

On Thu, Jun 28, 2018 at 7:32 AM, Michael Peleshenko < notifications@github.com> wrote:

There is also objgraph which is useful for generating reference graphs of objects: https://mg.pov.lt/objgraph/

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2068#issuecomment-401004357, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszLJbYCyfBZvSubPrwnvgTasnqgrEks5uBL69gaJpZM4UzTGE .

iljau commented 6 years ago
Code for: mleak.py ```python import gc import os import tracemalloc import psutil from dask import delayed from dask.distributed import get_worker # generate random strings: https://stackoverflow.com/a/16310739 class Data(): def __init__(self): self.tbl = bytes.maketrans(bytearray(range(256)), bytearray([ord(b'a') + b % 26 for b in range(256)])) @staticmethod def split_len(seq, length): return [seq[i:i + length] for i in range(0, len(seq), length)] def get_data(self): l = self.split_len(os.urandom(1000000).translate(self.tbl), 1000) return l class Calc(): def __init__(self, l): self.l = l def nth_nth_item(self, n): return self.l[n][n] class Combiner(): def __init__(self): self.delayed_data = delayed(Data()) def get_calc(self): d_l = self.delayed_data.get_data(pure=True) return delayed(Calc, pure=True)(d_l) def mem_usage_mb(self): process = psutil.Process(os.getpid()) return "%.2f" % (process.memory_info().rss * 1e-6) def results(self): return { '0': self.get_calc().nth_nth_item(0), '1': self.get_calc().nth_nth_item(1), '2': self.get_calc().nth_nth_item(2), 'mem_usage_mb': self.mem_usage_mb() } def delayed_results(self): return delayed(self.results()) ## ## def snapshot(): worker = get_worker() worker.snapshot1 = tracemalloc.take_snapshot() def top_stats(do_print_trace=False): worker = get_worker() snapshot2 = tracemalloc.take_snapshot() stats = snapshot2.compare_to(worker.snapshot1, 'traceback') for stat in stats[:5]: print(stat) if do_print_trace: for line in stat.traceback.format(): print(line) ## ## def main_threaded_get(): from dask.threaded import get as threaded_get from dask import compute for i in range(300): delayed_obj = Combiner().delayed_results() res = compute(delayed_obj, key=threaded_get)[0] print("#%d, mem: %s mb" % (i, res['mem_usage_mb'])) gc.collect() def main_distributed_client(): from distributed import Client client = Client(processes=True, n_workers=1, threads_per_worker=1) # up to 7 stacktrace lines client.run(lambda: tracemalloc.start(7)) for i in range(1000): client.run(lambda: snapshot()) delayed_obj = Combiner().delayed_results() future = client.compute(delayed_obj) res = future.result() print("#%d, mem: %s mb" % (i, res['mem_usage_mb'])) client.run(lambda: top_stats(do_print_trace=False)) # client.run(lambda: top_stats(do_print_trace=True)) # print call stack as well client.run(lambda: gc.collect()) # doesn't help print("---") if __name__ == "__main__": main_distributed_client() ```


Modified script compares memory usage using tracemalloc before computing delayed function and after.



If I'm interpreting tracemalloc results correctly, then it looks that memory usage grows when pickle.loads is called.

Run: python -X tracemalloc mleak.py

Top memory increases per invocation:

[..]/python3.6/site-packages/distributed/protocol/pickle.py:59: size=188 KiB (+5044 B), count=2019 (+50), average=95 B
[..]/python3.6/site-packages/distributed/worker.py:2130: size=11.3 KiB (+3176 B), count=16 (+5), average=721 B
[..]/python3.6/site-packages/tornado/gen.py:1046: size=3560 B (+2848 B), count=5 (+4), average=712 B
[..]/python3.6/site-packages/distributed/protocol/core.py:188: size=97.5 KiB (+2482 B), count=1042 (+25), average=96 B
[..]/python3.6/asyncio/events.py:145: size=2880 B (+2304 B), count=5 (+4), average=576 B

Call stack for distributed/protocol/pickle.py (different invocation)

[..]/python3.6/site-packages/distributed/protocol/pickle.py:59: size=41.0 KiB (+2468 B), count=360 (+21), average=117 B
  File "[..]/python3.6/site-packages/distributed/protocol/pickle.py", line 59
    return pickle.loads(x)
  File "[..]/python3.6/site-packages/distributed/worker.py", line 720
    function = pickle.loads(function)
  File "[..]/python3.6/site-packages/distributed/worker.py", line 1275
    self.tasks[key] = _deserialize(function, args, kwargs, task)
  File "[..]/python3.6/site-packages/distributed/core.py", line 375
    handler(**merge(extra, msg))
  File "[..]/python3.6/site-packages/tornado/gen.py", line 1113
    yielded = self.gen.send(value)
  File "[..]/python3.6/site-packages/tornado/gen.py", line 1199
    self.run()
  File "[..]/python3.6/site-packages/tornado/stack_context.py", line 276
    return fn(*args, **kwargs)
Axel-CH commented 6 years ago

Hello, I have the same Issue. I tried to manually perform a client.restart() but the workers are restarting with theirs saturated memories. Is it possible to reset all workers memory? (already tried client.cancel and del xx, without success). Thanks

mrocklin commented 6 years ago

but the workers are restarting with theirs saturated memories

What does this mean exactly?

Calling client.restart kills the worker process and then brings it back. It's a pretty clean state.

On Fri, Aug 31, 2018 at 2:08 AM, Axel CHERUBIN notifications@github.com wrote:

Hello, I have the same Issue. I tried to manually perform a client.restart() but the workers are restarting with theirs saturated memories. Is it possible to reset all workers memory? (already tried client.cancel and del xx, without success). Thanks

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2068#issuecomment-417562021, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszDpgTnGpezeF6yIxZ1SoqsENJeZAks5uWNLbgaJpZM4UzTGE .

Axel-CH commented 6 years ago

@mrocklin Thank you for your support.

The context

I'm working on freqtrade, a trading cryptocurrency bot library. To alow quicker price analysis I'm adding the possibility to use a maximum of cpu ressources. Here is the issue that I'm working on : issue. And the feature discussion feature

The Problem

The bot run a loop each minute to check price and do analysis. After each loop completion the workers memory rise slowly of about 1%. Then I receive this warning multiple time: distributed.utils_perf - WARNING - full garbage collections took 14% CPU time recently (threshold: 10%) Then after a few more loops: distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.60 GB -- Worker memory limit: 2.09 GB

At this point the workers start a strike, no more work is performed.

Logs, confirming that the workers are restarting due to the client.restart() at each end of loop : distributed.scheduler - INFO - Clear task state distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/2 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/2 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/4 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/4 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/6 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/6 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/8 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/8 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/10 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/10 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/12 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/12 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/14 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/14 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/16 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/16 distributed.scheduler - INFO - Client Client-ef7049de-ad17-11e8-a034-5f58bbbb3184 requests to cancel 0 keys distributed.scheduler - INFO - Send lost future signal to clients distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/10 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/12 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/14 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/16 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/2 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/4 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/6 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/8 distributed.scheduler - INFO - Lost all workers distributed.scheduler - INFO - Clear task state distributed.scheduler - INFO - Clear task state

What I Tried

-start workers on Process mode -start workers on Thread mode -client.restart() -Pands option, pd.options.mode.chained_assignment = None

freqtrade-create-trade-with-dask.zip

Result

The issue is present despite all attempts. During the workers restart there is a deserialisation process from the worker space folder? Do you think that the memory loss can be maintained by that process? Regards, Axel

mrocklin commented 6 years ago

After a restart workers don't load anything in. They're fresh Python processes and all previous work is removed and forgotten.

Three thoughts come to mind:

  1. Your library code or something else in your environment is large and taking up space. You should try using fewer workers with more memory.
  2. You're generating and destroying tons of small numpy arrays, and running into a memory leak in Malloc/Numpy: https://github.com/dask/dask/issues/3530
  3. (most likely) something totally different and specific to your workflow

On Fri, Aug 31, 2018 at 11:30 AM, Axel CHERUBIN notifications@github.com wrote:

@mrocklin https://github.com/mrocklin Thank you for your support. The context

I'm working on freqtrade https://github.com/freqtrade/freqtrade, a trading cryptocurrency bot library. To alow quicker price analysis I'm adding the possibility to use a maximum of cpu ressources. Here is the issue that I'm working on : issue https://github.com/freqtrade/freqtrade/issues/746. And the feature discussion feature https://github.com/freqtrade/freqtrade/issues/1170 The Problem

The bot run a loop each minute to check price and do analysis. After each loop completion the workers memory rise slowly of about 1%. Then I receive this warning multiple time: distributed.utils_perf - WARNING - full garbage collections took 14% CPU time recently (threshold: 10%) Then after a few more loops: distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.60 GB -- Worker memory limit: 2.09 GB

At this point the workers start a strike, no more work is performed.

Logs, confirming that the workers are restarting due to the client.restart() at each end of loop : distributed.scheduler - INFO - Clear task state distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/2 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/2 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/4 distributed.scheduler - INFO - Starting worker compute stream, inproc:// 192.168.42.128/8244/4 distributed.scheduler - INFO - Register inproc:// 192.168.42.128/8244/6 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/6 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/8 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/8 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/10 distributed.scheduler - INFO - Starting worker compute stream, inproc:// 192.168.42.128/8244/10 distributed.scheduler - INFO - Register inproc:// 192.168.42.128/8244/12 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/12 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/14 distributed.scheduler - INFO - Starting worker compute stream, inproc://192.168.42.128/8244/14 distributed.scheduler - INFO - Register inproc://192.168.42.128/8244/16 distributed.scheduler - INFO - Starting worker compute stream, inproc:// 192.168.42.128/8244/16 distributed.scheduler - INFO - Client Client-ef7049de-ad17-11e8-a034-5f58bbbb3184 requests to cancel 0 keys distributed.scheduler - INFO - Send lost future signal to clients distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/ 10 distributed.scheduler - INFO - Remove worker inproc:// 192.168.42.128/8244/12 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/14 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/16 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/2 distributed.scheduler - INFO

  • Remove worker inproc://192.168.42.128/8244/4 distributed.scheduler - INFO - Remove worker inproc://192.168.42.128/8244/6 distributed.scheduler
  • INFO - Remove worker inproc://192.168.42.128/8244/8 distributed.scheduler - INFO - Lost all workers distributed.scheduler - INFO - Clear task state distributed.scheduler - INFO - Clear task state What I Tried

-start workers of Process mode -start workers on Thread mode -client.restart() -Pands option, pd.options.mode.chained_assignment = None

freqtrade-create-trade-with-dask.zip https://github.com/dask/distributed/files/2340623/freqtrade-create-trade-with-dask.zip Result

The issue is present despite all attempts. During the workers restart there is a deserialisation process from the worker space folder? Do you think that the memory loss can be maintained by that process? Regards, Axel

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2068#issuecomment-417700974, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszOImIG3441MpJxqMEyGPIezQ33ZRks5uWValgaJpZM4UzTGE .

Axel-CH commented 6 years ago

Ok I will try to use fewer workers, just for information i'm running dask distributed 1.23.0. With python 3.7 on a Debian VMware virtual machine.

Axel-CH commented 6 years ago

@mrocklin as you suggested I tried to run the code with less workers to have more memory for each one. The behaviour is the same, I just have more time before the memory limit is reached.

I've noted that there is a mismatch between the used memory by worker showed by Dask dashboard and the total memory used showed by Windows Or Linux.

Dask Dashboard = 2Go by worker = 2*16 = 32Go used Windows Ram = 10.4Go used (16Go total)

2018-08-31 19_09_04-dask_ workers windows_memory_usage

Do you think that there is a possibility that Dask perform a wrong memory usage calculation then freeze the workers despite there is memory available?

mrocklin commented 6 years ago

That number uses psutil.Process().memory_info().rss, which looks at OS level metrics (assuming you're using a somewhat recent version of Dask). I'm surprised to hear that they're different.

On Fri, Aug 31, 2018 at 7:05 PM, Axel CHERUBIN notifications@github.com wrote:

@mrocklin https://github.com/mrocklin as you suggested I tried to run the code with less workers with to have more memory for each one. The behaviour is the same, I just havec more time before the memory limit is reached.

I've note that there is a mismath between the used memory by worker showed by Dask dashboard and the total memory used showed by Windows Or Linux.

Dask Dashboard = 2Go by worker = 2*16 = 32Go used Windows Ram = 10.4Go used (16Go total)

Do you think that there is a possibility that Dask perform a wrong memory usage calculation then freeze the workers despite there is memory available?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2068#issuecomment-417810289, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszDIt1vWVCG2mJ9Dssx3UWbqtsMZOks5uWcE2gaJpZM4UzTGE .

Axel-CH commented 6 years ago

I'm using the version 1.23.0 that you pushed yesterday, maybe that function send to dask wrong data? But it does not seem really likely. Maybe that problem is comming from the way dask store, exploit and refresh this information.

abast commented 5 years ago

@Axel-CH I've also noticed a mismatch between the memory usage reported by dask distributed and the OS. What helped me to resolve problems of freezed and killed workers was to change the configuration described here to the following:

     # Fractions of worker memory at which we take action to avoid memory blowup
     # Set any of the lower three values to False to turn off the behavior entirely
     memory:
       target: 0.95  # target fraction to stay below
       spill: False  # fraction at which we spill to disk
       pause: False  # fraction at which we pause worker threads
       terminate: False  # fraction at which we terminate the worker
Axel-CH commented 5 years ago

Thanks you for your feedback @abast, I really appreciate it. I will test what you advised, we'll see if it's working in my case. rgds

Athlete369 commented 5 years ago

Have anyone found out the solution of this problem

TomAugspurger commented 5 years ago

@Athlete369 it's not clear that there's a single problem here, which adds to the confusion.

See https://github.com/dask/distributed/issues/2757, https://github.com/dask/dask/issues/3530, among others.

knowsuchagency commented 5 years ago

Interestingly, I get similar results to @mrocklin when doing concurrent POST requests when using partial functions submitted to a threaded local client i.e.

    client = Client(processes=False)
    tasks = [client.submit(functools.partial(requests.post, URL, json=MESSAGE)) for _ in range(MSG_COUNT)]
    list(tqdm(as_completed(tasks), total=len(tasks)))

leads to

distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 13% CPU time recently (threshold: 10%)
...

HOWEVER simply passing my arguments to the submit function without wrapping them in a partial solves the issue for me i.e.

tasks = [client.submit(requests.post, URL, json=MESSAGE) for _ in range(MSG_COUNT)]
knowsuchagency commented 5 years ago

The issue also goes away if I reduce the size of the payload. Seems like an issue with the size of what's being pickled as suggested by @iljau's investigation

songqiqqq commented 5 years ago

Any solution or walk-around for this problem? It's really urgent for me now.

TomAugspurger commented 5 years ago

No updates from me.

songqiqqq commented 5 years ago

Any solution or walk-around for this problem? It's really urgent for me now.

sorry , I find it's not memory leak problem in my case. Actually , it seems (personal opinion) the problem of bad control of graph size (I mean number of tasks). If i control the number of tasks submited, the memory is almost at a constant level. Thanks .

knowsuchagency commented 5 years ago

@songqiqqq's suggestion seems to be a viable workaround -- limiting the number of tasks scheduled at any given time.

Changing

MSG_COUNT = 1_000_000

with Client(processes=False) as client:
    pbar = tqdm(total=MSG_COUNT)
    tasks = [client.submit(functools.partial(requests.post, URL, json=MESSAGE)) for _ in range(MSG_COUNT)]
    for _ in as_completed(tasks):
        pbar.update(1)

to

MSG_COUNT = 1_000_000

def group(iterable, chunk_size=50_000):
    iterable = iter(iterable)
    chunk = list(it.islice(iterable, chunk_size))
    while chunk:
        yield chunk
        chunk = list(it.islice(iterable, chunk_size))

with Client(processes=False) as client:
    pbar = tqdm(total=MSG_COUNT)
    for chunk in group(range(MSG_COUNT)):
        tasks = [client.submit(functools.partial(requests.post, URL, json=MESSAGE)) for _ in chunk]
        for _ in as_completed(tasks):
            pbar.update(1)

solved the issue for me.

I continued to get warnings; but tasks were processed

ahirner commented 3 years ago

FWIW, by clearing lru_caches regularly, end up with 103 instead of 111MB on 3.6.8 and 2020.12.0.

def clear_lrus():
    import functools
    import gc

    gc.collect()
    wrappers = [a for a in gc.get_objects() if
        isinstance(a, functools._lru_cache_wrapper)]

    for wrapper in wrappers:
        wrapper.cache_clear()

def main_distributed_client():
    from distributed import Client
    client = Client(processes=True, n_workers=1, threads_per_worker=1)

    for i in range(1000):
        delayed_obj = Combiner().delayed_results()
        future = client.compute(delayed_obj)
        res = future.result()
        print("#%d, mem: %s mb" % (i, res['mem_usage_mb']))

        collect_res = client.run(lambda: gc.collect()) # doesn't help
        collect_res = client.run(clear_lrus)

In particular, I'm eying on key_split since I'm not certain which kind of objects might end up there. The rest is maybe within worker logs?

Brontomerus commented 3 years ago

I wanted to add some unexpected results I observed and was able to resolve thanks to the dialogue above...

Using Python 3.8.5 Prefect==0.14.16 Dask[complete]==2021.2.0

I was observing errors while trying to run my Prefect workflows on an AWS Stack involving some ECS Fargate containers, which kept saying something along the lines of this:

 Unexpected error: TypeError('string indices must be integers') 

This was strange, and I definitely had a few other lingering problems that really made my hunt difficult until I saw some logs that stated the following:

distributed.core - INFO - Event loop was unresponsive in Worker for 3.69s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

distributed.utils_perf - INFO - full garbage collection released 45.16 MB from 37 reference cycles (threshold: 10.00 MB)

Above, @mrocklin suggested "less workers and more memory per worker", which proved to be my silver bullet in this instance. Hope someone else sees that issue and can react accordingly!