dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 718 forks source link

Memory Leak? Very big graphs clogging scheduler? #2433

Closed bluecoconut closed 1 year ago

bluecoconut commented 5 years ago

I found something that feels like a memory leak, but I'm not really sure if it's that or if it's just overhead of using the scheduler with lots of tasks (seems unlikely, but I wanted to ask). I saw there were other posts about possible memory leaks, but my method of causing feels a bit different than those described elsewhere so I wanted to post my example code to generate and ask if there is anything obvious I am messing up or if it's a known issue?

In my attempts to simply calculate lots of basic statistics on a large data-frame, I am now at a point with a small-ish example (runs on my local machine) that demonstrates a failure in dask that doesn't happen in pandas on the same underlying code. (Of specific note is that I am setting the npartitions=100 to reproduce the approximate number of tasks that I am getting on larger machines / clusters I am trying to do similar work on.)

This is the smaller reproducible example of a problem I was facing on 4 TB memory setup, where 150 GB tables (approx) would be reported via dask to only be using 500 GB in memory during processing steps, but the machine would lock up and actually be using the full amount of RAM. For the larger machine I was starting the client via dask-worker and dask-scheduler CLI commands rather than through the client = Client() setup.

Code Setup

## SETUP

run_with_dask = True

# Create some dummy data.
import numpy as np
import pandas as pd

data = {'col_{}'.format(i): np.random.uniform(size=500000) for i in range(500)}
pd_df = pd.DataFrame(data)
del data

if run_with_dask:
    import dask.dataframe as dd
    from distributed import Client
    client = Client()
    print(client)
    df = dd.from_pandas(pd_df, npartitions=100)
    df = client.persist(df)
else:
    df = pd_df

del pd_df

## Statistic Generation

def generate_filter(n_cols=500):
    return {'col': 'col_{}'.format(np.random.choice(n_cols)),
            'op': np.random.choice(['lt', 'gt']),
            'value': np.random.uniform(0.25, 0.75)}

def get_n_filter(n):
    return [generate_filter() for x in range(n)]

def get_statistic_batch(filters, n, n_cols=500):
    return [{'filters': filters, 
             'output': {'col': 'col_{}'.format(np.random.choice(n_cols)),
                        'op': np.random.choice(['avg', 'sum', 'max', 'min', 'stddev', 'unique_count'])
                       }
            } for x in range(n)]

def random_stats(chunks, n_per_chunk):
    stats = []
    for i in range(chunks):
        filt_n = np.random.randint(10)+1
        stats.extend(get_statistic_batch(get_n_filter(filt_n), n_per_chunk))
    return stats

## Statistic Parsing

def filter_index(df, filter):
    filter_ops = {'lt': lambda x, y: x < y, 'gt': lambda x, y: x > y, 'eq': lambda x, y: x == y}
    return filter_ops[filter['op']](df[filter['col']], filter['value'])

def get_indexer(df, filters):
    if len(filters) == 1:
        return filter_index(df, filters[0])
    return np.logical_and(filter_index(df, filters[0]), get_indexer(df, filters[1:]))

def get_statistic(df, statistic):
    indexer = get_indexer(df, statistic['filters'])
    agg_ops = {'sum': np.sum, 'avg': np.mean, 'max': np.max, 'min': np.min, 'stddev': np.std, 'unique_count': lambda x: x.unique().size}
    return agg_ops[statistic['output']['op']](df[statistic['output']['col']][indexer])

Benchmarking

import time
np.random.seed(137)

st = time.time()
single_stat = get_statistic(df, random_stats(1, 1)[0])
ft = time.time()
print("Single stat graph generation (or calculation with pandas): ", ft-st)
if run_with_dask:
    print(len(single_stat.dask))
    st = time.time()
    final_number = single_stat.compute()
    ft = time.time()
    print("Single stat: compute via dask", ft-st)

st = time.time()
stats_5_100 = random_stats(5, 100)
batch_5_100 = [get_statistic(df, x) for x in stats_5_100]
ft = time.time()
print("500 stat generation (or calc with pandas). Lots of repeated filters.", ft-st)
if run_with_dask:
    st = time.time()
    final_results = client.compute(batch_5_100, sync=True)
    ft = time.time()
    print("500 stat generation, compute via dask.", ft-st)

st = time.time()
stats_100_5 = random_stats(100, 5)
batch_100_5 = [get_statistic(df, x) for x in stats_100_5]
ft = time.time()
print("500 stat generation (or calc with pandas). Few repeated filters",ft-st)
if run_with_dask:
    st = time.time()
    final_results = client.compute(batch_100_5, sync=True)
    ft = time.time()
    print("500 stat generation, compute via dask.", ft-st)

