dask / dask-expr

Missing resample() implementation for grouped data frame #1134

Open dbalabka opened 2 months ago

dbalabka commented 2 months ago

Describe the issue:

import pandas as pd
import dask.dataframe as dd

# Create a sample DataFrame with 'id' and 'date' columns
data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05'])
}
df = dd.from_pandas(pd.DataFrame(data), npartitions=2)

# Group by 'id' and resample to daily frequency
result = df.groupby(by=['id']).resample("D", on="date").compute()

# Print the resulting DataFrame
print(result)
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:1593, in GroupBy.__getattr__(self, key)
   1592 try:
-> 1593     return self[key]
   1594 except KeyError as e:

File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:1615, in GroupBy.__getitem__(self, key)
   1614 if is_scalar(key):
-> 1615     return SeriesGroupBy(
   1616         self.obj,
   1617         by=self.by,
   1618         slice=key,
   1619         sort=self.sort,
   1620         dropna=self.dropna,
   1621         observed=self.observed,
   1622     )
   1623 g = GroupBy(
   1624     self.obj,
   1625     by=self.by,
   (...)
   1630     group_keys=self.group_keys,
   1631 )

File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:2214, in SeriesGroupBy.__init__(self, obj, by, sort, observed, dropna, slice)
   2212         obj._meta.groupby(by, **_as_dict("observed", observed))
-> 2214 super().__init__(
   2215     obj, by=by, slice=slice, observed=observed, dropna=dropna, sort=sort
   2216 )

File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:1558, in GroupBy.__init__(self, obj, by, group_keys, sort, observed, dropna, slice)
   1557     slice = list(slice)
-> 1558 self._meta = self._meta[slice]

File ~/src/.venv/lib/python3.10/site-packages/pandas/core/groupby/generic.py:1951, in DataFrameGroupBy.__getitem__(self, key)
   1947     raise ValueError(
   1948         "Cannot subset columns with a tuple with more than one element. "
   1949         "Use a list instead."
   1950     )
-> 1951 return super().__getitem__(key)

File ~/src/.venv/lib/python3.10/site-packages/pandas/core/base.py:244, in SelectionMixin.__getitem__(self, key)
    243 if key not in self.obj:
--> 244     raise KeyError(f"Column not found: {key}")
    245 ndim = self.obj[key].ndim

KeyError: 'Column not found: resample'

phofl commented 2 months ago

Thanks for the report.

I think resample isn't implemented for groupby in Dask. The error message could certainly be better; adding the method itself would also be fine.
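
For comparison, the same grouped resample works in plain pandas. Below is a minimal pandas-only sketch using the data from the report; the .size() aggregation is added here only to show the gap-filled daily bins and is not part of the original example:

import pandas as pd

# Same sample data as in the report above
data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05'])
}
pdf = pd.DataFrame(data)

# Plain pandas supports groupby().resample(); counting rows per daily bin
# shows the filled-in days (2023-01-02, 2023-01-03) with a count of 0.
print(pdf.groupby('id').resample('D', on='date').size())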

dbalabka commented 2 months ago

@phofl, thanks for the quick reply. Is there any workaround to run arbitrary Pandas functions on groups, similar to map_partitions? Because a group can be distributed across the cluster, I need something smarter: resample needs all rows of a group in one partition to fill the gaps.

phofl commented 2 months ago

Do you want the whole group in a single partition? If yes, you can use groupby.apply / groupby.transform

dbalabka commented 2 months ago

@phofl, it works, thanks:

import pandas as pd
import dask.dataframe as dd
data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05']),
    'metric': [1,1,1,1,1,1]
}
df = dd.from_pandas(pd.DataFrame(data).astype({'id': 'int64[pyarrow]', 'metric': 'int64[pyarrow]', 'date': 'timestamp[ns][pyarrow]'}))

print(
    df
    .groupby(by=['id'])
    .apply(lambda x: x.resample("D", on="date").sum(), include_groups=False, meta={"metric": "int64[pyarrow]"})
    .reset_index()
)

dbalabka commented 2 months ago

FYI, for those who come across this ticket: it was a bit unexpected for me that Dask keeps each group in a single partition, which can lead to OOM if a group is too large, so it is worth keeping in mind while grouping. Otherwise, we have to do something like this: https://stackoverflow.com/a/55881591/983577
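
As a rough guard against that, one can inspect per-group row counts up front before relying on groupby().apply() materialising each group in a single partition. A hedged sketch (the printed check is illustrative, not from the thread):

import pandas as pd
import dask.dataframe as dd

data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05']),
    'metric': [1, 1, 1, 1, 1, 1]
}
df = dd.from_pandas(pd.DataFrame(data), npartitions=2)

# Row count per group: if any group is far larger than the per-worker memory
# budget, groupby().apply() can run out of memory and the group should be
# split further (see the StackOverflow answer linked above).
group_sizes = df.groupby('id').size().compute()
print(group_sizes)
print('largest group:', group_sizes.max(), 'rows')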

phofl commented 2 months ago

Apply and transform do this by design; there is no way around that, FWIW.

dbalabka commented 2 months ago

@phofl, I've spent a lot of time working around the missing resample method and found another bug that prevents me from using apply on groups: https://github.com/dask/dask/issues/11394

phofl commented 2 months ago

That's odd. Thanks for digging these up, I'll try to take a look tomorrow.

dbalabka commented 2 months ago

@phofl, it might be related to a bug in pandas that I spotted during this investigation (https://github.com/pandas-dev/pandas/issues/59823): Pandas produces an empty column name, which can affect Dask's logic.

dbalabka commented 2 months ago

Here is a workaround that only works in my case:

import pandas as pd
import dask.dataframe as dd
data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05']),
    'metric': [1,1,1,1,1,1]
}
df = dd.from_pandas(pd.DataFrame(data).astype({'id': 'int64[pyarrow]', 'metric': 'int64[pyarrow]', 'date': 'timestamp[ns][pyarrow]'}))

print(
    df
    # Partitioning by id as a replacement for groupby
    .set_index('id')
    # See bug: https://github.com/pandas-dev/pandas/issues/59823
    .astype({'date': 'datetime64[ns]'})
    # Apply the required Pandas function on each partition. The set_index above
    # shuffles the data so that all rows with the same id land in the same
    # partition, which is what makes the per-partition groupby-resample valid.
    .map_partitions(
        lambda x: x.groupby('id').resample("D", on="date").sum().reset_index(), 
        meta={'id': 'int64[pyarrow]', 'date': 'timestamp[ns][pyarrow]', 'metric': 'int64[pyarrow]'},
    )
    # Remove unnecessary index
    .reset_index(drop=True)
    .compute()
)