rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0
8.48k stars 906 forks source link

[BUG] Dask cuDF cummax and cummin fail due to missing axis argument in cuDF.Series.where #10104

Open beckernick opened 2 years ago

beckernick commented 2 years ago

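Dask cuDF `cummax` and `cummin` fail when `npartitions > 1`: Dask's cross-partition aggregation step (e.g. `cummin_aggregate`) calls `x.where(..., axis=...)`, but `cudf.Series.where` does not accept an `axis` keyword argument, so a `TypeError` is raised.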

import cudf
import dask_cudf

df = cudf.DataFrame({"x": range(10)})
ddf = dask_cudf.from_cudf(df, 2)

ddf.x.cummin().compute()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [1], in <module>
      4 df = cudf.DataFrame({"x": range(10)})
      5 ddf = dask_cudf.from_cudf(df, 2)
----> 7 ddf.x.cummin().compute()

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
    264 def compute(self, **kwargs):
    265     """Compute this dask collection
    266 
    267     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    286     dask.base.compute
    287     """
--> 288     (result,) = compute(self, traverse=False, **kwargs)
    289     return result

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    568     keys.append(x.__dask_keys__())
    569     postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
    572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:553, in get_sync(dsk, keys, **kwargs)
    548 """A naive synchronous version of get_async
    549 
    550 Can be useful for debugging.
    551 """
    552 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 553 return get_async(
    554     synchronous_executor.submit,
    555     synchronous_executor._max_workers,
    556     dsk,
    557     keys,
    558     **kwargs,
    559 )

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:496, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    494 while state["waiting"] or state["ready"] or state["running"]:
    495     fire_tasks(chunksize)
--> 496     for key, res_info, failed in queue_get(queue).result():
    497         if failed:
    498             exc, tb = loads(res_info)

File ~/conda/envs/rapids-22.02/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/conda/envs/rapids-22.02/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:538, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    536 fut = Future()
    537 try:
--> 538     fut.set_result(fn(*args, **kwargs))
    539 except BaseException as e:
    540     fut.set_exception(e)

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:234, in batch_execute_tasks(it)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:234, in <listcomp>(.0)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:225, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223     failed = False
    224 except BaseException as e:
--> 225     result = pack_exception(e, dumps)
    226     failed = True
    227 return key, result, failed

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218 try:
    219     task, data = loads(task_info)
--> 220     result = _execute_task(task, data)
    221     id = get_id()
    222     result = dumps((result, id))

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/dataframe/methods.py:314, in cummin_aggregate(x, y)
    312 def cummin_aggregate(x, y):
    313     if is_series_like(x) or is_dataframe_like(x):
--> 314         return x.where((x < y) | x.isnull(), y, axis=x.ndim - 1)
    315     else:  # scalar
    316         return x if x < y else y

TypeError: where() got an unexpected keyword argument 'axis'
import cudf
import dask_cudf

df = cudf.DataFrame({"x": range(10)})
ddf = dask_cudf.from_cudf(df, 2)

ddf.x.cummin().compute()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [9], in <module>
      4 df = cudf.DataFrame({"x": range(10)})
      5 ddf = dask_cudf.from_cudf(df, 2)
----> 7 ddf.x.cummin().compute()

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
    264 def compute(self, **kwargs):
    265     """Compute this dask collection
    266 
    267     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    286     dask.base.compute
    287     """
--> 288     (result,) = compute(self, traverse=False, **kwargs)
    289     return result

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    568     keys.append(x.__dask_keys__())
    569     postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
    572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:553, in get_sync(dsk, keys, **kwargs)
    548 """A naive synchronous version of get_async
    549 
    550 Can be useful for debugging.
    551 """
    552 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 553 return get_async(
    554     synchronous_executor.submit,
    555     synchronous_executor._max_workers,
    556     dsk,
    557     keys,
    558     **kwargs,
    559 )

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:496, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    494 while state["waiting"] or state["ready"] or state["running"]:
    495     fire_tasks(chunksize)
--> 496     for key, res_info, failed in queue_get(queue).result():
    497         if failed:
    498             exc, tb = loads(res_info)

File ~/conda/envs/rapids-22.02/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/conda/envs/rapids-22.02/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:538, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    536 fut = Future()
    537 try:
--> 538     fut.set_result(fn(*args, **kwargs))
    539 except BaseException as e:
    540     fut.set_exception(e)

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:234, in batch_execute_tasks(it)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:234, in <listcomp>(.0)
    230 def batch_execute_tasks(it):
    231     """
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:225, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223     failed = False
    224 except BaseException as e:
--> 225     result = pack_exception(e, dumps)
    226     failed = True
    227 return key, result, failed

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218 try:
    219     task, data = loads(task_info)
--> 220     result = _execute_task(task, data)
    221     id = get_id()
    222     result = dumps((result, id))

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/conda/envs/rapids-22.02/lib/python3.8/site-packages/dask/dataframe/methods.py:314, in cummin_aggregate(x, y)
    312 def cummin_aggregate(x, y):
    313     if is_series_like(x) or is_dataframe_like(x):
--> 314         return x.where((x < y) | x.isnull(), y, axis=x.ndim - 1)
    315     else:  # scalar
    316         return x if x < y else y

TypeError: where() got an unexpected keyword argument 'axis'

Env:

!conda list | grep "rapids\|dask"
# packages in environment at /home/nicholasb/conda/envs/rapids-22.02:
cucim                     22.02.00a220121          cuda_11_py38_g0e199fc_37            rapidsai-nightly
cudf                      22.02.00a220121          cuda_11_py38_g53a31d1b01_303        rapidsai-nightly
cudf_kafka                22.02.00a220121          py38_g53a31d1b01_303                rapidsai-nightly
cugraph                   22.02.00a220119          cuda11_py38_gefbff09a_70            rapidsai-nightly
cuml                      22.02.00a220118          cuda11_py38_g592834c13_88           rapidsai-nightly
cusignal                  22.02.00a220121          py39_g80eadba_11                    rapidsai-nightly
cuspatial                 22.02.00a220121          py38_g5d619e7_19                    rapidsai-nightly
custreamz                 22.02.00a220121          py38_g53a31d1b01_303                rapidsai-nightly
cuxfilter                 22.02.00a220121          py38_g4287bac_13                    rapidsai-nightly
dask                      2021.11.2                pyhd8ed1ab_0                        conda-forge
dask-core                 2021.11.2                pyhd8ed1ab_0                        conda-forge
dask-cuda                 22.02.00a220121          py38_49                             rapidsai-nightly
dask-cudf                 22.02.00a220121          cuda_11_py38_g53a31d1b01_303        rapidsai-nightly
libcucim                  22.02.00a220121          cuda11_g0e199fc_37                  rapidsai-nightly
libcudf                   22.02.00a220121          cuda11_g53a31d1b01_303              rapidsai-nightly
libcudf_kafka             22.02.00a220121          g53a31d1b01_303                     rapidsai-nightly
libcugraph                22.02.00a220119          cuda11_gefbff09a_70                 rapidsai-nightly
libcugraph_etl            22.02.00a220121          cuda11_gc0096791_73                 rapidsai-nightly
libcuml                   22.02.00a220118          cuda11_g592834c13_88                rapidsai-nightly
libcumlprims              22.02.00a220119          cuda11_g0342bdb_15                  rapidsai-nightly
libcuspatial              22.02.00a220121          cuda11_g5d619e7_19                  rapidsai-nightly
librmm                    22.02.00a220121          cuda11_g30eb83b_31                  rapidsai-nightly
libxgboost                1.5.0dev.rapidsai22.02   cuda11.2_0                          rapidsai-nightly
ptxcompiler               0.2.0                    py38hb739d79_0                      rapidsai-nightly
py-xgboost                1.5.0dev.rapidsai22.02   cuda11.2py38_0                      rapidsai-nightly
pylibcugraph              22.02.00a220119          cuda11_py38_gefbff09a_70            rapidsai-nightly
rapids                    22.02.00a220118          cuda11_py38_g3986715_134            rapidsai-nightly
rapids-xgboost            22.02.00a220118          cuda11_py38_g3986715_134            rapidsai-nightly
rmm                       22.02.00a220121          cuda11_py38_g30eb83b_31_has_cma     rapidsai-nightly
ucx                       1.12.0+gd367332          cuda11.2_0                          rapidsai-nightly
ucx-proc                  1.0.0                    gpu                                 rapidsai-nightly
ucx-py                    0.24.0a220121            py38_gd367332_26                    rapidsai-nightly
xgboost                   1.5.0dev.rapidsai22.02   cuda11.2py38_0                      rapidsai-nightly
github-actions[bot] commented 2 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-90d due to no recent activity in the past 90 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.