hippalectryon-0 / xr-scipy

scipy for xarray eco-system
http://xr-scipy.readthedocs.io

Support passing arguments to `apply_ufunc` #15

Open kripnerl opened 2 years ago

kripnerl commented 2 years ago

xrscipy currently calls apply_ufunc with its default setting, dask="forbidden". This prevents using the package with dask arrays, which is needed, for example, when using the preprocess option of open_mfdataset.

Not sure whether this is a design decision; however, would it be possible to pass some kwargs for xrscipy internals?

On the other hand, adding .load() in the preprocess function resolves this issue.

Any opinion @smartass101, @fujiisoup ?

Thank you, Lukas

Example

import numpy as np
import xarray as xr  # needed for xr.Dataset, xr.save_mfdataset and xr.open_mfdataset
import xrscipy

# Create example datasets: 
Nt = 256; Nx = 10
xs = np.arange(Nx)
ds_list = []

for x in xs: 
    ds = xr.Dataset({"time_data": ("time", np.random.rand((Nt)))},
                   coords={
                       "time": np.arange(Nt), 
                       "x": x
                   })
    ds_list.append(ds)

# Save as a multi-file dataset:

xr.save_mfdataset(datasets=ds_list,
                  paths=[f"/tmp/ds_{x}.nc" for x in xs])

# Processing function (this is important): 

func = lambda ds: xrscipy.fft.fft(ds.time_data, "time")

# This resolves the issue
# func = lambda ds: xrscipy.fft.fft(ds.time_data.load(), "time")

ds_mf = xr.open_mfdataset([f"/tmp/ds_{x}.nc" for x in xs],
                           concat_dim="x",
                           parallel=True,
                           combine="nested",
                           preprocess=func)

Error

<ipython-input-35-1f7125289f88> in <lambda>(ds)
     24 # Processing function:
     25 
---> 26 func = lambda ds: xrscipy.fft.fft(ds.time_data, "time")
     27 
     28 ds_mf = xr.open_mfdataset([f"/tmp/ds_{x}.nc" for x in xs],

/sw/python/3.8-anaconda-2020.07/lib/python3.8/site-packages/xrscipy/fft.py in _wrap1d(func, freq_func, y, coord, **kwargs)
     44         return result.set_dims(v.dims)
     45 
---> 46     ds = utils.wrap_dataset(apply_func, y, dim, keep_coords='drop')
     47 
     48     # attach frequency coordinate

/sw/python/3.8-anaconda-2020.07/lib/python3.8/site-packages/xrscipy/utils.py in wrap_dataset(func, y, *dims, **kwargs)
     26 
     27     if isinstance(y, xr.DataArray):
---> 28         result = wrap_dataset(func, y._to_temp_dataset(), *dims,
     29                               keep_coords=keep_coords)
     30         # Drop unnecessary coordinate.

/sw/python/3.8-anaconda-2020.07/lib/python3.8/site-packages/xrscipy/utils.py in wrap_dataset(func, y, *dims, **kwargs)
     37         for key in y.data_vars:
     38             if any(d in y[key].dims for d in dims):
---> 39                 ds[key] = func(y[key].variable)
     40             else:
     41                 ds[key] = y[key]

/sw/python/3.8-anaconda-2020.07/lib/python3.8/site-packages/xrscipy/fft.py in apply_func(v)
     39     def apply_func(v):
     40         # v: xr.Varaible
---> 41         result = xr.apply_ufunc(
     42             func, v, input_core_dims=[[dim]],
     43             output_core_dims=[output_core_dim], kwargs=kwargs)

/sw/python/3.8-anaconda-2020.07/lib/python3.8/site-packages/xarray/core/computation.py in apply_ufunc(func, input_core_dims, output_core_dims, exclude_dims, vectorize, join, dataset_join, dataset_fill_value, keep_attrs, kwargs, dask, output_dtypes, output_sizes, meta, dask_gufunc_kwargs, *args)
   1179     # feed Variables directly through apply_variable_ufunc
   1180     elif any(isinstance(a, Variable) for a in args):
-> 1181         return variables_vfunc(*args)
   1182     else:
   1183         # feed anything else through apply_array_ufunc

/sw/python/3.8-anaconda-2020.07/lib/python3.8/site-packages/xarray/core/computation.py in apply_variable_ufunc(func, signature, exclude_dims, dask, output_dtypes, vectorize, keep_attrs, dask_gufunc_kwargs, *args)
    650     if any(is_duck_dask_array(array) for array in input_data):
    651         if dask == "forbidden":
--> 652             raise ValueError(
    653                 "apply_ufunc encountered a dask array on an "
    654                 "argument, but handling for dask arrays has not "

ValueError: apply_ufunc encountered a dask array on an argument, but handling for dask arrays has not been enabled. Either set the ``dask`` argument or load your data into memory first with ``.load()`` or ``.compute()``

hippalectryon-0 commented 1 year ago

.load() is not a satisfactory answer, since it loads the whole array into memory, which defeats part of the benefit of using dask.

It should be pretty easy to add additional kwargs for apply_ufunc.
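A minimal sketch of what forwarding such kwargs could look like. The function name `fft_with_ufunc_kwargs` is hypothetical and is not part of xrscipy; `np.fft.fft` stands in for the wrapped scipy function, and the default of dask="forbidden" simply mirrors the behaviour reported above.

```python
import numpy as np
import xarray as xr


def fft_with_ufunc_kwargs(da, dim, **apply_ufunc_kwargs):
    """Hypothetical wrapper (not xrscipy's API): FFT along `dim`,
    forwarding any extra keyword arguments to xr.apply_ufunc."""
    # Keep the behaviour reported in this issue unless the caller overrides it.
    apply_ufunc_kwargs.setdefault("dask", "forbidden")
    return xr.apply_ufunc(
        np.fft.fft,               # operates on the last axis
        da,
        input_core_dims=[[dim]],  # apply_ufunc moves `dim` to the last axis
        output_core_dims=[[dim]],
        **apply_ufunc_kwargs,
    )


da = xr.DataArray(np.random.rand(4, 256), dims=("x", "time"))
# In-memory data works with the default settings.
spectrum = fft_with_ufunc_kwargs(da, "time")
```

With a dask-backed input, a caller could then pass something like `dask="parallelized", output_dtypes=[complex]`, assuming dask is installed and the `time` dimension is held in a single chunk.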

kripnerl commented 1 year ago

Also, it would be possible to use is_duck_dask_array and call apply_ufunc accordingly.
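A rough sketch of that idea, under stated assumptions: `is_duck_dask_array` lives in xarray's internals, so this example swaps in the public `.chunks` attribute (None for in-memory data) as the dask check. The helper name `fft_dask_aware` is hypothetical, and `np.fft.fft` again stands in for the wrapped scipy function.

```python
import numpy as np
import xarray as xr


def fft_dask_aware(da, dim):
    """Hypothetical helper: choose the apply_ufunc `dask` mode from the input."""
    # `.chunks` is None for in-memory data and a tuple for dask-backed data.
    # This replaces the internal is_duck_dask_array check for illustration.
    is_dask = da.chunks is not None
    extra = (
        {"dask": "parallelized", "output_dtypes": [complex]} if is_dask else {}
    )
    return xr.apply_ufunc(
        np.fft.fft,
        da,
        input_core_dims=[[dim]],
        output_core_dims=[[dim]],
        **extra,
    )


da = xr.DataArray(np.random.rand(4, 256), dims=("x", "time"))
eager = fft_dask_aware(da, "time")  # in-memory input takes the default path
```

A dask-backed input, for example `da.chunk({"x": 1})`, would take the parallelized branch and stay lazy until `.compute()` is called, again assuming `time` is not split across chunks.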