monocongo / climate_indices

Climate indices for drought monitoring
https://monocongo.github.io/climate_indices/
Other
338 stars 163 forks source link

ValueError: Invalid periodicity argument: monthly using dask #558

Open Queeno11 opened 3 weeks ago

Queeno11 commented 3 weeks ago

Hi everyone! Thanks a lot for all the work done here, it's really helpful for many of us working with climate data.

Describe the bug I get ValueError: Invalid periodicity argument: monthly using dask when trying to compute the SPI using ERA5-Land data. The weird thing here is that, when loading the xarray in memory the code works perfectly, but when using a chunked xarray.DataSet I get that error.

To Reproduce

import xarray as xr
from climate_indices import indices, compute

precipitation = xr.open_dataset(
    rf"ERA5-Land_monthly_1970-2021_subset.nc", 
    chunks={"time":-1, "lat": 100, "lon": 100}
)

# Filter between 1970 and 1990 to reduce size
precipitation = precipitation.sel(time=slice("1970", "1972"))

da_precip_groupby = precipitation["tp"].stack(point=("lat", "lon")).groupby(group="point")

distribution = indices.Distribution.gamma
data_start_year = 1970
calibration_year_initial = 1970
calibration_year_final = 1972
periodicity = compute.Periodicity.monthly

da_spi = xr.apply_ufunc(
    indices.spi,
    da_precip_groupby,
    i,
    distribution,
    data_start_year,
    calibration_year_initial,
    calibration_year_final,
    periodicity,
    dask="parallelized",
)
da_spi = da_spi.unstack("point").rename(f"spi{i}")
da_spi.compute()

Which returns:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[24], [line 32](vscode-notebook-cell:?execution_count=24&line=32)
     [20](vscode-notebook-cell:?execution_count=24&line=20) da_spi = xr.apply_ufunc(
     [21](vscode-notebook-cell:?execution_count=24&line=21)     indices.spi,
     [22](vscode-notebook-cell:?execution_count=24&line=22)     da_precip_groupby,
   (...)
     [29](vscode-notebook-cell:?execution_count=24&line=29)     dask="parallelized",
     [30](vscode-notebook-cell:?execution_count=24&line=30) )
     [31](vscode-notebook-cell:?execution_count=24&line=31) da_spi = da_spi.unstack("point").rename(f"spi{i}")
---> [32](vscode-notebook-cell:?execution_count=24&line=32) da_spi.compute()

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\xarray\core\dataarray.py:1101, in DataArray.compute(self, **kwargs)
   [1082](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1082) """Manually trigger loading of this array's data from disk or a
   [1083](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1083) remote source into memory and return a new array. The original is
   [1084](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1084) left unaltered.
   (...)
   [1098](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1098) dask.compute
   [1099](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1099) """
   [1100](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1100) new = self.copy(deep=False)
-> [1101](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1101) return new.load(**kwargs)

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\xarray\core\dataarray.py:1075, in DataArray.load(self, **kwargs)
   [1057](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1057) def load(self: T_DataArray, **kwargs) -> T_DataArray:
   [1058](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1058)     """Manually trigger loading of this array's data from disk or a
   [1059](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1059)     remote source into memory and return this array.
   [1060](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1060) 
   (...)
   [1073](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1073)     dask.compute
   [1074](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1074)     """
-> [1075](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1075)     ds = self._to_temp_dataset().load(**kwargs)
   [1076](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1076)     new = self._from_temp_dataset(ds)
   [1077](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataarray.py:1077)     self._variable = new._variable

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\xarray\core\dataset.py:747, in Dataset.load(self, **kwargs)
    [744](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataset.py:744) import dask.array as da
    [746](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataset.py:746) # evaluate all the dask arrays simultaneously
--> [747](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataset.py:747) evaluated_data = da.compute(*lazy_data.values(), **kwargs)
    [749](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataset.py:749) for k, data in zip(lazy_data, evaluated_data):
    [750](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/xarray/core/dataset.py:750)     self.variables[k].data = data

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\base.py:573, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    [570](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/base.py:570)     keys.append(x.__dask_keys__())
    [571](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/base.py:571)     postcomputes.append(x.__dask_postcompute__())
--> [573](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/base.py:573) results = schedule(dsk, keys, **kwargs)
    [574](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/base.py:574) return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
     [78](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:78)     elif isinstance(pool, multiprocessing.pool.Pool):
     [79](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:79)         pool = MultiprocessingPoolExecutor(pool)
---> [81](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:81) results = get_async(
     [82](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:82)     pool.submit,
     [83](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:83)     pool._max_workers,
     [84](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:84)     dsk,
     [85](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:85)     result,
     [86](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:86)     cache=cache,
     [87](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:87)     get_id=_thread_get_id,
     [88](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:88)     pack_exception=pack_exception,
     [89](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:89)     **kwargs,
     [90](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:90) )
     [92](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:92) # Cleanup pools associated to dead threads
     [93](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/threaded.py:93) with pools_lock:

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\local.py:506, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    [504](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:504)         _execute_task(task, data)  # Re-execute locally
    [505](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:505)     else:
--> [506](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:506)         raise_exception(exc, tb)
    [507](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:507) res, worker_id = loads(res_info)
    [508](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:508) state["cache"][key] = res

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\local.py:314, in reraise(exc, tb)
    [312](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:312) if exc.__traceback__ is not tb:
    [313](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:313)     raise exc.with_traceback(tb)
