uqfoundation / pathos

parallel graph management and execution in heterogeneous computing
http://pathos.rtfd.io

Is this fixed: "can't send large data back to main thread"? #217

Closed y-he2 closed 1 year ago

y-he2 commented 3 years ago

The problem described there was supposed to be fixed in: https://stackoverflow.com/questions/47692566/python-multiprocessing-apply-async-assert-left-0-assertionerror https://github.com/python/cpython/pull/9027

However, I'm still encountering this error when using multiprocess. I wonder whether that fix was merged into multiprocess?

mmckerns commented 3 years ago

For which versions of multiprocess and python are you experiencing this error? Also, if you can give a simple example here that reproduces the error, that would be helpful.

y-he2 commented 3 years ago

Thanks for the quick reply. Versions: multiprocess 0.70.12.2 (py38h294d835_0), Python 3.8.5.

Here's my helper module, which is supposed to apply a function defined in a "func_module_name.py" module to a tensor passed into parallel_tensor_apply(), in a shared-memory manner.

import numpy as np
import multiprocess as mp
from multiprocess import shared_memory
import os

def init_worker( func_module_name, shared_memory_block_master, data_shape_ref, data_type_ref ): 
    print( "Init worker processing:\n\t", mp.current_process() )

    global func_module
    try:
        ## Import the user-supplied module that defines proc( data_tensor, idx ).
        func_module = __import__( func_module_name )
    except ModuleNotFoundError:
        print( "The function module file must be in the same folder as the calling script!" )
        raise
    print( dir( func_module ) )
    assert hasattr( func_module, 'proc' ), "The function module must include a function in the form proc( data_tensor, idx )!"

    global data_shape
    data_shape = data_shape_ref
    global data_type
    data_type = data_type_ref

    ## Define a global reference to the shared block on each worker.
    global shared_memory_block_ref
    shared_memory_block_ref = mp.shared_memory.SharedMemory( name = shared_memory_block_master.name )

def worker_proc( idx ):
    print( "Processing on worker:\n\t", mp.current_process() )

    # On each worker, use the global reference to create an ndarray backed by the shared buffer.
    global data_shape
    global data_type
    shared_data_ref = np.ndarray( shape = data_shape, dtype = data_type, buffer = shared_memory_block_ref.buf )

    # print( shared_data_ref, flush = True )
    return( func_module.proc( shared_data_ref, idx ) )

def parallel_tensor_apply( func_module_name, data_tensor, index_set, max_processes = 99 ):
    if mp.current_process().name == 'MainProcess':
        shared_memory_block_master = shared_memory.SharedMemory( 
            create = True, 
            size = data_tensor.nbytes 
        )
        shared_data_master = np.ndarray( 
            shape = data_tensor.shape, 
            dtype = data_tensor.dtype, 
            buffer = shared_memory_block_master.buf 
        )
        ## Copy the data tensor to the shared memory block once, performed only on the master process. 
        shared_data_master[:] = data_tensor[:]

        with mp.Pool( 
            processes = min( max_processes, os.cpu_count() ), 
            initializer = init_worker, 
            initargs = (func_module_name, shared_memory_block_master, data_tensor.shape, data_tensor.dtype)
        ) as pool: 
            res = pool.map( 
                worker_proc, 
                index_set
            )
        ## Release the local handle and remove the shared block once done.
        shared_memory_block_master.close()
        shared_memory_block_master.unlink()
        return( res )
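
For reference, a minimal sketch of how this helper would be invoked (the module name my_func_module and its proc() body are hypothetical, not part of my actual code):

## Hypothetical caller for parallel_tensor_apply, assuming a file
## my_func_module.py next to this script that defines, e.g.:
##     def proc( data_tensor, idx ):
##         return data_tensor[idx].sum()
## (also assumes parallel_tensor_apply from above is in scope)
import numpy as np

if __name__ == '__main__':
    data = np.random.rand( 100, 1000 )
    res = parallel_tensor_apply( 'my_func_module', data, range( 100 ) )
    print( len( res ) )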

However, when using multiprocess in a Jupyter notebook, I'm getting the following error after (I think) the applied function has executed successfully and tries to return the result. I'm not sure yet whether my shared-memory code is wrong or whether it's a bug in the multiprocess library, but I suspect the library still contains the bug mentioned in the links above.

Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\pool.py", line 576, in _handle_results
    task = get()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\connection.py", line 253, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\connection.py", line 321, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\site-packages\multiprocess\connection.py", line 340, in _get_more_data
    assert left > 0
AssertionError

Which is why I'm asking whether the fork you've made (which is a good job, by the way) includes those fixes.

y-he2 commented 3 years ago

And I guess we can just assume that func_module.proc() simply returns a tensor larger than 2 GB for any input.
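
For concreteness, a hypothetical func module satisfying that assumption could be as simple as (file name and size are illustrative):

## Hypothetical my_func_module.py: proc() ignores its input and always
## returns a tensor larger than 2 GB.
import numpy as np

def proc( data_tensor, idx ):
    ## 275_000_000 float64 values * 8 bytes each ~= 2.2 GB
    return np.zeros( 275_000_000, dtype = np.float64 )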

mmckerns commented 3 years ago

How does this code run? It seems that some of your example is missing. Can you also simplify it to the minimum that exhibits the behavior you are experiencing? Also, does the behavior only happen when run from a Jupyter notebook?

y-he2 commented 3 years ago

Here's an absolutely minimal example that should be equivalent:

import multiprocessing as mp
import sys

def worker_proc( something ):
    import pickle
    ## "training.data" can be any object pickled into a file larger than 2GB (just pickle a large numpy tensor).
    with open( 'training.data', 'rb' ) as handle:
        temp_load = pickle.load( handle )
    return( temp_load )

if( __name__ == '__main__' ): 
    with mp.Pool( processes = 2 ) as pool: 
        res = pool.map( worker_proc, range( 2 ) )
    print( sys.getsizeof( res ) )

Error message when running from cmd with multiprocessing (yes, even that):

Exception in thread Thread-3:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\pool.py", line 576, in _handle_results
    task = get()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\envs\tf-gpu\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError

To answer your question, yes, the error appeared everywhere:

using multiprocess in Jupyter
using multiprocess from cmd
using multiprocessing from cmd

So I could have been totally wrong: the error may lie in the multiprocessing library itself rather than in multiprocess, and the fix in the links I posted may not have been complete, so the error somehow remains in this scenario.

Although for my case I found a workaround that avoids using objects larger than 2 GB in the children. If you find this bug interesting, feel free to investigate further; otherwise, let's close this case, since the bug may be in multiprocessing itself.
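
One possible shape for such a workaround (a sketch under the assumption that the results are NumPy arrays; the names below are illustrative, not what I actually did): have each worker park its large result in a SharedMemory block and return only a small (name, shape, dtype) handle, so nothing near 2 GB ever travels through the result pipe.

## Sketch of the "small handle" workaround. Workers write large results
## into shared memory and return only metadata; the parent reattaches
## and copies while the pool (and thus the workers) is still alive.
import numpy as np
from multiprocessing import Pool, shared_memory

_kept = []  # keep worker-side handles alive until the pool shuts down

def worker_proc( idx ):
    result = np.random.rand( 1000, 1000 )  # stand-in for a large result
    shm = shared_memory.SharedMemory( create = True, size = result.nbytes )
    view = np.ndarray( result.shape, dtype = result.dtype, buffer = shm.buf )
    view[:] = result[:]
    _kept.append( shm )  # on Windows the block dies when all handles close
    return ( shm.name, result.shape, str( result.dtype ) )

if __name__ == '__main__':
    with Pool( processes = 2 ) as pool:
        handles = pool.map( worker_proc, range( 2 ) )
        for name, shape, dtype in handles:
            shm = shared_memory.SharedMemory( name = name )
            data = np.ndarray( shape, dtype = dtype, buffer = shm.buf ).copy()
            shm.close()
            shm.unlink()  # no-op on Windows; frees the block on POSIX
            print( data.shape, data.dtype )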

mmckerns commented 1 year ago

closing this as a duplicate of #150