Parsl / parsl

Parsl - a Python parallel scripting library
http://parsl-project.org
Apache License 2.0

Fix memory explosion when passing large-ish python objects #1187

Open annawoodard opened 5 years ago

annawoodard commented 5 years ago

It is often convenient to pass large-ish objects around to Python apps. Currently, for workflows with large numbers of tasks, this causes a memory explosion in the main Parsl process. This probably has to do with keeping args/kwargs in the DFK's internal task dict, but it's not immediately obvious to me why: inspecting the args held in the DFK shows that their ids match the original object's id, so that's not where the copy is happening.

import sys

import numpy as np
import pandas as pd
import parsl
from parsl.configs.htex_local import config
import tqdm

dfk = parsl.load(config)

# ~80 MB DataFrame that is passed, unchanged, to every task
data = pd.DataFrame(np.random.rand(10000, 1000))
print('created dataset with size {:.2f} MB and id {}'.format(sys.getsizeof(data) / 1e6, id(data)))

@parsl.python_app
def foo(data):
    return True

futures = []
for i in tqdm.tqdm(range(1000)):
    futures += [foo(data)]
    # collect the args held for every task in the DFK's internal dict
    args = sum([x['args'] for x in dfk.tasks.values()], [])
    print('current unique ids: {}'.format(set([id(x) for x in args])))

yields:

created dataset with size 80.00 MB and id 139770018452648
current unique ids: {139770018452648}
current unique ids: {139770018452648}
...
benclifford commented 5 years ago

Does the memory behaviour change when using the local thread executor? htex will serialise (i.e. copy) more than the local thread executor does.
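
For anyone reproducing this, the check only requires swapping the config import in the script above. A minimal sketch, assuming the stock parsl.configs.local_threads config that ships with Parsl:

import numpy as np
import pandas as pd
import parsl
from parsl.configs.local_threads import config  # ThreadPoolExecutor-based config

parsl.load(config)

# Same ~80 MB DataFrame as in the reproduction above
data = pd.DataFrame(np.random.rand(10000, 1000))

@parsl.python_app
def foo(data):
    return True

# Apps run in threads inside the main process, so arguments are never
# serialised or copied and resident memory should stay roughly flat.
futures = [foo(data) for _ in range(1000)]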

annawoodard commented 5 years ago

🤦‍♀ Thanks Ben!! You're right, it's instantaneous with the local thread executor. This is definitely happening in ipyparallel.serialize.

benclifford commented 5 years ago

What is the actual parsl issue here, after that change? That parsl should make it easier to move this stuff around? That we should be clear that, for big objects, you need to do something like serialize them yourself and use, e.g., file staging?
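
For context, the do-it-yourself route mentioned here might look roughly like the following. This is only a sketch: the data.pkl filename is made up, and it assumes the workers share a filesystem with the submit side.

import pickle

import numpy as np
import pandas as pd
import parsl
from parsl.configs.htex_local import config
from parsl.data_provider.files import File

parsl.load(config)

# Serialise the big object once, up front, instead of once per task
data = pd.DataFrame(np.random.rand(10000, 1000))
with open('data.pkl', 'wb') as f:
    pickle.dump(data, f)

@parsl.python_app
def foo(inputs=[]):
    import pickle
    # Each invocation loads the object from the staged file
    with open(inputs[0].filepath, 'rb') as f:
        data = pickle.load(f)
    return True

# Only a small File reference travels with each task
futures = [foo(inputs=[File('data.pkl')]) for _ in range(1000)]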

annawoodard commented 5 years ago

The issue is that, if a user would rather take the performance hit of sending the same large object to every worker (at least until we have something more sophisticated like worker-side caching) than deal with doing their own serialization (and any associated filesystem issues), they should be able to do so without the main process's memory blowing up.
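
For illustration only, a rough sketch of what worker-side caching could look like from user code today; the builtins-based cache, the key/path arguments, and the data.pkl file (as in the staging sketch above) are hypothetical workarounds, not an existing Parsl feature.

import parsl
from parsl.configs.htex_local import config

parsl.load(config)

@parsl.python_app
def foo(key, path):
    # Imports happen inside the app body because htex executes the
    # function in a separate worker process.
    import builtins
    import pickle
    # Stash the cache on a module that persists in the worker process, so
    # the load cost is paid once per worker rather than once per task.
    cache = getattr(builtins, '_demo_object_cache', None)
    if cache is None:
        cache = {}
        builtins._demo_object_cache = cache
    if key not in cache:
        with open(path, 'rb') as f:
            cache[key] = pickle.load(f)
    data = cache[key]
    return True

# Each task ships only a short key and a path, never the 80 MB object
futures = [foo('big-df', 'data.pkl') for _ in range(1000)]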

If I serialize the large object as usual, but then replace the args in the message with a fake empty args list before putting it in the outgoing queue, the memory doesn't blow up (as expected: the refcount drops to zero when the submit block ends). I think the issue is that a reference to, or copy of, the serialized args is being kept around somewhere even after the message is sent.

        # Experiment: still pay the cost of serialising the full args
        # (big_message), but only enqueue the buffer packed with an empty
        # args list (fn_buf)
        big_message = pack_apply_message(func, args, kwargs,
                                         buffer_threshold=1024 * 1024,
                                         item_threshold=1024)
        fn_buf = pack_apply_message(func, [], kwargs,
                                    buffer_threshold=1024 * 1024,
                                    item_threshold=1024)

        msg = {"task_id": task_id,
               "buffer": fn_buf}

        # Post task to the outgoing queue
        self.outgoing_q.put(msg)
annawoodard commented 5 years ago

This looks related: zeromq/pyzmq#1261
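
For anyone digging into the pyzmq side: the kind of retention in question can be observed directly with a zero-copy send. The sketch below is a generic pyzmq illustration, not Parsl's actual code path.

import zmq

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)
pull = ctx.socket(zmq.PULL)
push.bind('inproc://demo')
pull.connect('inproc://demo')

buf = bytes(80 * 1024 * 1024)   # stand-in for a large serialised task buffer

# With copy=False, pyzmq hands libzmq a view of buf and must keep buf alive
# until libzmq is done with it; track=True returns a MessageTracker that
# reports when that reference is actually dropped.
tracker = push.send(buf, copy=False, track=True)
print('buffer released by zmq?', tracker.done)   # typically False right after send

pull.recv()        # drain the message on the other end
tracker.wait()     # blocks until libzmq drops its reference to buf
print('buffer released by zmq?', tracker.done)   # True: buf can now be freed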