rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[BUG] Some custom dask aggregations fail with dask_cudf dataframes #11515

Status: Open. ChrisJar opened this issue 2 years ago.

ChrisJar commented 2 years ago

Describe the bug

Some custom Dask aggregations (e.g., a custom sum-of-squares aggregation) fail with dask_cudf DataFrames.

Steps/Code to reproduce bug

import cudf
import dask_cudf
import dask.dataframe as dd

df = cudf.DataFrame({"a":[1,2,3], "b":[1,1,2]})
ddf = dask_cudf.from_cudf(df, npartitions=1)

sum_of_squares = dd.Aggregation(
    name='sum_of_squares',
    chunk=lambda s: s.agg(lambda x: (x**2).sum()),
    agg=lambda s0: s0.sum()
)

ddf.groupby("b").agg(sum_of_squares).compute()

raises:

TypeError: unsupported operand type(s) for ** or pow(): 'type' and 'int'

However, when running with a pandas-backed Dask DataFrame:

ddf.to_dask_dataframe().groupby("b").agg(sum_of_squares).compute()

The expected result is returned:

   a
b   
1  5
2  9
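To make the `chunk`/`agg` contract concrete, here is a minimal pure-Python sketch of the two-phase reduction that `dd.Aggregation` describes: `chunk` runs on each partition's grouped data and produces per-group partial results, and `agg` combines those partials across partitions. Plain lists of `(b, a)` tuples stand in for partitions, and the function names are hypothetical; this models only the arithmetic, not Dask's actual scheduling.

```python
from collections import defaultdict


def chunk(partition):
    """Per-partition step: sum of squares of `a` for each key `b`.

    `partition` is a hypothetical list of (b, a) tuples.
    """
    partial = defaultdict(int)
    for key, value in partition:
        partial[key] += value ** 2
    return dict(partial)


def agg(partials):
    """Combine step: add up the per-partition partial sums for each key."""
    total = defaultdict(int)
    for partial in partials:
        for key, value in partial.items():
            total[key] += value
    return dict(total)


def sum_of_squares(partitions):
    """Apply chunk to every partition, then combine the partials with agg."""
    return agg(chunk(p) for p in partitions)


rows = [(1, 1), (1, 2), (2, 3)]  # (b, a) pairs from the report's DataFrame
print(sum_of_squares([rows]))                  # one partition
print(sum_of_squares([rows[:1], rows[1:]]))    # same rows split in two
```

Splitting the rows across partitions does not change the result, which is why `agg` can simply sum the partial sums of squares.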

cc: @randerzander

shwina commented 2 years ago

This fundamentally comes down to cuDF not supporting custom aggregations, e.g.:

>>> df.groupby('b').agg(lambda x: (x**2).sum())
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [21], in <cell line: 1>()
----> 1 df.groupby('b').agg(lambda x: (x**2).sum())

File /raid/ashwint/cudf-dev/lib/python3.9/contextlib.py:79, in ContextDecorator.__call__.<locals>.inner(*args, **kwds)
     76 @wraps(func)
     77 def inner(*args, **kwds):
     78     with self._recreate_cm():
---> 79         return func(*args, **kwds)

File /raid/ashwint/cudf-dev/lib/python3.9/site-packages/cudf/core/groupby/groupby.py:316, in GroupBy.agg(self, func)
    307 column_names, columns, normalized_aggs = self._normalize_aggs(func)
    309 # Note: When there are no key columns, the below produces
    310 # a Float64Index, while Pandas returns an Int64Index
    311 # (GH: 6945)
    312 (
    313     result_columns,
    314     grouped_key_cols,
    315     included_aggregations,
--> 316 ) = self._groupby.aggregate(columns, normalized_aggs)
    318 result_index = self.grouping.keys._from_columns_like_self(
    319     grouped_key_cols,
    320 )
    322 multilevel = _is_multi_agg(func)

File groupby.pyx:295, in cudf._lib.groupby.GroupBy.aggregate()

File groupby.pyx:170, in cudf._lib.groupby.GroupBy.aggregate_internal()

File aggregation.pyx:873, in cudf._lib.aggregation.make_groupby_aggregation()

Input In [21], in <lambda>(x)
----> 1 df.groupby('b').agg(lambda x: (x**2).sum())

TypeError: unsupported operand type(s) for ** or pow(): 'type' and 'int'
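
The `'type'` in the error message suggests the lambda was not called on group data at all but on a class. A plausible reading of the `make_groupby_aggregation` frame in the traceback is that cuDF treats a callable passed to `agg` as a builder that receives cuDF's aggregation class (so that something like `lambda agg: agg.sum()` would build a built-in aggregation). The mock below reproduces that behavior in pure Python; `AggregationFactory` and `make_aggregation` are hypothetical names, not cuDF's API.

```python
class AggregationFactory:
    """Hypothetical stand-in for a class whose classmethods build aggregations."""

    @classmethod
    def sum(cls):
        return "SUM"


def make_aggregation(op):
    """Hypothetical dispatcher: strings name a built-in, callables get the class."""
    if isinstance(op, str):
        return getattr(AggregationFactory, op)()
    if callable(op):
        # The callable receives the *class*, not the group's values.
        return op(AggregationFactory)
    raise TypeError(f"unsupported aggregation: {op!r}")


print(make_aggregation("sum"))                  # a named built-in works
print(make_aggregation(lambda agg: agg.sum()))  # a "builder" lambda works
try:
    # x is AggregationFactory (a class), so x**2 fails before .sum() runs.
    make_aggregation(lambda x: (x**2).sum())
except TypeError as exc:
    print(exc)
```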

ChrisJar commented 2 years ago

Ah, I see. Is there another way to square the elements of a SeriesGroupBy object?
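One workaround, sketched here under the assumption that only built-in aggregations are needed afterwards, is to square the column before grouping, so the grouped step only needs the built-in `"sum"`, which cuDF supports natively. The helper `sum_of_squares_by` is a hypothetical name; the example uses pandas so it runs without a GPU.

```python
import pandas as pd


def sum_of_squares_by(df, by, column):
    """Square `column` first, then group and apply the built-in sum.

    Only vectorized column arithmetic and a named aggregation are used,
    so no Python callable is ever passed to groupby.agg.
    """
    squared = df.assign(**{column: df[column] ** 2})
    return squared.groupby(by)[column].sum()


df = pd.DataFrame({"a": [1, 2, 3], "b": [1, 1, 2]})
print(sum_of_squares_by(df, "b", "a"))
```

The same calls (`assign`, `**` on a column, and `groupby(...)[...].sum()`) exist on cuDF and Dask DataFrames, so `sum_of_squares_by(ddf, "b", "a").compute()` would be expected to give the same result with dask_cudf, though that has not been verified here.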

github-actions[bot] commented 2 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

GregoryKimball commented 2 years ago

Would this be possible with apply instead of agg? Is there an extension of #11452 that could accept some custom aggregations?
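For comparison, `apply` hands each group to the function as an actual Series, so an arbitrary expression like `(x**2).sum()` can run per group. The sketch below shows the pattern with pandas; cuDF also provides a `groupby(...).apply` method, but whether it accepts this exact call and how it performs on GPU data is an assumption that has not been checked here.

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [1, 1, 2]})

# apply passes each group's values to the lambda as a real Series,
# so arbitrary Python expressions such as (x**2).sum() can run.
result = df.groupby("b")["a"].apply(lambda x: (x**2).sum())
print(result)
```

Unlike the `chunk`/`agg` split of `dd.Aggregation`, `apply` needs all rows of a group in one place, which is a different (and typically costlier) pattern on a distributed DataFrame.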