st = time.time()
stats_50_100 = random_stats(50, 100)
batch_50_100 = [get_statistic(df, x) for x in stats_50_100]
ft = time.time()
print("5000 stat generation.", ft-st)
if run_with_dask:
    st = time.time()
    final_results = client.compute(batch_50_100, sync=True)
    ft = time.time()
    print("5000 stat generation, compute via dask", ft-st)

Results

Running with run_with_dask = False Memory use (via htop) stays below 3 GB on my machine.

Single stat graph generation (or calculation with pandas):  0.00588536262512207
500 stat generation (or calc with pandas). Lots of repeated filters. 1.989380121231079
500 stat generation (or calc with pandas). Few repeated filters 1.928800106048584
5000 stat generation. 17.788148641586304

Running with run_with_dask = True, Memory use via htop fills up my machine (>15.6 GB) and starts swapping until everything dies. Also, watching the diagnostic dashboard shows 5.9 GB of in use by the cluster, but htop shows a lot more use.

Stopping before the final batch even, after just running the two 500 stat generation batches, shows a mismatch between reported Mem via dask and actual Mem via htop.

Extra note about seeing this problem even more viscerally: by editing data = {'col_{}'.format(i): np.random.uniform(size=500000) for i in range(500)} to only create 5000 rows of data (size=5000), the memory use still balloons to >13 GB in dask, while the scheduler dashboard reports 2 GB.

Output from run_with_dask = True on this small dataframe:

Single stat graph generation (or calculation with pandas):  0.010093927383422852
2418
Single stat: compute via dask 1.0854308605194092
500 stat generation (or calc with pandas). Lots of repeated filters. 3.176414728164673
500 stat generation, compute via dask. 58.63161826133728
500 stat generation (or calc with pandas). Few repeated filters 3.3641154766082764
500 stat generation, compute via dask. 97.8330614566803
5000 stat generation. 31.91444993019104
distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50122 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50124 remote=tcp://127.0.0.1:40591>
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50126 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50128 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50130 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50132 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50134 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50136 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50138 remote=tcp://127.0.0.1:40591>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://127.0.0.1:50142 remote=tcp://127.0.0.1:40591>
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)

Environment

Python 3.7.1
dask==1.0.0
distributed==1.25.1
msgpack==0.5.6

My general problem as well: https://stackoverflow.com/questions/53844188/how-do-i-use-dask-to-efficiently-calculate-many-simple-statistics

mrocklin commented 5 years ago

I recommend trying the threaded scheduler to see if the problem persists. I also recommend trying the single-threaded scheduler to see if the problem persists. If the answer is "yes" then "no, then my first guess is https://github.com/dask/dask/issues/3530 , which goes down to a glibc problem. If you feel adventurous and want to try some of the solutions listed in that issue to see if they help that would be interesting.

On Thu, Dec 20, 2018 at 1:12 AM Justin Waugh notifications@github.com wrote:

I found something that feels like a memory leak, but I'm not really sure if it's that or if it's just overhead of using the scheduler with lots of tasks (seems unlikely, but I wanted to ask). I saw there were other posts about possible memory leaks, but my method of causing feels a bit different than those described elsewhere so I wanted to post my example code to generate and ask if there is anything obvious I am messing up or if it's a known issue?

In my attempts to simply calculate lots of basic statistics on a large data-frame, I am now at a point with a small-ish example (runs on my local machine) that demonstrates a failure in dask that doesn't happen in pandas on the same underlying code. (Of specific note is that I am setting the npartitions=100 to reproduce the approximate number of tasks that I am getting on larger machines / clusters I am trying to do similar work on.)

This is the smaller reproducible example of a problem I was facing on 4 TB memory setup, where 150 GB tables (approx) would be reported via dask to only be using 500 GB in memory during processing steps, but the machine would lock up and actually be using the full amount of RAM. For the larger machine I was starting the client via dask-worker and dask-scheduler CLI commands rather than through the client = Client() setup. Code Setup

SETUP

run_with_dask = True

Create some dummy data.

import numpy as np import pandas as pd

data = {'col_{}'.format(i): np.random.uniform(size=500000) for i in range(500)} pd_df = pd.DataFrame(data) del data

if run_with_dask: import dask.dataframe as dd from distributed import Client client = Client() print(client) df = dd.from_pandas(pd_df, npartitions=100) df = client.persist(df) else: df = pd_df

del pd_df

Statistic Generation

def generate_filter(ncols=500): return {'col': 'col{}'.format(np.random.choice(n_cols)), 'op': np.random.choice(['lt', 'gt']), 'value': np.random.uniform(0.25, 0.75)}

