celery / billiard

Multiprocessing Pool Extensions

Manager().Pool gets stuck with billiard, works with multiprocessing #340

Closed: mandarup closed this issue 3 years ago

mandarup commented 3 years ago

The functional example below works with multiprocessing but gets stuck at pool.starmap with billiard. Is there anything wrong with the example, or does it need to be adapted to billiard somehow?

billiard==3.6.4.0, Python 3.9.0, macOS Catalina


# import multiprocessing as mproc
import billiard as mproc

import os
import functools
import numpy as np
import random
import logging
import time

logger = logging.getLogger(__name__)

def child_proc(lock, task_id, shared_best_result):
    """A simple task that conditionally updates a global shared variable."""
    # time.sleep(1)

    # ... or a dummy calculation to keep a CPU worker busy
    dummy_calc = np.random.rand(10000, 10000) * np.random.rand(10000, 10000)
    rnd = random.randint(1, 1000)
    print(task_id, rnd)
    with lock:
        if rnd < shared_best_result['best']:
            shared_best_result['best'] = rnd
            shared_best_result['task_id'] = task_id
            print(f'New solution found: {dict(shared_best_result)}')

def run_parallel(num_multiple_random_starts=5):
    with mproc.Manager() as manager:

        # create a dict shared across all worker processes
        shared_best_result = manager.dict()
        best_results_schema = {
            'task_id': None,
            'time': None,
            'best': np.inf,
        }
        for k, v in best_results_schema.items():
            shared_best_result[k] = v

        lock = manager.Lock()
        # functools.partial binds lock as the first positional argument
        child_partial = functools.partial(child_proc, lock)

        with manager.Pool(processes=5) as pool:
            result = pool.starmap(child_partial, [(
                task_id,
                shared_best_result,
            ) for task_id in range(num_multiple_random_starts)])

            logger.info(shared_best_result)

        # convert the manager dict to a plain dict before the Manager context exits
        result = dict(shared_best_result)
    return result

if __name__ == '__main__':
    result = run_parallel()
    print(result)
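
To confirm where the hang is, the stdlib faulthandler module can dump every thread's stack after a timeout; a minimal debugging sketch (plain Python 3.3+, nothing billiard-specific), reusing run_parallel from above:

import faulthandler
import sys

# if the program is still running after 60s, dump all thread stacks to
# stderr so the blocking call (e.g. inside pool.starmap) is visible
faulthandler.dump_traceback_later(60, exit=False, file=sys.stderr)
result = run_parallel()
faulthandler.cancel_dump_traceback_later()
print(result)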

Thanks for the help!!

mandarup commented 3 years ago

It seems like the issue is with passing the lock to the child process:

def child_proc_simple(task_id):
    # time.sleep(1)
    print(task_id)

def child_proc_simple_with_lock(lock, task_id):
    # lock is accepted but never used; merely passing it triggers the hang
    # time.sleep(1)
    print(task_id)

def simple():
    with mproc.Manager() as manager:

        # create a dict shared across all worker processes
        shared_best_result = manager.dict()
        best_results_schema = {
            'task_id': None,
            'time': None,
            'best': np.inf,
        }
        for k, v in best_results_schema.items():
            shared_best_result[k] = v

        lock = manager.Lock()

        with manager.Pool(processes=5) as pool:

            # this works (child_proc_simple takes only task_id)     <------- WORKS
            result = pool.starmap(child_proc_simple, [(
                task_id,
            ) for task_id in range(10)])

            # this does not work, gets stuck once the lock is passed <------- FAILS
            result = pool.starmap(child_proc_simple_with_lock, [(
                lock,
                task_id,
            ) for task_id in range(10)])

            logger.info(shared_best_result)

        # convert the manager dict to a plain dict before the Manager context exits
        result = dict(shared_best_result)
    return result
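
If the hang really is caused by shipping the manager's Lock proxy with every task, a common alternative is to hand the lock to each worker once at startup via the pool initializer. A sketch, assuming billiard mirrors multiprocessing's Pool(initializer=..., initargs=...) API and using the module-level Pool rather than manager.Pool; init_worker, child, and _lock are hypothetical names:

import billiard as mproc

_lock = None  # module-level slot, filled in once per worker

def init_worker(lock):
    # runs once in each worker process; stash the manager lock globally
    global _lock
    _lock = lock

def child(task_id):
    with _lock:
        print(task_id)

def run():
    with mproc.Manager() as manager:
        lock = manager.Lock()
        # the lock travels once via initargs, not with every task
        with mproc.Pool(processes=5, initializer=init_worker,
                        initargs=(lock,)) as pool:
            pool.starmap(child, [(task_id,) for task_id in range(10)])

if __name__ == '__main__':
    run()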

mandarup commented 3 years ago

Using processes directly seems to work.

with mproc.Manager() as manager:
    # create a dict shared across all worker processes
    shared_best_result = manager.dict()
    best_results_schema = {
        'task_id': None,
        'time': None,
        'best': 1e6,
    }
    for k, v in best_results_schema.items():
        shared_best_result[k] = v

    lock = manager.Lock()

    processes = [
        mproc.Process(target=child_proc,
                      args=(lock, task_id, shared_best_result))
        for task_id in range(10)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
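
One caveat: unlike the Pool above, which capped concurrency at 5 workers, this starts all 10 processes at once. If the cap matters (the dummy calculation allocates large arrays), a manager semaphore can restore it. A sketch that drops into the same Manager block, assuming billiard's Manager exposes Semaphore like multiprocessing's SyncManager; bounded_child is a hypothetical wrapper:

def bounded_child(sem, lock, task_id, shared_best_result):
    # the semaphore lets at most 5 children do the real work at a time
    with sem:
        child_proc(lock, task_id, shared_best_result)

# inside the `with mproc.Manager() as manager:` block above
sem = manager.Semaphore(5)
processes = [
    mproc.Process(target=bounded_child,
                  args=(sem, lock, task_id, shared_best_result))
    for task_id in range(10)
]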

This is a workaround; closing this issue for now.