dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Client-side memory leak in map_partitions + persist #1378

Open bluenote10 opened 7 years ago

bluenote10 commented 7 years ago

I'm fighting out-of-memory issues on the client side with an operation that acquires (and probably leaks) much more client memory than expected. The problem occurs in a combination of map_partitions and persist. Complete reproducing example:

from __future__ import division, print_function
import pandas as pd
import numpy as np
import os
import gc
import psutil

import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client

LAST_MEM_USAGE = 0

def log_memory(label):
    global LAST_MEM_USAGE
    gc.collect()
    process = psutil.Process(os.getpid())
    mem_usage = process.memory_info().rss / float(2 ** 20)
    mem_usage_diff = mem_usage - LAST_MEM_USAGE
    print(
        "[MemUsage] {:8.1f} MB    [{:+8.1f} MB] @ {}".format(
            mem_usage, mem_usage_diff, label
        )
    )
    LAST_MEM_USAGE = mem_usage

def run_reproducing_example(dask_endpoint):
    client = Client(dask_endpoint)
    log_memory("Start")

    vector = dd.from_delayed([
        delayed(lambda: pd.Series([1]*10000000)) for _ in range(5)
    ], meta=pd.Series(0))
    log_memory("Initializing dask.Series")

    matrices = [
        np.random.uniform(-1, +1, size=(1000, 1000)) for _ in range(50)
    ]
    log_memory("Initializing matrices")

    for partition_id, matrix in enumerate(matrices):

        def identity_mapper(vector, matrix):
            # matrix is intentionally unused; it is only passed so that it
            # gets embedded in each iteration's task graph
            return vector

        vector = dd.map_partitions(
            identity_mapper, vector, matrix
        ).persist()
        log_memory("After iteration: {}".format(partition_id + 1))

    log_memory("End")

# run example on remote cluster to isolate client memory behavior
run_reproducing_example("<YOUR_DASK_ENDPOINT>")

Output:

[MemUsage]     72.3 MB    [   +72.3 MB] @ Start
[MemUsage]     72.4 MB    [    +0.1 MB] @ Initializing dask.Series
[MemUsage]    454.1 MB    [  +381.7 MB] @ Initializing matrices
[MemUsage]    516.4 MB    [   +62.4 MB] @ After iteration: 1
[MemUsage]    585.9 MB    [   +69.4 MB] @ After iteration: 2
[MemUsage]    624.0 MB    [   +38.2 MB] @ After iteration: 3
[MemUsage]    662.2 MB    [   +38.2 MB] @ After iteration: 4
[MemUsage]    700.3 MB    [   +38.2 MB] @ After iteration: 5
[MemUsage]    738.5 MB    [   +38.1 MB] @ After iteration: 6
[MemUsage]    776.6 MB    [   +38.2 MB] @ After iteration: 7
[MemUsage]    814.8 MB    [   +38.2 MB] @ After iteration: 8
[MemUsage]    837.6 MB    [   +22.8 MB] @ After iteration: 9
[MemUsage]    875.7 MB    [   +38.2 MB] @ After iteration: 10
[MemUsage]    913.9 MB    [   +38.2 MB] @ After iteration: 11
[MemUsage]    943.9 MB    [   +30.1 MB] @ After iteration: 12
[MemUsage]    982.1 MB    [   +38.2 MB] @ After iteration: 13
[MemUsage]   1015.0 MB    [   +32.9 MB] @ After iteration: 14
[MemUsage]   1028.8 MB    [   +13.8 MB] @ After iteration: 15
[MemUsage]   1098.3 MB    [   +69.6 MB] @ After iteration: 16
[MemUsage]   1136.5 MB    [   +38.2 MB] @ After iteration: 17
[MemUsage]   1174.6 MB    [   +38.2 MB] @ After iteration: 18
[MemUsage]   1212.8 MB    [   +38.2 MB] @ After iteration: 19
[MemUsage]   1250.9 MB    [   +38.1 MB] @ After iteration: 20
[MemUsage]   1289.1 MB    [   +38.2 MB] @ After iteration: 21
[MemUsage]   1307.3 MB    [   +18.2 MB] @ After iteration: 22
[MemUsage]   1345.5 MB    [   +38.2 MB] @ After iteration: 23
[MemUsage]   1383.6 MB    [   +38.2 MB] @ After iteration: 24
[MemUsage]   1421.8 MB    [   +38.2 MB] @ After iteration: 25
[MemUsage]   1450.2 MB    [   +28.5 MB] @ After iteration: 26
[MemUsage]   1483.4 MB    [   +33.2 MB] @ After iteration: 27
[MemUsage]   1519.2 MB    [   +35.7 MB] @ After iteration: 28
[MemUsage]   1578.0 MB    [   +58.8 MB] @ After iteration: 29
[MemUsage]   1623.9 MB    [   +45.9 MB] @ After iteration: 30
[MemUsage]   1662.1 MB    [   +38.2 MB] @ After iteration: 31
[MemUsage]   1700.3 MB    [   +38.2 MB] @ After iteration: 32
[MemUsage]   1738.4 MB    [   +38.1 MB] @ After iteration: 33
[MemUsage]   1776.5 MB    [   +38.2 MB] @ After iteration: 34
[MemUsage]   1799.3 MB    [   +22.8 MB] @ After iteration: 35
[MemUsage]   1837.5 MB    [   +38.2 MB] @ After iteration: 36
[MemUsage]   1875.7 MB    [   +38.2 MB] @ After iteration: 37
[MemUsage]   1906.2 MB    [   +30.5 MB] @ After iteration: 38
[MemUsage]   1944.3 MB    [   +38.2 MB] @ After iteration: 39
[MemUsage]   1976.8 MB    [   +32.4 MB] @ After iteration: 40
[MemUsage]   2020.8 MB    [   +44.0 MB] @ After iteration: 41
[MemUsage]   2081.8 MB    [   +61.1 MB] @ After iteration: 42
[MemUsage]   2120.0 MB    [   +38.2 MB] @ After iteration: 43
[MemUsage]   2158.2 MB    [   +38.2 MB] @ After iteration: 44
[MemUsage]   2196.4 MB    [   +38.2 MB] @ After iteration: 45
[MemUsage]   2219.2 MB    [   +22.9 MB] @ After iteration: 46
[MemUsage]   2257.4 MB    [   +38.2 MB] @ After iteration: 47
[MemUsage]   2295.6 MB    [   +38.2 MB] @ After iteration: 48
[MemUsage]   2326.0 MB    [   +30.5 MB] @ After iteration: 49
[MemUsage]   2359.9 MB    [   +33.9 MB] @ After iteration: 50

A guess would be that this combination of map_partitions + persist makes copies of the matrix parameters and keeps unreleased references to these copies. Each iteration even seems to hold on to several copies: each individual matrix is only ~7.6 MB, but the memory leaked per iteration is roughly 5 times larger. In general I would not expect the client to keep references to these parameter matrices at all (I assume they are stored on the scheduler). Using a single persist after the loop, without persisting inside the loop, seems to leak memory in a similar way.
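
For reference, a back-of-the-envelope check of these numbers (np.random.uniform returns float64, so each element is 8 bytes; the ~38.2 MB per-iteration growth is read off the log above):

# size of one parameter matrix: 1000 * 1000 float64 values
matrix_mib = 1000 * 1000 * 8 / 2.0 ** 20   # ~7.6 MiB, same as matrix.nbytes / 2**20
# typical client RSS growth per iteration, taken from the output above
copies_per_iteration = 38.2 / matrix_mib   # ~5 copies of the matrix per iteration
print(matrix_mib, copies_per_iteration)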

Versions:

mrocklin commented 7 years ago

Thanks for the nice example.

I put a sleep(0.5) in the for loop and now get the following:

[MemUsage]    135.9 MB    [  +135.9 MB] @ Start
[MemUsage]    136.2 MB    [    +0.3 MB] @ Initializing dask.Series
[MemUsage]    518.0 MB    [  +381.8 MB] @ Initializing matrices
[MemUsage]    571.7 MB    [   +53.7 MB] @ After iteration: 1
[MemUsage]    577.5 MB    [    +5.8 MB] @ After iteration: 2
[MemUsage]    577.5 MB    [    +0.0 MB] @ After iteration: 3
[MemUsage]    600.2 MB    [   +22.7 MB] @ After iteration: 4
[MemUsage]    600.2 MB    [    -0.1 MB] @ After iteration: 5
[MemUsage]    600.1 MB    [    -0.1 MB] @ After iteration: 6
[MemUsage]    600.3 MB    [    +0.2 MB] @ After iteration: 7
[MemUsage]    609.6 MB    [    +9.3 MB] @ After iteration: 8
[MemUsage]    609.6 MB    [    -0.0 MB] @ After iteration: 9
[MemUsage]    609.5 MB    [    -0.1 MB] @ After iteration: 10
[MemUsage]    609.7 MB    [    +0.2 MB] @ After iteration: 11
[MemUsage]    610.7 MB    [    +1.0 MB] @ After iteration: 12
[MemUsage]    610.6 MB    [    -0.0 MB] @ After iteration: 13
[MemUsage]    610.6 MB    [    -0.1 MB] @ After iteration: 14
[MemUsage]    610.8 MB    [    +0.2 MB] @ After iteration: 15
[MemUsage]    610.7 MB    [    -0.1 MB] @ After iteration: 16
[MemUsage]    610.7 MB    [    -0.0 MB] @ After iteration: 17
[MemUsage]    610.6 MB    [    -0.1 MB] @ After iteration: 18
[MemUsage]    610.8 MB    [    +0.2 MB] @ After iteration: 19
[MemUsage]    610.7 MB    [    -0.1 MB] @ After iteration: 20
[MemUsage]    610.7 MB    [    -0.0 MB] @ After iteration: 21
[MemUsage]    610.7 MB    [    -0.0 MB] @ After iteration: 22
[MemUsage]    610.6 MB    [    -0.1 MB] @ After iteration: 23
[MemUsage]    610.8 MB    [    +0.2 MB] @ After iteration: 24
[MemUsage]    610.7 MB    [    -0.1 MB] @ After iteration: 25
[MemUsage]    610.6 MB    [    -0.1 MB] @ After iteration: 26
[MemUsage]    610.8 MB    [    +0.2 MB] @ After iteration: 27
[MemUsage]    618.7 MB    [    +7.9 MB] @ After iteration: 28
[MemUsage]    618.6 MB    [    -0.1 MB] @ After iteration: 29
[MemUsage]    623.8 MB    [    +5.1 MB] @ After iteration: 30
[MemUsage]    623.7 MB    [    -0.1 MB] @ After iteration: 31
[MemUsage]    623.6 MB    [    -0.1 MB] @ After iteration: 32
[MemUsage]    623.8 MB    [    +0.2 MB] @ After iteration: 33
[MemUsage]    623.7 MB    [    -0.1 MB] @ After iteration: 34
[MemUsage]    623.6 MB    [    -0.1 MB] @ After iteration: 35
[MemUsage]    623.8 MB    [    +0.2 MB] @ After iteration: 36
[MemUsage]    623.7 MB    [    -0.1 MB] @ After iteration: 37
[MemUsage]    623.6 MB    [    -0.1 MB] @ After iteration: 38
[MemUsage]    623.6 MB    [    -0.1 MB] @ After iteration: 39
[MemUsage]    623.8 MB    [    +0.2 MB] @ After iteration: 40
[MemUsage]    623.7 MB    [    -0.1 MB] @ After iteration: 41
[MemUsage]    623.7 MB    [    -0.1 MB] @ After iteration: 42
[MemUsage]    623.6 MB    [    -0.1 MB] @ After iteration: 43
[MemUsage]    623.8 MB    [    +0.2 MB] @ After iteration: 44
[MemUsage]    623.7 MB    [    -0.1 MB] @ After iteration: 45
[MemUsage]    623.6 MB    [    -0.1 MB] @ After iteration: 46
[MemUsage]    623.8 MB    [    +0.2 MB] @ After iteration: 47
[MemUsage]    623.7 MB    [    -0.1 MB] @ After iteration: 48

So my guess is that we are writing faster than the network can keep up with.
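
For reference, the modification is roughly the following; the exact position of the sleep inside the loop is an assumption, anything that throttles the submission rate should behave similarly:

from time import sleep

for partition_id, matrix in enumerate(matrices):

    def identity_mapper(vector, matrix):
        return vector

    vector = dd.map_partitions(
        identity_mapper, vector, matrix
    ).persist()
    sleep(0.5)  # throttle: give the client time to ship each graph
    log_memory("After iteration: {}".format(partition_id + 1))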

mrocklin commented 7 years ago

This process is very expensive on the scheduler though, which keeps a copy of all of the intermediate values because of the persist loop that you mention.

mrocklin commented 7 years ago

It would be useful to enforce some kind of back pressure on the user. Unfortunately all of the current submission functions are entirely non-blocking.

bluenote10 commented 7 years ago

Thanks for the quick feedback!

Indeed, using wait(vector) in the loop seems to be a reasonable workaround. But I think it is not only a back-pressure issue: if I place a single wait(vector) after the loop, the memory can no longer be cleaned up, even after the computation has finished (and there is still a reference to vector). Is this a memory leak in the buffering?
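
To make the workaround explicit, this is roughly what I am doing now (wait is dask.distributed.wait, which blocks until the persisted partitions have finished before the next graph is submitted):

from dask.distributed import wait

for partition_id, matrix in enumerate(matrices):

    def identity_mapper(vector, matrix):
        return vector

    vector = dd.map_partitions(
        identity_mapper, vector, matrix
    ).persist()
    wait(vector)  # block until this persist has finished (manual back pressure)
    log_memory("After iteration: {}".format(partition_id + 1))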

The reason I'm using this iterative approach with intermediate results instead of a single persist is that the single persist runs out of memory immediately. Apparently it behaves the same way as the iterative version without the waits/sleeps.
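
For context, the single-persist variant I am referring to is roughly:

# build the whole 50-step graph lazily, persist only once at the end
for partition_id, matrix in enumerate(matrices):

    def identity_mapper(vector, matrix):
        return vector

    vector = dd.map_partitions(identity_mapper, vector, matrix)

vector = vector.persist()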