dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Some `dt` accessor methods applied to an Index may result in invalid divisions #8813

Open jcrist opened 2 years ago

jcrist commented 2 years ago

Specifically, I found that `index.dt.floor(...)` can result in invalid divisions when the divisions don't already align with the period passed to `floor`. For example:

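```python
import dask.dataframe as dd
import pandas as pd

# 12 timestamps spaced 16 hours apart, split into 3 partitions
date_index = pd.date_range(start="2022-02-22", freq="16h", periods=12)
df = pd.Series(range(12), index=date_index)
ddf = dd.from_pandas(df, npartitions=3)

# Floor the index to the day in both dask and pandas
res = ddf.index.dt.floor("D")
sol = df.index.floor("D")

# Fails: the divisions of `res` are invalid
dd.assert_eq(res, sol)
```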

Results in

```
Traceback (most recent call last):
  File "/home/jcristharif/Code/dask/test.py", line 11, in <module>
    dd.assert_eq(res, sol)
  File "/home/jcristharif/Code/dask/dask/dataframe/utils.py", line 537, in assert_eq
    assert_divisions(a, scheduler=scheduler)
  File "/home/jcristharif/Code/dask/dask/dataframe/utils.py", line 616, in assert_divisions
    assert index(df).max() < ddf.divisions[i + 1]
AssertionError
```

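This error indicates that the maximum index value in a partition isn't less than that partition's upper bound as stated by the divisions. This is because the divisions were adjusted by `map_partitions` (with `.floor("D")` applied), and that adjustment is monotonic but not strictly increasing: many distinct values map to the same output. In this example, the last value in the first partition (2022-02-24 00:00) and the boundary between the first and second partitions (2022-02-24 16:00) both floor to 2022-02-24, so the first partition's maximum equals the next division. `ceil` is also broken for the same reason, and there may be other `dt` methods with the same issue.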
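The mechanism can be illustrated with pandas alone. The sketch below assumes that `dd.from_pandas(..., npartitions=3)` cuts the 12 sorted rows into three chunks of 4 (divisions at positions 0, 4, 8 and 11). It applies the same elementwise function to each chunk and to the division boundaries, then checks the divisions contract: partition `i` holds values in `[divisions[i], divisions[i + 1])`, with the last partition also allowed to contain its upper bound. `divisions_are_valid`, `apply_to_partitions`, and the two-partition layout used for `ceil` are hypothetical illustrations, not Dask internals.

```python
import pandas as pd

# Same data as the reproducer: 12 timestamps spaced 16 hours apart.
date_index = pd.date_range(start="2022-02-22", freq="16h", periods=12)

# Assumption: from_pandas(npartitions=3) cuts the sorted index into chunks
# of 4 rows, giving divisions at positions 0, 4, 8 and 11.
partitions = [date_index[0:4], date_index[4:8], date_index[8:12]]
divisions = [date_index[0], date_index[4], date_index[8], date_index[11]]


def divisions_are_valid(parts, divs):
    """Hypothetical checker: partition i must lie in [divs[i], divs[i + 1]),
    except that the last partition may also contain its upper bound."""
    for i, part in enumerate(parts):
        if part.min() < divs[i]:
            return False
        is_last = i == len(parts) - 1
        if is_last and part.max() > divs[i + 1]:
            return False
        if not is_last and part.max() >= divs[i + 1]:
            return False
    return True


def apply_to_partitions(func, parts, divs):
    """Hypothetical helper: apply an elementwise index transform to every
    partition and reuse the transformed boundaries as the new divisions."""
    new_parts = [func(part) for part in parts]
    new_divs = list(func(pd.DatetimeIndex(divs)))
    return new_parts, new_divs


print(divisions_are_valid(partitions, divisions))  # True

# floor("D") is monotonic but not strictly increasing: 2022-02-24 00:00
# (last row of partition 0) and the boundary 2022-02-24 16:00 both map to
# 2022-02-24, so partition 0's max equals the next division.
floored_parts, floored_divs = apply_to_partitions(
    lambda idx: idx.floor("D"), partitions, divisions
)
print(floored_parts[0].max(), floored_divs[1])
print(divisions_are_valid(floored_parts, floored_divs))  # False

# ceil("D") collapses values the same way. Hand-picked two-partition layout
# (not what from_pandas produces) whose boundary sits exactly on midnight.
ceil_parts = [date_index[0:3], date_index[3:12]]
ceil_divs = [date_index[0], date_index[3], date_index[11]]
print(divisions_are_valid(ceil_parts, ceil_divs))  # True
ceiled_parts, ceiled_divs = apply_to_partitions(
    lambda idx: idx.ceil("D"), ceil_parts, ceil_divs
)
print(divisions_are_valid(ceiled_parts, ceiled_divs))  # False

# A strictly increasing transform keeps distinct values distinct,
# so shifting by a fixed offset leaves the divisions valid.
shifted_parts, shifted_divs = apply_to_partitions(
    lambda idx: idx + pd.Timedelta(hours=5), partitions, divisions
)
print(divisions_are_valid(shifted_parts, shifted_divs))  # True
```

The contrast with the shifted index is the point of the sketch: the trouble is not the `dt` accessor as such, but any transform that can map values from neighbouring partitions onto the same output.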

Action items:

scharlottej13 commented 2 years ago

Noting the relevant discussion on duplicate divisions in #8806