rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[BUG] Multiple DataFrame.loc operations gives confusing error message upon compute on Dask-cuDF #11434

Open alextxu opened 2 years ago

alextxu commented 2 years ago

Describe the bug

After creating a Dask-cuDF DataFrame, if I perform multiple .loc operations on it using boolean Dask-cuDF Series, computing the result raises a RuntimeError with the message cuDF failure at: ../src/stream_compaction/apply_boolean_mask.cu:73: Column size mismatch. A similar snippet works as expected in cuDF.

Steps/Code to reproduce bug

import dask_cudf
import cudf
ddf1 = dask_cudf.from_cudf(cudf.DataFrame({'a':[1,2,3], 'b':[4,5,6]}), npartitions=2)
f1 = dask_cudf.from_cudf(cudf.Series([False, True, True]), npartitions=2)
f2 = dask_cudf.from_cudf(cudf.Series([True, False]), npartitions=2)
ddf2 = ddf1.loc[f1]
ddf3 = ddf2.loc[f2]
print(ddf2.compute())
print(ddf3.compute())

The above code produces the following output:

   a  b
1  2  5
2  3  6
Traceback (most recent call last):
  File "temp.py", line 9, in <module>
    print(ddf3.compute())
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py", line 292, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py", line 575, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/local.py", line 554, in get_sync
    return get_async(
  File "/opt/conda/lib/python3.8/site-packages/dask/local.py", line 497, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/opt/conda/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.8/site-packages/dask/local.py", line 539, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/opt/conda/lib/python3.8/site-packages/dask/local.py", line 235, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/opt/conda/lib/python3.8/site-packages/dask/local.py", line 235, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/opt/conda/lib/python3.8/site-packages/dask/local.py", line 226, in execute_task
    result = pack_exception(e, dumps)
  File "/opt/conda/lib/python3.8/site-packages/dask/local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "/opt/conda/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/conda/lib/python3.8/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/opt/conda/lib/python3.8/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
  File "/opt/conda/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/opt/conda/lib/python3.8/site-packages/dask/utils.py", line 39, in apply
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py", line 6330, in apply_and_enforce
    df = func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/dataframe/methods.py", line 37, in loc
    return df.loc[iindexer]
  File "/opt/conda/lib/python3.8/site-packages/cudf/core/dataframe.py", line 127, in __getitem__
    return self._getitem_tuple_arg(arg)
  File "/opt/conda/lib/python3.8/site-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/cudf/core/dataframe.py", line 267, in _getitem_tuple_arg
    df = columns_df._apply_boolean_mask(tmp_arg[0])
  File "/opt/conda/lib/python3.8/site-packages/cudf/core/indexed_frame.py", line 1696, in _apply_boolean_mask
    libcudf.stream_compaction.apply_boolean_mask(
  File "cudf/_lib/stream_compaction.pyx", line 101, in cudf._lib.stream_compaction.apply_boolean_mask
RuntimeError: cuDF failure at: ../src/stream_compaction/apply_boolean_mask.cu:73: Column size mismatch

Expected behavior

Expected output (verified with cuDF instead of Dask-cuDF):

   a  b
1  2  5
2  3  6
   a  b
1  2  5
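Tracking index labels by hand shows why the second mask does not line up with ddf2. The sketch below is plain Python with no cuDF or Dask involved: dictionaries mapping index label to value stand in for the DataFrame and the two boolean Series, and the helper name filter_by_mask is hypothetical. It assumes the default integer index (0, 1, 2, ...) that cudf.DataFrame and cudf.Series create and that from_cudf keeps.

```python
# Hypothetical stand-ins: index label -> row (columns a, b) or mask value.
df_rows = {0: (1, 4), 1: (2, 5), 2: (3, 6)}  # ddf1
f1 = {0: False, 1: True, 2: True}             # first mask, labels 0..2
f2 = {0: True, 1: False}                      # second mask, labels 0..1


def filter_by_mask(rows, mask):
    """Keep rows whose label maps to True in mask (label-based, like .loc).

    Raises KeyError if a row label has no entry in the mask.
    """
    return {label: row for label, row in rows.items() if mask[label]}


df2 = filter_by_mask(df_rows, f1)
print(sorted(df2))  # labels surviving the first .loc: [1, 2]

# Labels present in df2 but absent from f2's index.
missing = sorted(set(df2) - set(f2))
print(missing)  # [2]
```

Because label 2 survives the first filter but has no entry in f2, a label-based second filter (filter_by_mask(df2, f2)) fails with a KeyError on label 2 instead of returning the single row shown above.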

Environment details

cuDF version: 22.4.0a0+306.g0cb75a4913

beckernick commented 2 years ago

While cuDF could raise a more informative error instead of leaking internals, this is a Dask issue: the index of the second boolean Series cannot be aligned with the index of the DataFrame it is filtering.

We can leave this issue open to track raising a more user-friendly error, but I recommend filing this issue at https://github.com/dask/dask/issues/ for further discussion, as changing this behavior in Dask may have wider implications. The same reproducer using Dask with pandas also fails at compute time, but with a pandas IndexingError:

import dask.dataframe as dd
import pandas as pd
ddf1 = dd.from_pandas(pd.DataFrame({'a':[1,2,3], 'b':[4,5,6]}), npartitions=2)
f1 = dd.from_pandas(pd.Series([False, True, True]), npartitions=2)
f2 = dd.from_pandas(pd.Series([True, False]), npartitions=2)
ddf2 = ddf1.loc[f1]
ddf3 = ddf2.loc[f2]
print(ddf2.compute())
print(ddf3.compute())
   a  b
1  2  5
2  3  6

---------------------------------------------------------------------------
IndexingError                             Traceback (most recent call last)
Input In [10], in <cell line: 9>()
      7 ddf3 = ddf2.loc[f2]
      8 print(ddf2.compute())
----> 9 print(ddf3.compute())

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/base.py:312, in DaskMethodsMixin.compute(self, **kwargs)
    288 def compute(self, **kwargs):
    289     """Compute this dask collection
    290 
    291     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    310     dask.base.compute
    311     """
--> 312     (result,) = compute(self, traverse=False, **kwargs)
    313     return result

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     keys.append(x.__dask_keys__())
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
     78     elif isinstance(pool, multiprocessing.pool.Pool):
     79         pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
     82     pool.submit,
     83     pool._max_workers,
     84     dsk,
     85     result,
     86     cache=cache,
     87     get_id=_thread_get_id,
     88     pack_exception=pack_exception,
     89     **kwargs,
     90 )
     92 # Cleanup pools associated to dead threads
     93 with pools_lock:

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/local.py:508, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    506         _execute_task(task, data)  # Re-execute locally
    507     else:
--> 508         raise_exception(exc, tb)
    509 res, worker_id = loads(res_info)
    510 state["cache"][key] = res

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/local.py:316, in reraise(exc, tb)
    314 if exc.__traceback__ is not tb:
    315     raise exc.with_traceback(tb)
--> 316 raise exc

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    219 try:
    220     task, data = loads(task_info)
--> 221     result = _execute_task(task, data)
    222     id = get_id()
    223     result = dumps((result, id))

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/utils.py:41, in apply(func, args, kwargs)
     39 def apply(func, args, kwargs=None):
     40     if kwargs:
---> 41         return func(*args, **kwargs)
     42     else:
     43         return func(*args)

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/dataframe/core.py:6533, in apply_and_enforce(*args, **kwargs)
   6531 func = kwargs.pop("_func")
   6532 meta = kwargs.pop("_meta")
-> 6533 df = func(*args, **kwargs)
   6534 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   6535     if not len(df):

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/dask/dataframe/methods.py:38, in loc(df, iindexer, cindexer)
     34 """
     35 .loc for known divisions
     36 """
     37 if cindexer is None:
---> 38     return df.loc[iindexer]
     39 else:
     40     return df.loc[iindexer, cindexer]

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/pandas/core/indexing.py:967, in _LocationIndexer.__getitem__(self, key)
    964 axis = self.axis or 0
    966 maybe_callable = com.apply_if_callable(key, self.obj)
--> 967 return self._getitem_axis(maybe_callable, axis=axis)

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/pandas/core/indexing.py:1182, in _LocIndexer._getitem_axis(self, key, axis)
   1180     return self._get_slice_axis(key, axis=axis)
   1181 elif com.is_bool_indexer(key):
-> 1182     return self._getbool_axis(key, axis=axis)
   1183 elif is_list_like_indexer(key):
   1184 
   1185     # an iterable multi-selection
   1186     if not (isinstance(key, tuple) and isinstance(labels, MultiIndex)):

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/pandas/core/indexing.py:984, in _LocationIndexer._getbool_axis(self, key, axis)
    981 def _getbool_axis(self, key, axis: int):
    982     # caller is responsible for ensuring non-None axis
    983     labels = self.obj._get_axis(axis)
--> 984     key = check_bool_indexer(labels, key)
    985     inds = key.nonzero()[0]
    986     return self.obj._take_with_is_copy(inds, axis=axis)

File ~/miniconda3/envs/rapids-22.06/lib/python3.9/site-packages/pandas/core/indexing.py:2383, in check_bool_indexer(index, key)
   2381     mask = isna(result._values)
   2382     if mask.any():
-> 2383         raise IndexingError(
   2384             "Unalignable boolean Series provided as "
   2385             "indexer (index of the boolean Series and of "
   2386             "the indexed object do not match)."
   2387         )
   2388     return result.astype(bool)._values
   2389 if is_object_dtype(key):
   2390     # key might be object-dtype bool, check_array_indexer needs bool array

IndexingError: Unalignable boolean Series provided as indexer (index of the boolean Series and of the indexed object do not match).
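The traceback shows where pandas rejects the key: check_bool_indexer reindexes the boolean Series onto the indexed object's labels and raises if any label comes back missing. Below is a simplified pure-Python sketch of that rule, not pandas' actual code; check_bool_indexer_sketch and UnalignableIndexerError are hypothetical names, and a list of labels and a label-to-bool dictionary stand in for the index and the Series.

```python
class UnalignableIndexerError(Exception):
    """Hypothetical stand-in for pandas' IndexingError."""


def check_bool_indexer_sketch(index, key):
    """Align a label->bool mapping to `index` and return a positional mask.

    Simplified version of the reindex-then-check-for-missing logic visible
    in the traceback: every label in `index` must appear in `key`.
    """
    # "Reindex": look up each target label in the key; None marks a gap.
    aligned = [key.get(label) for label in index]
    if any(value is None for value in aligned):
        raise UnalignableIndexerError(
            "Unalignable boolean Series provided as indexer "
            "(index of the boolean Series and of the indexed object do not match)."
        )
    return [bool(value) for value in aligned]


# ddf2's index after the first filter is [1, 2]; f2's index is [0, 1].
try:
    check_bool_indexer_sketch([1, 2], {0: True, 1: False})
except UnalignableIndexerError as err:
    print(err)

# In this sketch, extra labels in the key are ignored as long as
# every target label is present.
print(check_bool_indexer_sketch([1, 2], {0: True, 1: False, 2: True}))  # [False, True]
```

The design point is that the check is label-based, so it can report a misaligned index before any positional masking happens. The cuDF path in the first traceback instead reaches the positional libcudf call and only notices the size difference there.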

alextxu commented 2 years ago

Update: I've realized that the original code snippet should not be expected to produce the output given in the expected behavior section. The code should in fact raise an error, but the error message that Dask-cuDF produces, namely cuDF failure at: ../src/stream_compaction/apply_boolean_mask.cu:73: Column size mismatch, is confusing. By contrast, the IndexingError raised by Dask with pandas was more informative and led me to a workaround for my intended use. To summarize, the focus of this issue is now improving the error message.
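One way to get the clearer message this issue asks for is to validate the mask in Python before handing it to the positional libcudf call seen in the first traceback. The sketch below is plain Python and entirely hypothetical: apply_boolean_mask_checked is not a cuDF function, lists stand in for columns, and it is not how cuDF actually addressed this issue. It only illustrates checking index labels and lengths up front and naming both sides in the error.

```python
def apply_boolean_mask_checked(rows, mask, row_labels=None, mask_labels=None):
    """Filter `rows` positionally by `mask`, failing early with a readable error.

    Hypothetical helper: both checks run before the positional compaction,
    so the user sees which inputs disagree instead of "Column size mismatch".
    """
    # Label check (only when both indexes are supplied).
    if row_labels is not None and mask_labels is not None and row_labels != mask_labels:
        raise IndexError(
            f"Boolean indexer index {mask_labels} does not match "
            f"the indexed object's index {row_labels}; align them first."
        )
    # Length check: a positional mask must cover every row exactly once.
    if len(mask) != len(rows):
        raise IndexError(
            f"Boolean mask has length {len(mask)} but the object has "
            f"{len(rows)} rows."
        )
    # Positional stream compaction: keep row i when mask[i] is True.
    return [row for row, keep in zip(rows, mask) if keep]


print(apply_boolean_mask_checked(["r1", "r2"], [True, False]))  # ['r1']

# Labels from the reproducer: ddf2 has index [1, 2], f2 has index [0, 1].
try:
    apply_boolean_mask_checked(["r1", "r2"], [True, False],
                               row_labels=[1, 2], mask_labels=[0, 1])
except IndexError as err:
    print(err)
```

With the labels from the reproducer, the helper names both indexes in its message, which is roughly the information pandas' IndexingError gave the reporter.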

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.