def get_n_filter(n): return [generate_filter() for x in range(n)]

def get_statistic_batch(filters, n, ncols=500): return [{'filters': filters, 'output': {'col': 'col{}'.format(np.random.choice(n_cols)), 'op': np.random.choice(['avg', 'sum', 'max', 'min', 'stddev', 'unique_count']) } } for x in range(n)]

def random_stats(chunks, n_per_chunk): stats = [] for i in range(chunks): filt_n = np.random.randint(10)+1 stats.extend(get_statistic_batch(get_n_filter(filt_n), n_per_chunk)) return stats

Statistic Parsing

def filter_index(df, filter): filter_ops = {'lt': lambda x, y: x < y, 'gt': lambda x, y: x > y, 'eq': lambda x, y: x == y} return filter_ops[filter['op']](df[filter['col']], filter['value'])

def get_indexer(df, filters): if len(filters) == 1: return filter_index(df, filters[0]) return np.logical_and(filter_index(df, filters[0]), get_indexer(df, filters[1:]))

def get_statistic(df, statistic): indexer = get_indexer(df, statistic['filters']) agg_ops = {'sum': np.sum, 'avg': np.mean, 'max': np.max, 'min': np.min, 'stddev': np.std, 'unique_count': lambda x: x.unique().size} return agg_opsstatistic['output']['op']

Benchmarking

import time np.random.seed(137)

st = time.time() single_stat = get_statistic(df, random_stats(1, 1)[0]) ft = time.time() print("Single stat graph generation (or calculation with pandas): ", ft-st) if run_with_dask: print(len(single_stat.dask)) st = time.time() final_number = single_stat.compute() ft = time.time() print("Single stat: compute via dask", ft-st)

st = time.time() stats_5_100 = random_stats(5, 100) batch_5_100 = [get_statistic(df, x) for x in stats_5_100] ft = time.time() print("500 stat generation (or calc with pandas). Lots of repeated filters.", ft-st) if run_with_dask: st = time.time() final_results = client.compute(batch_5_100, sync=True) ft = time.time() print("500 stat generation, compute via dask.", ft-st)

st = time.time() stats_100_5 = random_stats(100, 5) batch_100_5 = [get_statistic(df, x) for x in stats_100_5] ft = time.time() print("500 stat generation (or calc with pandas). Few repeated filters",ft-st) if run_with_dask: st = time.time() final_results = client.compute(batch_100_5, sync=True) ft = time.time() print("500 stat generation, compute via dask.", ft-st)

st = time.time() stats_50_100 = random_stats(50, 100) batch_50_100 = [get_statistic(df, x) for x in stats_50_100] ft = time.time() print("5000 stat generation.", ft-st) if run_with_dask: st = time.time() final_results = client.compute(batch_50_100, sync=True) ft = time.time() print("5000 stat generation, compute via dask", ft-st)

Results

Running with run_with_dask = False Memory use (via htop) stays below 3 GB on my machine.

Single stat graph generation (or calculation with pandas): 0.00588536262512207 500 stat generation (or calc with pandas). Lots of repeated filters. 1.989380121231079 500 stat generation (or calc with pandas). Few repeated filters 1.928800106048584 5000 stat generation. 17.788148641586304

Running with run_with_dask = True, Memory use via htop fills up my machine (>15.6 GB) and starts swapping until everything dies. Also, watching the diagnostic dashboard shows 5.9 GB of in use by the cluster, but htop shows a lot more use.

Stopping before the final batch even, after just running the two 500 stat generation batches, shows a mismatch between reported Mem via dask and actual Mem via htop.

Extra note about seeing this problem even more viscerally: by editing data = {'col_{}'.format(i): np.random.uniform(size=500000) for i in range(500)} to only create 5000 rows of data (size=5000), the memory use still balloons to >13 GB in dask, while the scheduler dashboard reports 2 GB.

Output from run_with_dask = True on this small dataframe:

Single stat graph generation (or calculation with pandas): 0.010093927383422852 2418 Single stat: compute via dask 1.0854308605194092 500 stat generation (or calc with pandas). Lots of repeated filters. 3.176414728164673 500 stat generation, compute via dask. 58.63161826133728 500 stat generation (or calc with pandas). Few repeated filters 3.3641154766082764 500 stat generation, compute via dask. 97.8330614566803 5000 stat generation. 31.91444993019104 distributed.utils_perf - WARNING - full garbage collections took 12% CPU time recently (threshold: 10%) distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%) distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.comm.tcp - WARNING - Closing dangling stream in distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)

Environment

