pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev

Resample fails when there is a gap in the time dimension when using dask arrays #8592

Closed · joaomacalos closed 6 months ago

joaomacalos commented 6 months ago

What happened?

I'm trying to resample a dask-backed dataset that normally has a few observations per day but sometimes skips a day. This used to work with previous versions of xarray / numpy, but now I'm getting this error:

TypeError: indices must be integral: the provided empty sequence was inferred as float. Wrap it with 'np.array(indices, dtype=np.intp)'

What did you expect to happen?

I expected it to create the missing date on the fly and fill it with np.nan, as it does when dask is not used.

Minimal Complete Verifiable Example

# Mock data:
import xarray as xr
import pandas as pd
# Three timestamps with a gap: there are no observations on 2018-06-14.
time_coords = pd.to_datetime(
    ['2018-06-13T03:40:36', '2018-06-13T05:50:37', '2018-06-15T03:02:34']
)

latitude_coords = [0.0]
longitude_coords = [0.0]

data = [[[1.0]], [[2.0]], [[3.0]]]

da = xr.DataArray(
    data,
    coords={'time': time_coords, 'latitude': latitude_coords, 'longitude': longitude_coords},
    dims=['time', 'latitude', 'longitude']
)

# Without chunking the DataArray, it works:
da.resample(time="1D").mean()

# But when we add chunks, it fails:
da.chunk({"time": 1}).resample(time="1D").mean()
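
A possible workaround until this is fixed (a sketch based on the traceback below, which shows the failure inside the flox code path; disabling flox falls back to xarray's built-in groupby machinery):

# Workaround sketch: bypass the flox-based resample path.
with xr.set_options(use_flox=False):
    result = da.chunk({"time": 1}).resample(time="1D").mean()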

MVCE confirmation

Relevant log output

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[118], line 15
      7 data = [[[1.0]], [[2.0]], [[3.0]]]
      9 da = xr.DataArray(
     10     data,
     11     coords={'time': time_coords, 'latitude': latitude_coords, 'longitude': longitude_coords},
     12     dims=['time', 'latitude', 'longitude']
     13 )
---> 15 da.chunk({"time": 1}).resample(time="1D").mean()

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/xarray/core/_aggregations.py:7350, in DataArrayResampleAggregations.mean(self, dim, skipna, keep_attrs, **kwargs)
   7266 """
   7267 Reduce this DataArray's data by applying ``mean`` along some dimension(s).
   7268 
   (...)
   7343   * time     (time) datetime64[ns] 2001-01-31 2001-04-30 2001-07-31
   7344 """
   7345 if (
   7346     flox_available
   7347     and OPTIONS["use_flox"]
   7348     and contains_only_chunked_or_numpy(self._obj)
   7349 ):
-> 7350     return self._flox_reduce(
   7351         func="mean",
   7352         dim=dim,
   7353         skipna=skipna,
   7354         # fill_value=fill_value,
   7355         keep_attrs=keep_attrs,
   7356         **kwargs,
   7357     )
   7358 else:
   7359     return self.reduce(
   7360         duck_array_ops.mean,
   7361         dim=dim,
   (...)
   7364         **kwargs,
   7365     )

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/xarray/core/resample.py:57, in Resample._flox_reduce(self, dim, keep_attrs, **kwargs)
     51 def _flox_reduce(
     52     self,
     53     dim: Dims,
     54     keep_attrs: bool | None = None,
     55     **kwargs,
     56 ) -> T_Xarray:
---> 57     result = super()._flox_reduce(dim=dim, keep_attrs=keep_attrs, **kwargs)
     58     result = result.rename({RESAMPLE_DIM: self._group_dim})
     59     return result

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/xarray/core/groupby.py:1039, in GroupBy._flox_reduce(self, dim, keep_attrs, **kwargs)
   1036     kwargs.setdefault("min_count", 1)
   1038 output_index = grouper.full_index
-> 1039 result = xarray_reduce(
   1040     obj.drop_vars(non_numeric.keys()),
   1041     self._codes,
   1042     dim=parsed_dim,
   1043     # pass RangeIndex as a hint to flox that `by` is already factorized
   1044     expected_groups=(pd.RangeIndex(len(output_index)),),
   1045     isbin=False,
   1046     keep_attrs=keep_attrs,
   1047     **kwargs,
   1048 )
   1050 # we did end up reducing over dimension(s) that are
   1051 # in the grouped variable
   1052 group_dims = grouper.group.dims

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/flox/xarray.py:417, in xarray_reduce(obj, func, expected_groups, isbin, sort, dim, fill_value, dtype, method, engine, keep_attrs, skipna, min_count, reindex, *by, **finalize_kwargs)
    415 output_core_dims = [d for d in input_core_dims[0] if d not in dim_tuple]
    416 output_core_dims.extend(group_names)
--> 417 actual = xr.apply_ufunc(
    418     wrapper,
    419     ds_broad.drop_vars(tuple(missing_dim)).transpose(..., *grouper_dims),
    420     *by_da,
    421     input_core_dims=input_core_dims,
    422     # for xarray's test_groupby_duplicate_coordinate_labels
    423     exclude_dims=set(dim_tuple),
    424     output_core_dims=[output_core_dims],
    425     dask="allowed",
    426     dask_gufunc_kwargs=dict(
    427         output_sizes=group_sizes, output_dtypes=[dtype] if dtype is not None else None
    428     ),
    429     keep_attrs=keep_attrs,
    430     kwargs={
    431         "func": func,
    432         "axis": axis,
    433         "sort": sort,
    434         "fill_value": fill_value,
    435         "method": method,
    436         "min_count": min_count,
    437         "skipna": skipna,
    438         "engine": engine,
    439         "reindex": reindex,
    440         "expected_groups": tuple(expected_groups_valid_list),
    441         "isbin": isbins,
    442         "finalize_kwargs": finalize_kwargs,
    443         "dtype": dtype,
    444         "core_dims": input_core_dims,
    445     },
    446 )
    448 # restore non-dim coord variables without the core dimension
    449 # TODO: shouldn't apply_ufunc handle this?
    450 for var in set(ds_broad._coord_names) - set(ds_broad._indexes) - set(ds_broad.dims):

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/xarray/core/computation.py:1254, in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, meta, dask_gufunc_kwargs, on_missing_core_dim, *args)
   1252 # feed datasets apply_variable_ufunc through apply_dataset_vfunc
   1253 elif any(is_dict_like(a) for a in args):
-> 1254     return apply_dataset_vfunc(
   1255         variables_vfunc,
   1256         *args,
   1257         signature=signature,
   1258         join=join,
   1259         exclude_dims=exclude_dims,
   1260         dataset_join=dataset_join,
   1261         fill_value=dataset_fill_value,
   1262         keep_attrs=keep_attrs,
   1263         on_missing_core_dim=on_missing_core_dim,
   1264     )
   1265 # feed DataArray apply_variable_ufunc through apply_dataarray_vfunc
   1266 elif any(isinstance(a, DataArray) for a in args):

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/xarray/core/computation.py:531, in apply_dataset_vfunc(func, signature, join, dataset_join, fill_value, exclude_dims, keep_attrs, on_missing_core_dim, *args)
    526 list_of_coords, list_of_indexes = build_output_coords_and_indexes(
    527     args, signature, exclude_dims, combine_attrs=keep_attrs
    528 )
    529 args = tuple(getattr(arg, "data_vars", arg) for arg in args)
--> 531 result_vars = apply_dict_of_variables_vfunc(
    532     func,
    533     *args,
    534     signature=signature,
    535     join=dataset_join,
    536     fill_value=fill_value,
    537     on_missing_core_dim=on_missing_core_dim,
    538 )
    540 out: Dataset | tuple[Dataset, ...]
    541 if signature.num_outputs > 1:

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/xarray/core/computation.py:455, in apply_dict_of_variables_vfunc(func, signature, join, fill_value, on_missing_core_dim, *args)
    453 core_dim_present = _check_core_dims(signature, variable_args, name)
    454 if core_dim_present is True:
--> 455     result_vars[name] = func(*variable_args)
    456 else:
    457     if on_missing_core_dim == "raise":

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/xarray/core/computation.py:822, in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, vectorize, keep_attrs, dask_gufunc_kwargs, *args)
    817     if vectorize:
    818         func = _vectorize(
    819             func, signature, output_dtypes=output_dtypes, exclude_dims=exclude_dims
    820         )
--> 822 result_data = func(*input_data)
    824 if signature.num_outputs == 1:
    825     result_data = (result_data,)

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/flox/xarray.py:388, in xarray_reduce.<locals>.wrapper(array, func, skipna, core_dims, *by, **kwargs)
    385         offset = array.min()
    386         array = datetime_to_numeric(array, offset, datetime_unit="us")
--> 388 result, *groups = groupby_reduce(array, *by, func=func, **kwargs)
    390 # Output of count has an int dtype.
    391 if requires_numeric and func != "count":

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/flox/core.py:2204, in groupby_reduce(array, func, expected_groups, sort, isbin, axis, fill_value, dtype, min_count, method, engine, reindex, finalize_kwargs, *by)
   2201 if method == "blockwise" and by_.ndim == 1:
   2202     array = rechunk_for_blockwise(array, axis=-1, labels=by_)
-> 2204 result, groups = partial_agg(
   2205     array,
   2206     by_,
   2207     expected_groups=expected_groups,
   2208     agg=agg,
   2209     reindex=reindex,
   2210     method=method,
   2211     sort=sort,
   2212 )
   2214 if sort and method != "map-reduce":
   2215     assert len(groups) == 1

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/flox/core.py:1557, in dask_groupby_agg(array, by, agg, expected_groups, axis, fill_value, method, reindex, engine, sort, chunks_cohorts)
   1555 for blks, cohort in chunks_cohorts.items():
   1556     index = pd.Index(cohort)
-> 1557     subset = subset_to_blocks(intermediate, blks, array.blocks.shape[-len(axis) :])
   1558     reindexed = dask.array.map_blocks(
   1559         reindex_intermediates, subset, agg, index, meta=subset._meta
   1560     )
   1561     # now that we have reindexed, we can set reindex=True explicitlly

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/flox/core.py:1355, in subset_to_blocks(array, flatblocks, blkshape)
   1352 if blkshape is None:
   1353     blkshape = array.blocks.shape
-> 1355 index = _normalize_indexes(array, flatblocks, blkshape)
   1357 if all(not isinstance(i, np.ndarray) and i == slice(None) for i in index):
   1358     return array

File ~/micromamba/envs/test-env/lib/python3.11/site-packages/flox/core.py:1303, in _normalize_indexes(array, flatblocks, blkshape)
   1293 def _normalize_indexes(array: DaskArray, flatblocks, blkshape) -> tuple:
   1294     """
   1295     .blocks accessor can only accept one iterable at a time,
   1296     but can handle multiple slices.
   (...)
   1301     TODO: move this upstream
   1302     """
-> 1303     unraveled = np.unravel_index(flatblocks, blkshape)
   1305     normalized: list[int | slice | list[int]] = []
   1306     for ax, idx in enumerate(unraveled):

TypeError: indices must be integral: the provided empty sequence was inferred as float. Wrap it with 'np.array(indices, dtype=np.intp)'
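
For context, the failing numpy call can be reproduced in isolation: flox passes an empty plain list of block indices to np.unravel_index, and recent numpy refuses to infer an integer dtype from an empty sequence (a minimal sketch; whether it raises depends on your numpy version):

import numpy as np

# An empty list carries no dtype, so numpy infers float64 and rejects it as an index:
np.unravel_index([], (3,))  # TypeError: indices must be integral: ...

# An explicit integer dtype, as the error message suggests, is accepted:
np.unravel_index(np.array([], dtype=np.intp), (3,))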

Anything else we need to know?

For reference, the code works fine with this older set of package versions:

INSTALLED VERSIONS
------------------
commit: None
python: 3.11.6 | packaged by conda-forge | (main, Oct  3 2023, 10:40:35) [GCC 12.3.0]
python-bits: 64
OS: Linux
OS-release: 4.14.326-245.539.amzn2.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.14.2
libnetcdf: None

xarray: 2023.10.1
pandas: 2.1.1
numpy: 1.26.0
scipy: 1.11.3
netCDF4: None
pydap: None
h5netcdf: 1.2.0
h5py: 3.10.0
Nio: None
zarr: 2.16.1
cftime: 1.6.2
nc_time_axis: None
PseudoNetCDF: None
iris: None
bottleneck: 1.3.7
dask: 2023.10.0
distributed: 2023.10.0
matplotlib: 3.8.0
cartopy: None
seaborn: None
numbagg: None
fsspec: 2023.10.0
cupy: None
pint: None
sparse: None
flox: 0.8.1
numpy_groupies: 0.10.2
setuptools: 68.2.2
pip: 23.3.1
conda: None
pytest: 7.4.3
mypy: None
IPython: 8.16.1
sphinx: None

Environment

INSTALLED VERSIONS
------------------
commit: None
python: 3.11.7 | packaged by conda-forge | (main, Dec 23 2023, 14:43:09) [GCC 12.3.0]
python-bits: 64
OS: Linux
OS-release: 4.14.326-245.539.amzn2.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: None
libnetcdf: None

xarray: 2023.12.0
pandas: 2.1.4
numpy: 1.26.3
scipy: 1.11.4
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.16.1
cftime: None
nc_time_axis: None
iris: None
bottleneck: 1.3.7
dask: 2023.12.1
distributed: 2023.12.1
matplotlib: 3.8.2
cartopy: None
seaborn: None
numbagg: None
fsspec: 2023.12.2
cupy: None
pint: None
sparse: None
flox: 0.8.5
numpy_groupies: 0.10.2
setuptools: 69.0.3
pip: 23.3.2
conda: None
pytest: None
mypy: None
IPython: 8.19.0
sphinx: None

dcherian commented 6 months ago

This is fixed in flox v0.8.6.
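
A quick way to verify on your end (assuming the MCVE above has been run so that da exists):

import flox
print(flox.__version__)  # should report 0.8.6 or newer

# The previously failing line now succeeds, filling 2018-06-14 with NaN:
da.chunk({"time": 1}).resample(time="1D").mean().compute()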