nalepae / pandarallel

A simple and efficient tool to parallelize Pandas operations on all available CPUs
https://nalepae.github.io/pandarallel
BSD 3-Clause "New" or "Revised" License

AttributeError: Can't pickle local object 'prepare_worker.<locals>.closure.<locals>.wrapper' #72

Open ChristianRue opened 4 years ago

ChristianRue commented 4 years ago

I would like to use pandarallel in a Jupyter notebook on AWS SageMaker. However, even with the most basic examples I get the following error message:

~/anaconda3/envs/python3/lib/python3.6/site-packages/pandarallel/pandarallel.py in closure(data, func, *args, **kwargs)
    434         try:
    435             pool = Pool(
--> 436                 nb_workers, worker_init, (prepare_worker(use_memory_fs)(worker),)
    437             )
    438 

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
    117         from .pool import Pool
    118         return Pool(processes, initializer, initargs, maxtasksperchild,
--> 119                     context=self.get_context())
    120 
    121     def RawValue(self, typecode_or_type, *args):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
    172         self._processes = processes
    173         self._pool = []
--> 174         self._repopulate_pool()
    175 
    176         self._worker_handler = threading.Thread(

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/pool.py in _repopulate_pool(self)
    237             w.name = w.name.replace('Process', 'PoolWorker')
    238             w.daemon = True
--> 239             w.start()
    240             util.debug('added worker')
    241 

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         # Avoid a refcycle if the target function holds an indirect

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/context.py in _Popen(process_obj)
    289         def _Popen(process_obj):
    290             from .popen_forkserver import Popen
--> 291             return Popen(process_obj)
    292 
    293     class ForkContext(BaseContext):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/popen_forkserver.py in __init__(self, process_obj)
     33     def __init__(self, process_obj):
     34         self._fds = []
---> 35         super().__init__(process_obj)
     36 
     37     def duplicate_for_child(self, fd):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/popen_fork.py in __init__(self, process_obj)
     17         util._flush_std_streams()
     18         self.returncode = None
---> 19         self._launch(process_obj)
     20 
     21     def duplicate_for_child(self, fd):

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/popen_forkserver.py in _launch(self, process_obj)
     45         try:
     46             reduction.dump(prep_data, buf)
---> 47             reduction.dump(process_obj, buf)
     48         finally:
     49             set_spawning_popen(None)

~/anaconda3/envs/python3/lib/python3.6/multiprocessing/reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

AttributeError: Can't pickle local object 'prepare_worker.<locals>.closure.<locals>.wrapper'

This was thrown when running:

df = pandas.DataFrame(numpy.random.rand(240).reshape(80,3),columns=list('abc'))
df['id'] = numpy.arange(80) % 10
df.groupby('id')[['a']].parallel_apply(lambda x:pandas.DataFrame(numpy.array([x.values.flatten()]*2),columns=list('abcdefgh')))

jmaralcZ commented 4 years ago

Hi guys, I have the same issue when running all the examples you provide here: https://github.com/nalepae/pandarallel/blob/master/docs/examples.ipynb

Running them with Python 3.8 and this Pipfile:

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]
flake8 = "*"

[packages]
fastapi = ""
uvicorn = ""
pyyaml = ""
pandas = ""
psycopg2 = ">=2.8.4"
colorlog = ""
shapely = ""
tqdm = ""
googlemaps = ""
timezonefinder = ""
python-levenshtein = ""
boto3 = ""
polyline = ""
geopandas = ""
scipy = ""
sklearn = ""
colour = ""
folium = ""
matplotlib = ""
seaborn = ""
googleads = ""
holoviews = ""
console = ""
nox = ""
pytest = ""
flake8 = ""
coverage = ""
pytest-cov = ""
celery = ""
redis = ""
python-multipart = ""
xlrd = ""
jupyterlab = ""
nbconvert = ""
ipywidgets = ""
rtree = ""
pandarallel = ""

[requires]
python_version = "3.8"

mvcduarte commented 4 years ago

I have the same problem here! Using pandarallel on Windows 10 and Jupyter notebook.

C1ARKGABLE commented 4 years ago

Getting the same issue on macOS in Python 3.8

Package      Version
black        19.10b0
numpy        1.17.4
pandarallel  1.4.6
pandas       0.25.3
pip          20.0.2

Edit: I tried the same code in Python 3.7.3, where it works without any problem.

achu44 commented 4 years ago

Getting the same issue: Windows 10 and Spyder

Ram1004 commented 4 years ago

Having the same issue on Windows 10 with Jupyter Notebook

jqqqqqqqqqq commented 4 years ago

I managed to work around this, inspired by this.

Take a look at this issue.

But something still behaves very strangely:

Since I haven't solved it completely and don't have the time to work on it further, I won't submit a pull request for now.

Here is my patch:

diff --git a/pandarallel/pandarallel.py b/pandarallel/pandarallel.py
index b7783ea..c3e918d 100644
--- a/pandarallel/pandarallel.py
+++ b/pandarallel/pandarallel.py
@@ -64,96 +64,112 @@ def is_memory_fs_available():
     return os.path.exists(MEMORY_FS_ROOT)

-def prepare_worker(use_memory_fs):
-    def closure(function):
-        def wrapper(worker_args):
-            """This function runs on WORKERS.
-
-            If Memory File System is used:
-            1. Load all pickled files (previously dumped by the MASTER) in the
-               Memory File System
-            2. Undill the function to apply (for lambda functions)
-            3. Tell to the MASTER the input file has been read (so the MASTER can remove it
-               from the memory
-            4. Apply the function
-            5. Pickle the result in the Memory File System (so the Master can read it)
-            6. Tell the master task is finished
-
-            If Memory File System is not used, steps are the same except 1. and 5. which are
-            skipped.
-            """
-            if use_memory_fs:
-                (
-                    input_file_path,
-                    output_file_path,
-                    index,
-                    meta_args,
-                    queue,
-                    progress_bar,
-                    dilled_func,
-                    args,
-                    kwargs,
-                ) = worker_args
-
-                try:
-                    with open(input_file_path, "rb") as file:
-                        data = pickle.load(file)
-                        queue.put((INPUT_FILE_READ, index))
-
-                    result = function(
-                        data,
-                        index,
-                        meta_args,
-                        queue,
-                        progress_bar,
-                        dill.loads(dilled_func),
-                        *args,
-                        **kwargs
-                    )
+class prepare_worker_with_memory_fs:
+    def __init__(self, func):
+        self.func = func
+
+    def __call__(self, worker_args):
+        """This function runs on WORKERS.
+
+        If Memory File System is used:
+        1. Load all pickled files (previously dumped by the MASTER) in the
+           Memory File System
+        2. Undill the function to apply (for lambda functions)
+        3. Tell to the MASTER the input file has been read (so the MASTER can remove it
+           from the memory
+        4. Apply the function
+        5. Pickle the result in the Memory File System (so the Master can read it)
+        6. Tell the master task is finished
+
+        If Memory File System is not used, steps are the same except 1. and 5. which are
+        skipped.
+        """
+        (
+            input_file_path,
+            output_file_path,
+            index,
+            meta_args,
+            queue,
+            progress_bar,
+            dilled_func,
+            args,
+            kwargs,
+        ) = worker_args

-                    with open(output_file_path, "wb") as file:
-                        pickle.dump(result, file)
+        try:
+            with open(input_file_path, "rb") as file:
+                data = pickle.load(file)
+                queue.put((INPUT_FILE_READ, index))

-                    queue.put((VALUE, index))
+            result = self.func(
+                data,
+                index,
+                meta_args,
+                queue,
+                progress_bar,
+                dill.loads(dilled_func),
+                *args,
+                **kwargs
+            )

-                except Exception:
-                    queue.put((ERROR, index))
-                    raise
-            else:
-                (
-                    data,
-                    index,
-                    meta_args,
-                    queue,
-                    progress_bar,
-                    dilled_func,
-                    args,
-                    kwargs,
-                ) = worker_args
-
-                try:
-                    result = function(
-                        data,
-                        index,
-                        meta_args,
-                        queue,
-                        progress_bar,
-                        dill.loads(dilled_func),
-                        *args,
-                        **kwargs
-                    )
-                    queue.put((VALUE, index))
+            with open(output_file_path, "wb") as file:
+                pickle.dump(result, file)

-                    return result
+            queue.put((VALUE, index))

-                except Exception:
-                    queue.put((ERROR, index))
-                    raise
+        except Exception:
+            queue.put((ERROR, index))
+            raise

-        return wrapper
+class prepare_worker_without_memory_fs:
+    def __init__(self, func):
+        self.func = func

-    return closure
+    def __call__(self, worker_args):
+        """This function runs on WORKERS.
+
+        If Memory File System is used:
+        1. Load all pickled files (previously dumped by the MASTER) in the
+           Memory File System
+        2. Undill the function to apply (for lambda functions)
+        3. Tell to the MASTER the input file has been read (so the MASTER can remove it
+           from the memory
+        4. Apply the function
+        5. Pickle the result in the Memory File System (so the Master can read it)
+        6. Tell the master task is finished
+
+        If Memory File System is not used, steps are the same except 1. and 5. which are
+        skipped.
+        """
+        (
+            data,
+            index,
+            meta_args,
+            queue,
+            progress_bar,
+            dilled_func,
+            args,
+            kwargs,
+        ) = worker_args
+
+        try:
+            result = self.func(
+                data,
+                index,
+                meta_args,
+                queue,
+                progress_bar,
+                dill.loads(dilled_func),
+                *args,
+                **kwargs
+            )
+            queue.put((VALUE, index))
+
+            return result

+        except Exception:
+            queue.put((ERROR, index))
+            raise

 def create_temp_files(nb_files):
     """Create temporary files in Memory File System."""
@@ -438,9 +454,14 @@ def parallelize(
         nb_workers = len(chunk_lengths)

         try:
-            pool = Pool(
-                nb_workers, worker_init, (prepare_worker(use_memory_fs)(worker),),
-            )
+            if use_memory_fs:
+                pool = Pool(
+                    nb_workers, worker_init, (prepare_worker_with_memory_fs(worker),),
+                )
+            else:
+                pool = Pool(
+                    nb_workers, worker_init, (prepare_worker_without_memory_fs(worker),),
+                )

             map_result = pool.map_async(global_worker, workers_args)
             pool.close()
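
The patch above replaces the nested `closure`/`wrapper` functions with two module-level callable classes. As a minimal sketch of why that shape helps, assuming nothing about pandarallel itself, the example below wraps a function in a callable class and round-trips it through `pickle`. The names `PicklableWorker` and `add_offset` are hypothetical and exist only for this illustration.

```python
import pickle


def add_offset(value, offset):
    """Hypothetical stand-in for the real work function."""
    return value + offset


class PicklableWorker:
    """Callable object that keeps the wrapped function as an attribute.

    An instance of a module-level class is pickled as a reference to the
    class plus its instance state, so no nested (local) function is involved.
    """

    def __init__(self, func):
        self.func = func

    def __call__(self, worker_args):
        value, offset = worker_args
        return self.func(value, offset)


worker = PicklableWorker(add_offset)

# Round-trip through pickle, which is what a non-fork start method has to do
# with anything it sends to a worker process.
restored = pickle.loads(pickle.dumps(worker))
result = restored((40, 2))
print(result)
```

The wrapped function still has to be picklable on its own, which is why the patch keeps passing the user function separately as `dilled_func`.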
mikelehen commented 4 years ago

I'm running into this as well. In case it helps, I think the issue may only occur when multiprocessing uses the "spawn" start method (i.e. multiprocessing.set_start_method("spawn")), which is the default on Windows and on macOS (on macOS since Python 3.8, because "fork" is unreliable there; see https://bugs.python.org/issue33725).

Would be great to see a fix for this implemented.
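
The start-method explanation can be checked without pandarallel. With "spawn" (and "forkserver", which appears in the original traceback), the process object is pickled before it is sent to the child, so anything it references must be picklable. The sketch below uses only the standard library; `make_wrapper` is a hypothetical helper that mimics the nested-function shape named in the error message.

```python
import math
import pickle


def make_wrapper(func):
    """Return a nested function, similar in shape to closure -> wrapper."""
    def wrapper(x):
        return func(x)
    return wrapper


def try_pickle(obj):
    """Return None if obj can be pickled, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return None
    except (AttributeError, pickle.PicklingError) as exc:
        return str(exc)


# A function that can be imported by name is pickled as a reference.
importable_error = try_pickle(math.sqrt)

# A function defined inside another function has no importable name,
# so pickle refuses it.
local_error = try_pickle(make_wrapper(math.sqrt))

print("importable function:", importable_error)
print("local function:", local_error)
```

With "fork", the child inherits the parent's memory and this pickling step is skipped, which would be consistent with the reports that the same code works under some configurations.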

shermansiu commented 6 months ago

I can reproduce this on Linux:

Python: 3.10.13
Pandarallel: 1.6.5
Pandas: 2.2.0
Numpy: 1.26.4

import numpy as np
import pandas as pd
import pandarallel

pandarallel.core.CONTEXT = pandarallel.core.multiprocessing.get_context('spawn')
pandarallel.pandarallel.initialize()

df = pd.DataFrame(np.random.rand(240).reshape(80,3),columns=list('abc'))
df['id'] = np.arange(80) % 10
df.groupby('id')[['a']].parallel_apply(lambda x: pd.DataFrame(np.array([x.values.flatten()]*2),columns=list('abcdefgh')))

This bug only appears for me when using the "spawn" start method.
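
For anyone who wants to see which start method their own session uses before trying the reproduction, here is a minimal sketch with the standard library. It only inspects `multiprocessing` and does not touch pandarallel; the variable names are illustrative.

```python
import multiprocessing

# Start methods supported on this platform (for example fork, spawn, forkserver).
available_methods = multiprocessing.get_all_start_methods()

# The globally configured method. None means nothing has been fixed yet,
# so the platform default will be used when a pool is first created.
configured_method = multiprocessing.get_start_method(allow_none=True)

# An explicit "spawn" context that is independent of the global setting.
spawn_context = multiprocessing.get_context("spawn")

print("available:", available_methods)
print("configured:", configured_method)
print("explicit context:", spawn_context.get_start_method())
```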

Traceback

```
RemoteTraceback:
"""
Traceback (most recent call last):
  File "~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/core.py", line 95, in __call__
    result = self.work_function(
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/data_types/dataframe_groupby.py", line 40, in work
    return [compute_result(key, df) for key, df in data]
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/data_types/dataframe_groupby.py", line 40, in <listcomp>
    return [compute_result(key, df) for key, df in data]
  File "~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/data_types/dataframe_groupby.py", line 34, in compute_result
    result = user_defined_function(
  File "/tmp/ipykernel_3714664/809581393.py", line 11, in meow
NameError: name 'pd' is not defined
"""

The above exception was the direct cause of the following exception:

NameError                                 Traceback (most recent call last)
Cell In[8], line 12
     10 def meow(x):
     11     return pd.DataFrame(np.array([x.values.flatten()]*2),columns=list('abcdefgh'))
---> 12 df.groupby('id')[['a']].parallel_apply(meow)

File ~/miniconda3/envs/fn_env/lib/python3.10/site-packages/pandarallel/core.py:333, in parallelize_with_memory_file_system.<locals>.closure(data, user_defined_function, *user_defined_function_args, **user_defined_function_kwargs)
    325     return wrapped_reduce_function(
    326         (Path(output_file.name) for output_file in output_files),
    327         reduce_extra,
    328     )
    329 except EOFError:
    330     # Loading the files failed, this most likely means that there
    331     # was some error during processing and the files were never
    332     # saved at all.
--> 333     results_promise.get()
    335     # If the above statement does not raise an exception, that
    336     # means the multiprocessing went well and we want to re-raise
    337     # the original EOFError.
    338     raise

File ~/miniconda3/envs/fn_env/lib/python3.10/multiprocessing/pool.py:774, in ApplyResult.get(self, timeout)
    772     return self._value
    773 else:
--> 774     raise self._value

NameError: name 'pd' is not defined
```
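
The traceback ends with `NameError: name 'pd' is not defined`, raised inside `meow` on the worker. One possible reading, offered as an assumption rather than a confirmed diagnosis, is that the function is rebuilt in the spawned worker without the notebook's module-level imports. The sketch below simulates that situation with the standard library only: `rebuild_without_globals` is a hypothetical helper, and `math` stands in for `pd`. It suggests that importing inside the function body may be a workaround worth trying.

```python
import builtins
import math
import types


def uses_global_import(x):
    # Relies on the module-level name `math` existing at call time.
    return math.sqrt(x)


def uses_local_import(x):
    # Imports inside the body, so no module-level names are needed.
    import math
    return math.sqrt(x)


def rebuild_without_globals(func):
    """Recreate func with a nearly empty global namespace (simulation only)."""
    empty_globals = {"__builtins__": builtins}
    return types.FunctionType(func.__code__, empty_globals, func.__name__)


try:
    rebuild_without_globals(uses_global_import)(9.0)
    global_import_outcome = "ok"
except NameError as exc:
    global_import_outcome = str(exc)

local_import_result = rebuild_without_globals(uses_local_import)(9.0)

print("global import:", global_import_outcome)
print("local import:", local_import_result)
```

Whether this matches what pandarallel does internally under "spawn" is not established in this thread; the simulation only shows the general mechanism behind this kind of `NameError`.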