uqfoundation / pathos

parallel graph management and execution in heterogeneous computing
http://pathos.rtfd.io

How to use initialize with pathos ProcessingPool #151

Closed rickstaa closed 5 years ago

rickstaa commented 6 years ago

Dear Developer,

I would like to use the initializer option to initialize the pool workers when working with pathos.multiprocessing.ProcessingPool. However, it looks like the initializer function never runs.

Thanks in advance. Greetings, Rick

import numpy as np
import time
import multiprocessing
import logging
from multiprocessing import RawArray, Array, Lock #, Pool
from pathos.multiprocessing import ProcessingPool as Pool

# A global dictionary storing the variables passed from the initializer.
var_dict = {}

## Create worker initialization function
def init_worker(X, X_shape):

    print("Initialize")
    ## Using a dictionary is not strictly necessary. You can also
    ## use global variables.
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape

## Create worker function
def worker_func():

    print "Worker"

# We need this check for Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':

    # Enable logging
    mpl = multiprocessing.log_to_stderr()
    mpl.setLevel(logging.INFO)

    # Create array shape specifier
    X_shape = (20, 6)

    # Create multiprocessing array
    X = Array('d', X_shape[0] * X_shape[1])  # , lock=True) #, lock=lock)

    # Wrap X as a numpy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)

    # Start the process pool and do the computation.
    # Here we pass X and X_shape to the initializer of each worker.
    # (Because X_shape is not a shared variable, it will be copied to each
    # child process.)
    pool = Pool(processes=1, initializer=init_worker, initargs=(X, X_shape))

    ### PERFORM IN LOOP that is changing the values of X_np ###
    for jj in range(1, 10):

        # Randomly generate some data
        data = np.random.randn(*X_shape)

        # Copy data to our shared array.
        np.copyto(X_np, data)

        # Send new data to worker
        pool.apipe(worker_func)  # , (X_np,))
        time.sleep(1)

    # # Should print the same results.
    # print('Results (numpy):\n', np.sum(X_np, 1))
    # X_np[0] = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

    # Terminate pool
    # time.sleep(5)
    pool.terminate()
mmckerns commented 5 years ago

@rickstaa: If you are looking to use the "classic" multiprocessing style init args, then you need to use a different pool.

Currently you are using a pointer to pathos.pools.ProcessPool, which has a different __init__ that doesn't include an initializer. You want pathos.pools._ProcessPool, which does have an initializer in __init__.

Note that _ProcessPool has an interface identical to the standard-library multiprocessing.Pool, so it does not provide the pathos-style methods (pipe, apipe, amap, restart, etc.); use apply_async and friends instead.

Try the latter pool above, and close the ticket if it works for you.
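
For illustration, here is a minimal sketch of the script above switched to the underscore pool (reusing init_worker, worker_func, X, and X_shape from the question; a sketch of the classic keywords, not a verbatim pathos example):

from pathos.pools import _ProcessPool

# _ProcessPool accepts the classic multiprocessing.Pool keywords, so the
# initializer runs once in each child process when the pool starts.
pool = _ProcessPool(processes=1, initializer=init_worker, initargs=(X, X_shape))

# apply_async stands in for apipe, since this pool exposes the standard
# multiprocessing.Pool API rather than the pathos one.
result = pool.apply_async(worker_func)
result.get(timeout=10)

pool.close()
pool.join()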

mmckerns commented 5 years ago

@rickstaa: I'm going to assume from the lack of activity here that we have addressed your question. Please reopen this ticket if we have not.

CatalinVoss commented 5 years ago

Is there a plan to add a per-worker initialization method to the newer APIs? This seems like super useful/crucial functionality.

mmckerns commented 5 years ago

@CatalinVoss: I don't have an open ticket to add per-worker initialization to the high-level pathos pools. Note that the functionality is already available on the pools whose names start with an underscore (pathos.pools._ProcessPool).
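
For contrast, a brief side-by-side sketch of the two flavors (again reusing init_worker, worker_func, X, and X_shape from the original question; this assumes only the interfaces described in this thread):

from pathos.pools import ProcessPool, _ProcessPool

# High-level pathos pool: pathos-style methods such as apipe/amap,
# but no initializer keyword at construction time.
hl_pool = ProcessPool(nodes=2)
hl_pool.apipe(worker_func).get()

# Underscore pool: standard multiprocessing.Pool API, including the
# initializer/initargs keywords discussed in this thread.
ll_pool = _ProcessPool(processes=2, initializer=init_worker, initargs=(X, X_shape))
ll_pool.apply_async(worker_func).get()
ll_pool.close()
ll_pool.join()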

ddelange commented 1 year ago

propagating in https://github.com/uqfoundation/pathos/pull/252