vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License

[BUG-REPORT] Using Vaex to load and merge data within a pipeline running in parallel crashes #2090

Open vicruiser opened 2 years ago

vicruiser commented 2 years ago

Hi,

I'm currently having difficulty using vaex in a script that processes data in parallel with the joblib library.

In each thread, the code has to load two data frames with vaex for a given protein ID and join them on a column called "Protein_position".

To load the data, I used the following code:

import glob
import os

import vaex


def parser(prot_id, db_dir):
    '''
    Parse input and detect whether it is a VCF or VEP file. Any other format
    is invalid.

    Parameters
    ----------
    prot_id : str        
        protein id 
    db_dir : str
        directory where to find the database to parse

    Returns
    -------
    df 
        parsed file
    '''

    f = glob.glob(os.path.join(db_dir, (prot_id + '.*')))

    if not f:
        raise IOError()
    else:
        try:
            if '.gz' in f[0]:
                try:  
                    df = vaex.from_csv(f[0], sep="\t", compression='gzip', dtype = str)
                    return df
                except: 
                    df = vaex.from_csv(f[0], sep="\t", compression='gzip', low_memory=False, dtype = str)
                    return df
            else: 
                try: 
                    df = vaex.from_csv(f[0], sep="\t", dtype = str)
                    return df
                except: 
                    df = vaex.from_csv(f[0], sep="\t", low_memory=False, dtype = str)
                    return df
        except:
            if '.gz' in f[0]:
                try:
                    df = vaex.from_csv(f[0], sep=" ", compression='gzip', dtype = str)
                    return df
                except: 
                    df = vaex.from_csv(f[0], sep=" ", compression='gzip', low_memory=False, dtype = str)
                    return df
            else: 
                try: 
                    df = vaex.from_csv(f[0], sep=" ", dtype = str)
                    return df
                except: 
                    df = vaex.from_csv(f[0], sep=" ", low_memory=False, dtype = str)
                    return df
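
For readers following along, the fallback order in the function above (tab-separated first, then space-separated, each retried with low_memory=False) can be written as a loop. This is only a sketch under assumptions: `candidate_options` and `load_with_fallback` are hypothetical names, the reader is passed in as a callable (in the script above it would be `vaex.from_csv`), the file is treated as gzipped only when its name ends in `.gz`, and the glob matches are sorted so the same file is picked on every run.

```python
import glob
import os


def candidate_options(path):
    """Return the keyword-argument sets to try, in the same order as parser()."""
    base = {"dtype": str}
    if path.endswith(".gz"):
        base["compression"] = "gzip"
    options = []
    for sep in ("\t", " "):
        options.append({**base, "sep": sep})
        options.append({**base, "sep": sep, "low_memory": False})
    return options


def load_with_fallback(prot_id, db_dir, reader):
    """Load the first file matching prot_id.*, trying each option set in turn.

    `reader` is the loading function (hypothetical wiring: vaex.from_csv).
    """
    matches = sorted(glob.glob(os.path.join(db_dir, prot_id + ".*")))
    if not matches:
        raise IOError(f"no file found for {prot_id!r} in {db_dir!r}")
    last_error = None
    for kwargs in candidate_options(matches[0]):
        try:
            return reader(matches[0], **kwargs)
        except Exception as exc:  # remember the failure, try the next option
            last_error = exc
    raise last_error
```

Catching `Exception` instead of using a bare `except:` keeps `KeyboardInterrupt` working, and re-raising the last error means a file that cannot be read at all still fails loudly instead of returning nothing.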

To join the data frames, I use:

df1['Protein_position'] = df1['Protein_position'].astype(str)
df2['Protein_position'] = df2['Protein_position'].astype(str)

df3 = df1.join(df2, on='Protein_position', allow_duplication=True, how="inner")
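
To make the intended result of this join concrete, here is a plain-Python sketch of an inner join on a string key where duplicated keys are allowed. It is not vaex's implementation; `inner_join` and the sample rows are made up for illustration, and it assumes the non-key column names of the two inputs do not clash.

```python
from collections import defaultdict


def inner_join(left_rows, right_rows, key):
    """Inner-join two lists of dicts on `key`, comparing keys as strings.

    Every left row is paired with every right row that has the same key,
    so duplicated keys on the right multiply rows in the result.
    """
    by_key = defaultdict(list)
    for row in right_rows:
        by_key[str(row[key])].append(row)

    joined = []
    for left in left_rows:
        for right in by_key.get(str(left[key]), []):
            # left values win if a column name appears on both sides
            joined.append({**right, **left})
    return joined


df1 = [
    {"Protein_position": "12", "variant": "A>G"},
    {"Protein_position": "144-145", "variant": "del"},
]
df2 = [
    {"Protein_position": 12, "chain": "A"},
    {"Protein_position": 12, "chain": "B"},
]
rows = inner_join(df1, df2, "Protein_position")
```

Here `rows` holds two entries, both for position "12" (one per chain), and the "144-145" row is dropped because nothing on the right matches it. Comparing the keys as strings is what lets a range such as "144-145" take part in the join without any numeric cast.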

I run the code with 7 threads in total. I monitored it with mprof, and neither the CPUs nor the memory are overloaded.

The code works well at first, then at some point stops and prints the error below.

I'm sure the problem is not a specific protein ID, as the code stops at a different protein ID each time I run it.

Additionally, my code already handles values like the one in the error below (Failed to parse string: '144-145' as a scalar of type double): it processes the same kind of value for other protein IDs without any problem.

I've been checking the documentation for more than a week, and I believe there might be some interference between multithreading and vaex.

Thanks in advance for your help.

Software information

Additional information

The error output is the following:

  File "/usr/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
    work_item.run()
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, *self.kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 90, in
    iterator = (loop.run_in_executor(self, lambda value=value: wrapped(value)) for value in cancellable_iter())
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 76, in wrapped
    return callable(self.local.index, args, kwargs, kwargs_extra)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/execution.py", line 338, in process_part
    filter_mask = df.evaluate_selection_mask(None, i1=i1, i2=i2, cache=True)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2855, in evaluate_selection_mask
    mask_global = scope_global.evaluate(FILTER_SELECTION_NAME)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 226, in getitem
    mask_values = selection.evaluate(self.df, variable, self.i1, self.i2, self.filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/selections.py", line 131, in evaluate
    result = df._evaluate_selection_mask(self.boolean_expression, i1, i2, filter_mask=filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2841, in _evaluate_selection_mask
    mask = scope.evaluate(name)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 265, in getitem
    values = self.evaluate(expression) # , out=self.buffers[variable])
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 191, in evaluate
    tb.print_stack()
Traceback (most recent call last):
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 265, in getitem
    values = self.evaluate(expression) # , out=self.buffers[variable])
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/arrow/numpy_dispatch.py", line 136, in wrapper
    result = f(*args, *kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/functions.py", line 2506, in _astype
    y = x.cast(dtype, safe=False)
  File "pyarrow/array.pxi", line 816, in pyarrow.lib.Array.cast
  File "/home/project_2022/lib/python3.8/site-packages/pyarrow/compute.py", line 297, in cast
    return call_function("cast", [arr], options)
  File "pyarrow/_compute.pyx", line 527, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 337, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Failed to parse string: '144-145' as a scalar of type double
ERROR:ThreadPoolExecutor-1_3:vaex.scopes:error in evaluating: 'Protein_position'
Traceback (most recent call last):
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 265, in getitem
    values = self.evaluate(expression) # , out=self.buffers[variable])
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/arrow/numpy_dispatch.py", line 136, in wrapper
    result = f(args, kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/functions.py", line 2506, in _astype
    y = x.cast(dtype, safe=False)
  File "pyarrow/array.pxi", line 816, in pyarrow.lib.Array.cast
  File "/home/project_2022/lib/python3.8/site-packages/pyarrow/compute.py", line 297, in cast
    return call_function("cast", [arr], options)
  File "pyarrow/_compute.pyx", line 527, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 337, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Failed to parse string: '144-145' as a scalar of type double
  File "/usr/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
    work_item.run()
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, *self.kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 90, in
    iterator = (loop.run_in_executor(self, lambda value=value: wrapped(value)) for value in cancellable_iter())
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 76, in wrapped
    return callable(self.local.index, args, kwargs, kwargs_extra)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/execution.py", line 338, in process_part
    filter_mask = df.evaluate_selection_mask(None, i1=i1, i2=i2, cache=True)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2855, in evaluate_selection_mask
    mask_global = scope_global.evaluate(FILTER_SELECTION_NAME)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 226, in getitem
    mask_values = selection.evaluate(self.df, variable, self.i1, self.i2, self.filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/selections.py", line 131, in evaluate
    result = df._evaluate_selection_mask(self.boolean_expression, i1, i2, filter_mask=filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2841, in _evaluate_selection_mask
    mask = scope.evaluate(name)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 191, in evaluate
    tb.print_stack()
Traceback (most recent call last):
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 226, in getitem
    mask_values = selection.evaluate(self.df, variable, self.i1, self.i2, self.filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/selections.py", line 131, in evaluate
    result = df._evaluate_selection_mask(self.boolean_expression, i1, i2, filter_mask=filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2841, in _evaluate_selection_mask
    mask = scope.evaluate(name)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 265, in getitem
    values = self.evaluate(expression) # , out=self.buffers[variable])
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/arrow/numpy_dispatch.py", line 136, in wrapper
    result = f(*args, *kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/functions.py", line 2506, in _astype
    y = x.cast(dtype, safe=False)
  File "pyarrow/array.pxi", line 816, in pyarrow.lib.Array.cast
  File "/home/project_2022/lib/python3.8/site-packages/pyarrow/compute.py", line 297, in cast
    return call_function("cast", [arr], options)
  File "pyarrow/_compute.pyx", line 527, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 337, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Failed to parse string: '144-145' as a scalar of type double
ERROR:ThreadPoolExecutor-1_3:vaex.scopes:error in evaluating: 'filter'
Traceback (most recent call last):
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 226, in getitem
    mask_values = selection.evaluate(self.df, variable, self.i1, self.i2, self.filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/selections.py", line 131, in evaluate
    result = df._evaluate_selection_mask(self.boolean_expression, i1, i2, filter_mask=filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2841, in _evaluate_selection_mask
    mask = scope.evaluate(name)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 265, in getitem
    values = self.evaluate(expression) # , out=self.buffers[variable])
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/arrow/numpy_dispatch.py", line 136, in wrapper
    result = f(args, kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/functions.py", line 2506, in _astype
    y = x.cast(dtype, safe=False)
  File "pyarrow/array.pxi", line 816, in pyarrow.lib.Array.cast
  File "/home/project_2022/lib/python3.8/site-packages/pyarrow/compute.py", line 297, in cast
    return call_function("cast", [arr], options)
  File "pyarrow/_compute.pyx", line 527, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 337, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Failed to parse string: '144-145' as a scalar of type double
  File "/usr/lib/python3.8/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 80, in _worker
    work_item.run()
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, *self.kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 90, in
    iterator = (loop.run_in_executor(self, lambda value=value: wrapped(value)) for value in cancellable_iter())
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 76, in wrapped
    return callable(self.local.index, args, kwargs, kwargs_extra)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/execution.py", line 338, in process_part
    filter_mask = df.evaluate_selection_mask(None, i1=i1, i2=i2, cache=True)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2855, in evaluate_selection_mask
    mask_global = scope_global.evaluate(FILTER_SELECTION_NAME)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 191, in evaluate
    tb.print_stack()
Traceback (most recent call last):
  File "/home/project_2022/bin/mapper", line 11, in
    load_entry_point('3dmapper', 'console_scripts', 'mapper')()
  File "/home/victoria/PhD/2021_2022/3Dmapper/mapper/execute_mapper.py", line 313, in main
    Parallel(n_jobs=num_cores, prefer = "threads")(delayed(wrapper)(prot_id.replace('\n', ''),
  File "/home/project_2022/lib/python3.8/site-packages/joblib/parallel.py", line 1054, in call
    self.retrieve()
  File "/home/project_2022/lib/python3.8/site-packages/joblib/parallel.py", line 933, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, *kwds))
  File "/home/project_2022/lib/python3.8/site-packages/joblib/_parallel_backends.py", line 595, in call
    return self.func(args, kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/joblib/parallel.py", line 262, in call
    return [func(*args, kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/joblib/parallel.py", line 262, in
    return [func(*args, *kwargs)
  File "/home/victoria/PhD/2021_2022/3Dmapper/mapper/mapper_wrapper.py", line 68, in wrapper
    mapper(prot_id[i],
  File "/home/victoria/PhD/2021_2022/3Dmapper/mapper/mapper.py", line 411, in mapper
    if len(unmapped_positions) > 0:
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 4075, in len
    self._cached_filtered_length = int(self.count())
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 853, in count
    return self._compute_agg('count', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 827, in _compute_agg
    return self._delay(delay, var)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 1566, in _delay
    self.execute()
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 398, in execute
    just_run(self.execute_async())
  File "/home/project_2022/lib/python3.8/site-packages/vaex/asyncio.py", line 35, in just_run
    return loop.run_until_complete(coro)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 402, in execute_async
    await self.executor.execute_async()
  File "/home/project_2022/lib/python3.8/site-packages/vaex/execution.py", line 253, in execute_async
    async for element in self.thread_pool.map_async(self.process_part, dataset.chunk_iterator(columns, chunk_size),
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 95, in map_async
    value = await value
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(self.args, self.kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 90, in
    iterator = (loop.run_in_executor(self, lambda value=value: wrapped(value)) for value in cancellable_iter())
  File "/home/project_2022/lib/python3.8/site-packages/vaex/multithreading.py", line 76, in wrapped
    return callable(self.local.index, args, kwargs, kwargs_extra)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/execution.py", line 338, in process_part
    filter_mask = df.evaluate_selection_mask(None, i1=i1, i2=i2, cache=True)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2855, in evaluate_selection_mask
    mask_global = scope_global.evaluate(FILTER_SELECTION_NAME)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 226, in getitem
    mask_values = selection.evaluate(self.df, variable, self.i1, self.i2, self.filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/selections.py", line 131, in evaluate
    result = df._evaluate_selection_mask(self.boolean_expression, i1, i2, filter_mask=filter_mask)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/dataframe.py", line 2841, in _evaluate_selection_mask
    mask = scope.evaluate(name)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 265, in getitem
    values = self.evaluate(expression) # , out=self.buffers[variable])
  File "/home/project_2022/lib/python3.8/site-packages/vaex/scopes.py", line 188, in evaluate
    result = eval(expression, expression_namespace, self)
  File "", line 1, in
  File "/home/project_2022/lib/python3.8/site-packages/vaex/arrow/numpy_dispatch.py", line 136, in wrapper
    result = f(args, **kwargs)
  File "/home/project_2022/lib/python3.8/site-packages/vaex/functions.py", line 2506, in _astype
    y = x.cast(dtype, safe=False)
  File "pyarrow/array.pxi", line 816, in pyarrow.lib.Array.cast
  File "/home/project_2022/lib/python3.8/site-packages/pyarrow/compute.py", line 297, in cast
    return call_function("cast", [arr], options)
  File "pyarrow/_compute.pyx", line 527, in pyarrow._compute.call_function
  File "pyarrow/_compute.pyx", line 337, in pyarrow._compute.Function.call
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Failed to parse string: '144-145' as a scalar of type double

JovanVeljanoski commented 2 years ago

Hi,

First of all, it is probably a bad idea to use vaex together with joblib for parallelization. With vaex, anything that can run in parallel already does, so adding another layer of parallelism on top can lead to problems.

Looking at the error log, especially the last line, pyarrow.lib.ArrowInvalid: Failed to parse string: '144-145' as a scalar of type double, it looks like some particular value is invalid, i.e. this particular string value cannot be interpreted as a number (here, a double). So that is something to check.

Finally, it would be really helpful if you could come up with a minimal example (with data) that reproduces the problem, so we can run it on our side and try to fix the exact issue you are having. With joins especially, things can be quite tricky to pinpoint, so a reproducible example would be great!
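
One way to do that check, sketched in plain Python: collect the distinct values of the column that cannot be parsed as a float. `non_numeric_values` is a hypothetical helper, and the sample list stands in for the real "Protein_position" values.

```python
def non_numeric_values(values):
    """Return the distinct values that float() cannot parse, in first-seen order."""
    bad = []
    for value in values:
        try:
            float(value)
        except (TypeError, ValueError):
            if value not in bad:
                bad.append(value)
    return bad


positions = ["12", "144-145", "7", "144-145", "?"]
problem_values = non_numeric_values(positions)
```

For the sample list, `problem_values` contains "144-145" and "?", which are the entries a numeric cast would reject.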

maartenbreddels commented 2 years ago

it is probably a bad idea to use vaex together with joblib for parallelization

Not if the GIL is an issue; that's why we use multiprocessing with apply. But I'm not sure it's needed in this example.

And indeed, as Jovan said, this seems like a user error. Also, you might want to upgrade Vaex; you are using an old version.
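
As a generic illustration of the process-based approach (not how vaex implements apply), each protein ID could be handled in its own worker process using only the standard library. `process_protein` and `run_all` are hypothetical names, and the body of `process_protein` is a placeholder for the real load-and-join work.

```python
from concurrent.futures import ProcessPoolExecutor


def process_protein(prot_id):
    """Placeholder for the per-protein work (load, join, write results)."""
    return prot_id, len(prot_id)


def run_all(prot_ids, max_workers=2):
    """Run process_protein for every ID in separate worker processes."""
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(process_protein, prot_ids))
```

Each worker process has its own interpreter and its own GIL, so the workers do not contend for one lock the way threads do. When calling `run_all` from a script, the call should sit under an `if __name__ == "__main__":` guard so that worker processes can import the module safely.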