--> [314](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:314) raise exc

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\local.py:219, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    [217](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:217) try:
    [218](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:218)     task, data = loads(task_info)
--> [219](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:219)     result = _execute_task(task, data)
    [220](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:220)     id = get_id()
    [221](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/local.py:221)     result = dumps((result, id))

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in _execute_task(arg, cache, dsk)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in <genexpr>(.0)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in _execute_task(arg, cache, dsk)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in <genexpr>(.0)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in _execute_task(arg, cache, dsk)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in <genexpr>(.0)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in _execute_task(arg, cache, dsk)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\optimization.py:969, in SubgraphCallable.__call__(self, *args)
    [967](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/optimization.py:967) if not len(args) == len(self.inkeys):
    [968](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/optimization.py:968)     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> [969](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/optimization.py:969) return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:149, in get(dsk, out, cache)
    [147](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:147) for key in toposort(dsk):
    [148](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:148)     task = dsk[key]
--> [149](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:149)     result = _execute_task(task, cache)
    [150](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:150)     cache[key] = result
    [151](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:151) result = _execute_task(out, cache)

File c:\Users\ofici\AppData\Local\Programs\Python\Python311\Lib\site-packages\dask\core.py:119, in _execute_task(arg, cache, dsk)
    [115](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:115)     func, args = arg[0], arg[1:]
    [116](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:116)     # Note: Don't assign the subtask results to a variable. numpy detects
    [117](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:117)     # temporaries by their reference count and can execute certain
    [118](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:118)     # operations in-place.
--> [119](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:119)     return func(*(_execute_task(a, cache) for a in args))
    [120](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:120) elif not ishashable(arg):
    [121](file:///C:/Users/ofici/AppData/Local/Programs/Python/Python311/Lib/site-packages/dask/core.py:121)     return arg

File ~\AppData\Roaming\Python\Python311\site-packages\climate_indices\indices.py:147, in spi(values, scale, distribution, data_start_year, calibration_year_initial, calibration_year_final, periodicity, fitting_params)
    [145](https://file+.vscode-resource.vscode-cdn.net/d%3A/World%20Bank/Paper%20-%20Child%20Mortality%20and%20Climate%20Shocks/~/AppData/Roaming/Python/Python311/site-packages/climate_indices/indices.py:145)     values = utils.reshape_to_2d(values, 366)
    [146](https://file+.vscode-resource.vscode-cdn.net/d%3A/World%20Bank/Paper%20-%20Child%20Mortality%20and%20Climate%20Shocks/~/AppData/Roaming/Python/Python311/site-packages/climate_indices/indices.py:146) else:
--> [147](https://file+.vscode-resource.vscode-cdn.net/d%3A/World%20Bank/Paper%20-%20Child%20Mortality%20and%20Climate%20Shocks/~/AppData/Roaming/Python/Python311/site-packages/climate_indices/indices.py:147)     raise ValueError(f"Invalid periodicity argument: {periodicity}")
    [149](https://file+.vscode-resource.vscode-cdn.net/d%3A/World%20Bank/Paper%20-%20Child%20Mortality%20and%20Climate%20Shocks/~/AppData/Roaming/Python/Python311/site-packages/climate_indices/indices.py:149) if distribution is Distribution.gamma:
    [150](https://file+.vscode-resource.vscode-cdn.net/d%3A/World%20Bank/Paper%20-%20Child%20Mortality%20and%20Climate%20Shocks/~/AppData/Roaming/Python/Python311/site-packages/climate_indices/indices.py:150)     # get (optional) fitting parameters if provided
    [151](https://file+.vscode-resource.vscode-cdn.net/d%3A/World%20Bank/Paper%20-%20Child%20Mortality%20and%20Climate%20Shocks/~/AppData/Roaming/Python/Python311/site-packages/climate_indices/indices.py:151)     if fitting_params is not None:

ValueError: Invalid periodicity argument: monthly

If I change the script a little bit by removing the dask arguments:

import xarray as xr
from climate_indices import indices, compute

precipitation = xr.open_dataset(
    rf"{DATA_PROC}\ERA5-Land_monthly_1970-2021_subset.nc", 
)

# Filter between 1970 and 1990 to reduce size
precipitation = precipitation.sel(time=slice("1970", "1972"))

da_precip_groupby = precipitation["tp"].stack(point=("lat", "lon")).groupby(group="point")

distribution = indices.Distribution.gamma
data_start_year = 1970
calibration_year_initial = 1970
calibration_year_final = 1972
periodicity = compute.Periodicity.monthly

da_spi = xr.apply_ufunc(
    indices.spi,
    da_precip_groupby,
    i,
    distribution,
    data_start_year,
    calibration_year_initial,
    calibration_year_final,
    periodicity,
)
da_spi = da_spi.unstack("point").rename(f"spi{i}")

Everything works as expected. I can plot the results without problems: da_spi.isel(time=7).plot() image

Is there something I'm missing here?

Expected behavior Compute the SPI1 for the selected area and months. I expect to get the same result with dask and without dask.

System Information:

Thanks in advance!

monocongo commented 1 week ago

Thanks for reporting this error, @Queeno11

Can you please reference the data used for this so I can reproduce and debug it myself?

Queeno11 commented 1 week ago

Thanks @monocongo for your promt response. Give it a shot with this small dataset: https://drive.google.com/file/d/1S_ehmZOIWfGgbjlyEYuy13B8ClfNZQnF/view?usp=sharing. It's just a compilation of ERA5-Land data for a small section of the globe.