Python 3.7.1 dask==1.0.0 distributed==1.25.1 msgpack==0.5.6

My general problem as well: https://stackoverflow.com/questions/53844188/how-do-i-use-dask-to-efficiently-calculate-many-simple-statistics

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2433, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszPLSPEbuN1QDicjCppLKQtVmz1b6ks5u6ypPgaJpZM4Zbkou .

bluecoconut commented 5 years ago

Threaded scheduler and single-threaded scheduler (dask.config.set(scheduler='threads') and dask.config.set(scheduler='synchronous')) both seem to behave the same. Memory still grows, but only during the dask graph creation, and stays constant during the execution of the graph. It seems as if this is just how large the collection.dask graph is.

In the distributed client, the memory use grows and stays large as the tasks are submitted to the cluster. My guess now is that this is just the graph in memory in the scheduler?

I tried deleting the objects in the local thread and running gc.collect() and it sometimes frees memory, but not all of it, but this is less of an issue. I'm starting to appreciate more how large these graphs can get themselves, and so going to look at reducing nodes in graph.

As an approach to addressing the problem as a whole, I am going to try out https://github.com/dask/dask/pull/4229 and see if HighLevelGraph helps / works for this task.

mrocklin commented 5 years ago

OK, if things are still filling out memory with the single threaded scheduler then that's a sign that this problem is probably pretty mundane (hooray!). My next suggestion would be to dive into the task graph and see what's in there that might be taking up space. I would also be interested in profile information on creating the task graph (time and bytes are often correlated). Figuring out what's taking time would probably help to identify what's going on. I would probably use the cProfile module and snakeviz myself here, but there are many good options.

I'm happy to help with this (it's important work) but may not get to it for a while (my TODO list has been growing quickly recently). If you're comfortable still pushing on this I'm happy to keep pointing you in good directions.

bluecoconut commented 5 years ago

I've spent some time looking at this, and as far as i can tell, this is not actually a memory leak. This is just the case where dask graphs themselves grow to be very large, and this has a lot of overhead when going to the scheduler and then to workers. I spent a while chasing and double-checking that every node generated in the graphs made sense, and after a lot of digging I can say that it seems like it is correct for the operations I am doing (filtering and aggregations on pandas frames).

This is a bit limiting for me in the long-term adoption (moving to smaller, and many, workers), but I am able to "work around" for now by using only 4-8 workers for the types of jobs I'm creating (therefore only making ~1 million node graphs, rather than the 100 million+) seems to be the way to go for now, giving me a bit of parallelism to speed up the previous pure pandas code, but not as much as I'd like. The optimization I did is roughly matching the graph creation and submission time to match the compute time, but for multi hour jobs, it feels a bit painful to see a cluster sit idle waiting for half the time. I tried to submit pieces of the jobs to the cluster via the client.compute() and then calculate more graph, and submit that, etc., but ran into some odd issues where submitting more tasks to the graph on the scheduler while a large graph was being executed caused errors, sockets to disconnect, and references to be lost (and this unfortunately wasn't always reproducible, so it's hard to track down or file an issue for).

Overall, any overhead reduction in the scheduler or graph building will likely be very valuable to me, allowing immediate speedups through scaling worker number.

songqiqqq commented 5 years ago

same problem here. It's valuable to get some advice/tricks/best-practice as to how to decrease the graph size with the same number of tasks or how to find where is the superfluouse memory used.

TomAugspurger commented 5 years ago

@songqiqqq have you seen https://docs.dask.org/en/latest/optimize.html? Task fusion is one way to get a smaller graph.

songqiqqq commented 5 years ago

@TomAugspurger Thanks for your reply. I checked that optimize page and think it should be much helpful for reducing graph size. However, my project is written by the api of "client.submit" and the graph is dynamicly adjusted (with as_completed command to realize some optimizing algorithm). I have no idea how to apply fuse-like api in dask.optimization module in my case. Do you have suggestions about this ?

However, with some expeiments, I found some tricks to reduce total memory used: 1) start the client/child processes before loading large variable at parent processes. This could avoid redundant copies in child processes, especially for large variable. 2) make the function definition clean. First, avoid directly referencing large varialbe at the outer scope. Second, moving all the function definition into a individual light py file(seperate from the environment of large variable loading/generation) seems helpful to the memory comsumption. (I don't konw the mechanism, but it works for me) 3) moniter the task states with the command "client.cluster.scheduler.tasks" and control the task stream according to that.

With the steps listed above, the memory consumption is largely reduced and problem is almost sovled in my case.

TomAugspurger commented 5 years ago

Do you have suggestions about this ?

No, I'm not really familiar with the details.