rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[QST] Returning from multi-thread. TypeError: a bytes-like object is required, not 'dict' #15246

Closed blue-cat-whale closed 3 months ago

blue-cat-whale commented 6 months ago

When running my code with cudf.pandas, I get TypeError: a bytes-like object is required, not 'dict' while collecting the results from the worker processes.

  1. Running the code without the -m cudf.pandas option is fine.
  2. It is fine if each worker returns merely a scalar.
  3. The program crashes if a worker returns a dataframe.

This is the error message:

concurrent.futures.process._RemoteTraceback:
'''
Traceback (most recent call last):
  File "/usr/lib64/python3.9/concurrent/futures/process.py", line 387, in wait_result_broken_or_wakeup
    result_item = result_reader.recv()
  File "/usr/lib64/python3.9/multiprocessing/connection.py", line 255, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/fast_slow_proxy.py", line 742, in __setstate__
    unpickled_wrapped_obj = pickle.loads(state)
TypeError: a bytes-like object is required, not 'dict'
'''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib64/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 91, in <module>
    main()
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 87, in main
    runpy.run_path(args.args[0], run_name="__main__")
  File "/usr/lib64/python3.9/runpy.py", line 288, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/lib64/python3.9/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "clean_header.py", line 48, in <module>
    main()
  File "clean_header.py", line 45, in main
    my_func()
  File "clean_header.py", line 39, in my_func
    for obj in r:
  File "/usr/lib64/python3.9/concurrent/futures/process.py", line 562, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 609, in result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib64/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Here is my code.

from datetime import date
import numpy as np
import pandas as pd
from random import randint

from functools import partial
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method

def data_generation(nRows: int):
################## unimportant, for reproduction purposes ##################
# This function generates a dataframe with 5 columns, sorted ascending by WorkingDay and Minute
    my_df = pd.DataFrame(data={'WorkingDay': ['2019-01-02', '2018-01-02', '2019-05-02', '2020-01-02', '2021-01-02'],
                               'name': ['albert', 'alex', 'alice', 'ben', 'bob'],
                               'Minute': ['09:00:00', '09:20:00', '08:00:00', '07:00:00', '09:30:00'],
                               'aaa': np.random.rand(5), 'bbb': np.random.rand(5)})
    my_df = pd.concat([my_df for i in range(int(nRows/5))], axis=0)
    my_df['WorkingDay'] = my_df['WorkingDay'].map(lambda x: (date(randint(2010,2020), randint(1,4), randint(1,5))).strftime('%Y-%m-%d'))
    my_df['Minute'] = np.random.permutation(my_df['Minute'].values)
    my_df = my_df.sort_values(by=['WorkingDay', 'Minute'], inplace=False).reset_index(drop=True,inplace=False)
    return my_df

def my_func_single(branchIndex: int):
    my_df = data_generation(20-5*branchIndex)
# data generated
#############################################################################
    # Returning the dataframe from the worker is what triggers the error
#############################################################################
    #return my_df.shape[0]   # returning a scalar instead works fine
    return my_df

def my_func():
    set_start_method('spawn')
    my_func_partial = partial(my_func_single)
    with Pool(max_workers=2) as pool:
        r = pool.map(my_func_partial, range(4))
    for obj in r:
        #print('df has length: {}.'.format(obj))
        print('df has length: {}.'.format(obj.shape[0]))

def main():
    print('-------------------- program starts -----------------------')
    my_func()

if __name__ == '__main__':
    main()

Relevant dependencies:

cuda-python==12.4.0
cudf-cu12==24.4.0a516
cugraph-cu12==24.4.0a69
cuml-cu12==24.4.0a37
dask==2024.1.1
dask-cuda==24.4.0a11
dask-cudf-cu12==24.4.0a516
pylibcugraph-cu12==24.4.0a69
pylibraft-cu12==24.4.0a70
blue-cat-whale commented 6 months ago

I tried another parallelization mechanism (joblib) and a similar error appears.

The new code:

from joblib import Parallel, delayed

def my_func():
  num_cores = 2
  inputs = range(4)
  results = Parallel(n_jobs=num_cores)(delayed(my_func_single)(i) for i in inputs)
  for obj in results:
    print('df has length: {}.'.format(obj.shape[0]))

def main():
  print('-------------------- program starts -----------------------')
  my_func()  

if __name__ == '__main__':
  main()

The error message:

joblib.externals.loky.process_executor._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/lib64/python3.9/site-packages/joblib/externals/loky/process_executor.py", line 661, in wait_result_broken_or_wakeup
    result_item = result_reader.recv()
  File "/usr/lib64/python3.9/multiprocessing/connection.py", line 255, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/fast_slow_proxy.py", line 742, in __setstate__
    unpickled_wrapped_obj = pickle.loads(state)
TypeError: a bytes-like object is required, not 'dict'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib64/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 91, in <module>
    main()
  File "/usr/local/lib64/python3.9/site-packages/cudf/pandas/__main__.py", line 87, in main
    runpy.run_path(args.args[0], run_name="__main__")
  File "/usr/lib64/python3.9/runpy.py", line 288, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/lib64/python3.9/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/lib64/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "clean_header.py", line 49, in <module>
    main()
  File "clean_header.py", line 45, in main
    my_func()
  File "clean_header.py", line 38, in my_func
    results = Parallel(n_jobs=num_cores)(delayed(my_func_single)(i) for i in inputs)
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1952, in __call__
    return output if self.return_generator else list(output)
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1595, in _get_outputs
    yield from self._retrieve()
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1699, in _retrieve
    self._raise_error_fast()
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 1734, in _raise_error_fast
    error_job.get_result(self.timeout)
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 736, in get_result
    return self._return_or_raise()
  File "/usr/local/lib64/python3.9/site-packages/joblib/parallel.py", line 754, in _return_or_raise
    raise self._result
joblib.externals.loky.process_executor.BrokenProcessPool: A result has failed to un-serialize. Please ensure that the objects returned by the function are always picklable.
blue-cat-whale commented 6 months ago

I bypass this problem by returning my_df.values, list(my_df.index) from the worker instead of the dataframe itself.
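
For reference, here is a minimal sketch of that workaround, reusing data_generation from the code above. The rebuild_df helper and the extra columns element are illustrative additions, not part of the original code: the worker returns plain numpy/list objects, which pickle cleanly whether or not the accelerator is installed, and the parent reassembles the dataframe.

import pandas as pd

def my_func_single(branchIndex: int):
    my_df = data_generation(20 - 5 * branchIndex)
    # Return plain picklable pieces instead of the (possibly proxied) dataframe
    return my_df.values, list(my_df.index), list(my_df.columns)

def rebuild_df(values, index, columns):
    # Reassemble the dataframe on the parent side
    return pd.DataFrame(values, index=index, columns=columns)

# In my_func, the result loop becomes:
#     for values, index, columns in r:
#         obj = rebuild_df(values, index, columns)
#         print('df has length: {}.'.format(obj.shape[0]))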

vyasr commented 6 months ago

I'm glad you were able to get the issue resolved in your case! That said, it does look like you're highlighting a real issue with using cudf.pandas objects in multiprocessing, so I'm going to reopen this issue for now. Here's an MWE for future investigation, indicating that the failure is also sensitive to how the worker process is created. Since fork works while spawn does not, we're probably relying on some implicit state that is preserved by forking but lost when a new process is spawned.

# Works correctly for `import cudf as pd`
import pandas as pd

from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method

def f(i: int):
    return pd.DataFrame({'a': [i]})

def main():
    for method in ['fork', 'spawn', 'forkserver']:
        set_start_method(method, force=True)
        with Pool(max_workers=2) as pool:
            r = pool.map(f, range(4))
        try:
            list(r)
        except Exception as e:
            print(f'{type(e).__name__}: {method}')
        else:
            print(f'Succeeded: {method}')

if __name__ == '__main__':
    main()
wence- commented 6 months ago

This problem occurs because, when using spawn or forkserver, the new Python process started by multiprocessing does not have the custom cudf.pandas metapath finder installed. Hence, import pandas as pd in the worker fetches the real (unwrapped) pandas module rather than the wrapped (cudf.pandas) module. The worker then returns a real pandas DataFrame, whose pickled state is a plain dict; on the parent side, where the finder is installed, that state reaches the proxy's __setstate__, which expects the bytes it normally produces, hence the TypeError above.

Consider:

import sys
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method

def f():
    print(sys.meta_path)

def main():
    for method in ['fork', 'spawn', 'forkserver']:
        print(method)
        set_start_method(method, force=True)
        with Pool(max_workers=1) as pool:
            result = pool.submit(f).result()

if __name__ == "__main__":
    main()

When run with python -m cudf.pandas bug.py:

fork
[ModuleAccelerator(fast=cudf, slow=pandas), <_distutils_hack.DistutilsMetaFinder object at 0x76f18b8991e0>, <_rmm_editable.ScikitBuildRedirectingFinder object at 0x76f18ba67fa0>, <_cudf_kafka_editable.ScikitBuildRedirectingFinder object at 0x76f18ba64700>, <_cudf_editable.ScikitBuildRedirectingFinder object at 0x76f18bb2b3d0>, <class '_frozen_importlib.BuiltinImporter'>, <class '_frozen_importlib.FrozenImporter'>, <class '_frozen_importlib_external.PathFinder'>, <six._SixMetaPathImporter object at 0x76f04651b4c0>]
 ^^^^^^^^^^^^^^^^^^^ Good!
spawn
[<_distutils_hack.DistutilsMetaFinder object at 0x78af5ec412d0>, <_rmm_editable.ScikitBuildRedirectingFinder object at 0x78af5ec405b0>, <_cudf_kafka_editable.ScikitBuildRedirectingFinder object at 0x78af5ee0c7f0>, <_cudf_editable.ScikitBuildRedirectingFinder object at 0x78af5eed74c0>, <class '_frozen_importlib.BuiltinImporter'>, <class '_frozen_importlib.FrozenImporter'>, <class '_frozen_importlib_external.PathFinder'>]
 ^ BAD!
forkserver
[<_distutils_hack.DistutilsMetaFinder object at 0x7c5cd58e92a0>, <_rmm_editable.ScikitBuildRedirectingFinder object at 0x7c5cd58e8580>, <_cudf_kafka_editable.ScikitBuildRedirectingFinder object at 0x7c5cd58a47c0>, <_cudf_editable.ScikitBuildRedirectingFinder object at 0x7c5cd596f490>, <class '_frozen_importlib.BuiltinImporter'>, <class '_frozen_importlib.FrozenImporter'>, <class '_frozen_importlib_external.PathFinder'>]
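
(As a quick sanity check, a worker can test whether the accelerator is active by scanning sys.meta_path for the ModuleAccelerator entry seen in the fork output above. accelerator_active is a hypothetical helper written for this thread, not part of cudf's API.)

import sys

def accelerator_active() -> bool:
    # True only when the cudf.pandas ModuleAccelerator finder is on sys.meta_path
    return any(type(finder).__name__ == "ModuleAccelerator" for finder in sys.meta_path)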

The way to work around this is to use the functional interface to cudf.pandas and install the accelerator manually at the start of the file. Note that this must happen before pandas is imported. So:

import cudf.pandas
cudf.pandas.install()

import pandas as pd
from concurrent.futures import ProcessPoolExecutor as Pool
from multiprocessing import set_start_method

def f(i: int):
    return pd.DataFrame({'a': [i]})

def main():
    for method in ['fork', 'spawn', 'forkserver']:
        set_start_method(method, force=True)
        with Pool(max_workers=2) as pool:
            r = pool.map(f, range(4))
        try:
            list(r)
        except Exception as e:
            print(f'{type(e).__name__}: {method}')
        else:
            print(f'Succeeded: {method}')

if __name__ == '__main__':
    main()

This works for all three start methods: with spawn and forkserver the child process re-imports the main module, so the explicit cudf.pandas.install() call runs in each worker before pandas is imported there.

wence- commented 6 months ago

We should probably add this as a known limitation in the FAQ.