The solution is to add a callback to apply_async rather than iterating through all of the result objects (which will be ordered, and will block on each task.get() call).
By making the callback a method on the command, the command can track information such as the time delta and (I hope) write the data synchronously to the file on disk.
import time
import random
import multiprocessing as mp

def foo(idx):
    # Simulate a task that takes a variable amount of time.
    sleep = random.randint(2, 10)
    time.sleep(sleep)
    return "call {} slept for {} seconds".format(idx, sleep)

def handle_result(result):
    # Called in the parent process as each result arrives, in
    # completion order rather than submission order.
    print(result)

if __name__ == '__main__':
    pool = mp.Pool(8)
    tasks = [
        pool.apply_async(foo, (i+1,), callback=handle_result)
        for i in range(8)
    ]
    pool.close()
    pool.join()
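As a rough sketch of the method-on-the-command idea, reusing foo from above (the ResultsCommand class, its file handle, and the record fields are hypothetical names for illustration, not the actual command):

import time
import json
import random
import multiprocessing as mp

def foo(idx):
    # Same toy task as above.
    sleep = random.randint(2, 10)
    time.sleep(sleep)
    return "call {} slept for {} seconds".format(idx, sleep)

class ResultsCommand(object):
    # Hypothetical command object: it owns the output file and the
    # start time, so the callback can compute a time delta per result.
    def __init__(self, path):
        self.started = time.time()
        self.fobj = open(path, 'w')

    def handle_result(self, result):
        # Runs in the parent process; callbacks execute sequentially,
        # so writing to the shared file handle needs no locking.
        record = {"elapsed": time.time() - self.started, "result": result}
        self.fobj.write(json.dumps(record) + "\n")

    def run(self, tasks=8):
        pool = mp.Pool(tasks)
        for i in range(tasks):
            pool.apply_async(foo, (i+1,), callback=self.handle_result)
        pool.close()
        pool.join()
        self.fobj.close()

if __name__ == '__main__':
    ResultsCommand('results.jsonl').run()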
The apply_async callback pattern is described in this Stack Overflow answer: http://stackoverflow.com/questions/8533318/python-multiprocessing-pool-when-to-use-apply-apply-async-or-map
Also, the SO answer that explains who runs the callback (it turns out to be a thread!) is here:
Specifically:
Yes, the callbacks will be executed sequentially. The _result_handler thread pulls one completed task off the queue, calls _set (which runs the callback), then moves on to the next one. This is why the documentation says to make sure the callback completes immediately; executing the callback blocks other results from being processed.
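A quick way to see this is to print the current thread inside the callback; it fires on the pool's internal result-handler thread (the exact thread name varies by interpreter), not the main thread. A minimal check:

import time
import threading
import multiprocessing as mp

def work(i):
    time.sleep(0.1)
    return i

def on_result(result):
    # Prints a name like "Thread-3" rather than "MainThread": the pool
    # runs all callbacks on its single internal result-handler thread,
    # one at a time.
    print("result {} handled on {}".format(result, threading.current_thread().name))

if __name__ == '__main__':
    pool = mp.Pool(4)
    for i in range(4):
        pool.apply_async(work, (i,), callback=on_result)
    pool.close()
    pool.join()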
Ok, so after trying to deal with memory profiling issues for the past two days, the magic realization is that iterating over the apply_async results and calling get() blocks until all results are collected (in memory). Thus my attempt to dump results as they came in and free the memory really wasn't working.

The new results-writing scheme is one result per line, not a single JSON object of all results. This will hopefully make iterative parsing of the results files easier without the need to store each result in its own file.
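To make the one-result-per-line scheme concrete, a sketch of the write-and-read round trip (the helper names are illustrative):

import json

def write_result(fobj, result):
    # Append one JSON object per line and flush immediately, so results
    # hit disk as they arrive instead of piling up in memory.
    fobj.write(json.dumps(result) + "\n")
    fobj.flush()

def iter_results(path):
    # Parse the file iteratively, one record at a time, rather than
    # loading one giant JSON object of all results.
    with open(path, 'r') as f:
        for line in f:
            yield json.loads(line)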