dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

add mode example to custom aggregation #6329

Open raybellwaves opened 4 years ago

raybellwaves commented 4 years ago

There's an example of a mode custom aggregation on SO https://stackoverflow.com/a/46082075/6046019

Do you think it would be beneficial to add it to the docs as another example at the bottom of https://docs.dask.org/en/latest/dataframe-groupby.html#aggregate?

TomAugspurger commented 4 years ago

That could be added, with a caveat about how it breaks ties when there are multiple modes.
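To make the tie-breaking caveat concrete, here is a small stdlib-only sketch (not Dask or pandas code) contrasting two policies on the tied column `[5, 5, 6, 6, 7]` that appears later in this thread. One policy returns every value that reaches the highest count, which is what `pandas.Series.mode` does. The other picks a single winner. The helper names `all_modes` and `first_mode` are hypothetical, and "smallest value wins" is just one possible tie-breaking rule.

```python
from collections import Counter


def all_modes(values):
    """Return every value that reaches the highest count, sorted (ties kept)."""
    counts = Counter(values)
    top = max(counts.values())
    return sorted(v for v, c in counts.items() if c == top)


def first_mode(values):
    """Return a single mode, breaking ties by taking the smallest value."""
    return all_modes(values)[0]


tied = [5, 5, 6, 6, 7]
print(all_modes(tied))   # [5, 6]
print(first_mode(tied))  # 5
```

Whichever single-value rule a documented example ends up using, the docs would need to state it, because it differs from the pandas behaviour of returning all modes.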

raybellwaves commented 4 years ago

> That could be added, with a caveat about how it breaks ties when there are multiple modes.

Interesting. I see that in pandas as well.

Note: the code below is WIP.

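```python
import pandas as pd
import dask.dataframe as dd

d = {'col1': [1, 1, 1, 1, 1], 'col2': [5, 6, 6, 6, 7]}  # Works: single mode (6)
# d = {'col1': [1, 1, 1, 1, 1], 'col2': [5, 5, 6, 6, 7]}  # Does not work: 5 and 6 tie, pandas returns both
df = pd.DataFrame(data=d)
expected = df.groupby('col1').agg({'col2': pd.Series.mode})

ddf = dd.from_pandas(df, npartitions=2)

mode = dd.Aggregation(
    name="mode",
    # chunk: count occurrences of each value within each group, per partition
    chunk=lambda s: s.value_counts(),
    # agg: add up the per-partition counts for each (group, value) pair
    agg=lambda s0: s0._selected_obj.groupby(
        level=list(range(s0._selected_obj.index.nlevels))
    ).sum(),
    # finalize: per group, return the value (not its position) with the highest count
    finalize=lambda s1: s1.groupby(level=list(range(s1.index.nlevels - 1))).apply(
        lambda x: x.reset_index(
            level=list(range(x.index.nlevels - 1)), drop=True
        ).idxmax()
    ),
)

actual = ddf.groupby('col1').agg({'col2': mode}).compute()


# The same aggregation with named functions, avoiding the private _selected_obj
def chunk(s):
    # Per-partition counts, indexed by (group key, value)
    return s.value_counts()


def agg(s):
    # Within each group, sum the counts of each value across partitions
    return s.apply(lambda s: s.groupby(level=-1).sum())


def finalize(s):
    # Every index level except the last one is a group key
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        # idxmax returns the most common value; argmax would return its integer position
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )


mode = dd.Aggregation('mode', chunk, agg, finalize)

actual = ddf.groupby('col1').agg({'col2': mode}).compute()
```
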
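The three stages of the `dd.Aggregation` above follow a map, combine, reduce pattern. As a rough mental model only (plain Python, not Dask's execution machinery), `chunk` counts values per group within each partition, `agg` sums those counts across partitions, and `finalize` picks the most common value per group. The function names mirror the `Aggregation` arguments but are hypothetical stand-ins that operate on lists of `(group key, value)` rows, and the two-partition split is arbitrary.

```python
from collections import Counter, defaultdict


def chunk(rows):
    """Per partition: count (group key, value) pairs, like SeriesGroupBy.value_counts()."""
    return Counter(rows)


def agg(partials):
    """Across partitions: add up the counts for each (group key, value) pair."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


def finalize(total):
    """Per group: return the value with the highest count (a label, like idxmax)."""
    per_group = defaultdict(dict)
    for (key, value), count in total.items():
        per_group[key][value] = count
    return {key: max(counts, key=counts.get) for key, counts in per_group.items()}


# col1/col2 from the "Works" case, split into two hypothetical partitions
partitions = [[(1, 5), (1, 6), (1, 6)], [(1, 6), (1, 7)]]
print(finalize(agg(chunk(p) for p in partitions)))  # {1: 6}
```

Because only the small count tables travel between stages, the combine step stays cheap even when partitions are large.
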
ncclementi commented 3 years ago

@raybellwaves Are you still interested in adding the example above or some version of it? I'm noticing there are no PRs linked to this issue.

zmbc commented 1 year ago

For what it's worth: I could not get the above code to work, and spent a bit of time debugging it before realizing that `.argmax` in the `finalize` function returns the integer position of the most common value, not the value itself; replacing it with `.idxmax` works.
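The distinction here is between a position and a label. A stdlib-only illustration, with a hypothetical `counts` dict standing in for one group's Series of counts indexed by value: the argmax-style result is the position of the largest count, while the idxmax-style result is the index label at that position, which is the mode itself.

```python
# Counts for one group, keyed by the observed value (the Series index in pandas terms)
counts = {5: 1, 6: 3, 7: 1}

values = list(counts)            # index labels: [5, 6, 7]
tallies = list(counts.values())  # counts:       [1, 3, 1]

# argmax-style: integer position of the largest count
position = max(range(len(tallies)), key=tallies.__getitem__)

# idxmax-style: the label at that position, i.e. the most common value
label = values[position]

print(position)  # 1, analogous to Series.argmax()
print(label)     # 6, analogous to Series.idxmax()
```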

SultanOrazbayev commented 10 months ago

Slightly modified code that also allows for multiple values of the mode (as seen on Stack Overflow):

```python
from pandas import DataFrame, Series, NA
from dask.dataframe import from_pandas, Aggregation

data = DataFrame(
    {
        "status": [
            "pending", "pending", "pending",
            "canceled", "canceled", "canceled",
            "confirmed", "confirmed", "confirmed",
        ],
        "clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
        "partner": ["A", NA, "C", "A", NA, "C", "A", NA, "C"],
        "product": [
            "afiliates", "pre-paid", "giftcard",
            "afiliates", "pre-paid", "giftcard",
            "afiliates", "pre-paid", "giftcard",
        ],
        "brand": [
            "brand_4", "brand_2", "brand_3",
            "brand_1", "brand_2", "brand_3",
            "brand_1", "brand_3", "brand_3",
        ],
        "gmv": [100, 100, 100, 100, 100, 100, 100, 100, 100],
    }
)
data = data.astype(
    {
        "partner": "category",
        "status": "category",
        "product": "category",
        "brand": "category",
    }
)
mode_pandas = data.groupby(["clientId", "product"], observed=True).agg(
    {"brand": Series.mode}
)

df = from_pandas(data, npartitions=1)


def chunk(s):
    # Per-partition counts of each brand within each (clientId, product) group
    return s.value_counts()


def agg(s0):
    # Sum the counts across partitions for every (clientId, product, brand) triple
    _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
    # Drop zero counts (likely reported for unobserved categories of the categorical column)
    _intermediate = _intermediate[_intermediate > 0]
    return _intermediate


def finalize(s):
    # Group by every level except the last (brand) and keep all rows tied for the max count
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level, group_keys=False).apply(lambda s: s[s == s.max()])


mode = Aggregation(
    name="mode",
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)

mode_dask = df.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode}
).compute()

print(mode_pandas)
print(mode_dask)
```
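The two changes in this version, dropping zero counts in `agg` and keeping every row whose count equals the group maximum in `finalize`, can be mimicked in plain Python. This is a hypothetical sketch, not Dask code. The zero count for `brand_1` below is seeded by hand to imitate what a categorical `value_counts` presumably reports for an unobserved category, and splitting the `("B", "pre-paid")` rows across two partitions is arbitrary.

```python
from collections import Counter, defaultdict


def combine(partials):
    """Sum ((clientId, product), brand) counts across partitions and drop zero counts."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    # Counter.update keeps zero entries, so filter them like `_intermediate > 0`
    return {pair: n for pair, n in total.items() if n > 0}


def modes_per_group(total):
    """Keep every brand whose count equals its group's maximum (ties preserved)."""
    per_group = defaultdict(dict)
    for (group, value), n in total.items():
        per_group[group][value] = n
    result = {}
    for group, counts in per_group.items():
        top = max(counts.values())
        result[group] = sorted(v for v, n in counts.items() if n == top)
    return result


partials = [
    Counter({(("B", "pre-paid"), "brand_2"): 1, (("B", "pre-paid"), "brand_1"): 0}),
    Counter({(("B", "pre-paid"), "brand_3"): 1}),
]
print(modes_per_group(combine(partials)))  # {('B', 'pre-paid'): ['brand_2', 'brand_3']}
```

Returning all tied values keeps parity with `Series.mode`, at the cost of a variable number of output rows per group.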
hendrikmakait commented 10 months ago

@SultanOrazbayev: Thanks for providing this example! Would you be interested in contributing a PR that adds the example or a simplified version to the Dask documentation?