pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0
3.62k stars 1.09k forks source link

'Parallelized' apply_ufunc for scripy.interpolate.griddata #5281

Open LJaksic opened 3 years ago

LJaksic commented 3 years ago

Hi,

I'm working with large files from an ocean model with an unstructered grid. For instance, variable flow velocity ux with dimensions (194988, 1009, 20) for respectively: 'nFlowElement' (name unstructered grid element), 'time' and laydim (depth dimension). I'd like to interpolate these results to a structured grid with dimensions (600, 560, 1009, 20)for respectively: latitude, longitude, time and laydim. For this I am using scipy.interpolate.griddata. As these dataarrays are too large to load into your working memory at once, I am trying to work with 'chunks' (dask). Unfortunately, I bump into problems when trying to use apply_ufunc with setting: dask = 'parallelized'.

For smaller computational domains (smaller nFlowElement dimension) I ám still able to load the dataarray in my work memory. Then, the following code gives me the wanted result:

def interp_to_grid(u,xc,yc,xint,yint):
    print(u.shape,xc.shape,xint.shape)
    ug = griddata((xc,yc),u,(xint,yint), method='nearest', fill_value=np.nan)
    return ug 

uxg = xr.apply_ufunc(interp_to_grid,
                     ux, xc, yc, xint, yint,
                     dask = 'allowed',
                     input_core_dims=[['nFlowElem','time','laydim'],['nFlowElem'],['nFlowElem'],['dim_0','dim_1'],['dim_0','dim_1']],
                     output_core_dims=[['dim_0','dim_1','time','laydim']],
                     output_dtypes = [xr.DataArray]
                     )

Notice that in the function interp_to_grid the input variables have the following dimensions:

However, for much larger spatial domains it is required to work with dask = 'parallelized', because these input dataarrays can nolonger be loaded into my working memory. I have tried to apply chunks over the time dimension, but also over the nFlowElement dimension. I am aware that it is not possible to chunk over core dimensions.

This is one of my "parallel" attempts (with chunks along the time dim):

Input ux:

<xarray.DataArray 'ucx' (nFlowElem: 194988, time: 1009, laydim: 20)>
dask.array<transpose, shape=(194988, 1009, 20), dtype=float64, chunksize=(194988, 10, 20), chunktype=numpy.ndarray>
Coordinates:
    FlowElem_xcc  (nFlowElem) float64 dask.array<chunksize=(194988,), meta=np.ndarray>
    FlowElem_ycc  (nFlowElem) float64 dask.array<chunksize=(194988,), meta=np.ndarray>
  * time          (time) datetime64[ns] 2014-09-17 ... 2014-10-01
Dimensions without coordinates: nFlowElem, laydim
Attributes:
    standard_name:  eastward_sea_water_velocity
    long_name:      velocity on flow element center, x-component
    units:          m s-1
    grid_mapping:   wgs84

Apply_func:

uxg = xr.apply_ufunc(interp_to_grid,
                     ux, xc, yc, xint, yint,
                     dask = 'parallelized',
                     input_core_dims=[['nFlowElem'],['nFlowElem'],['nFlowElem'],['dim_0','dim_1'],['dim_0','dim_1']],
                     output_core_dims=[['dim_0','dim_1']],
                     output_dtypes = [xr.DataArray],
                     )

Gives error:

  File "interpnd.pyx", line 78, in scipy.interpolate.interpnd.NDInterpolatorBase.__init__

  File "interpnd.pyx", line 192, in scipy.interpolate.interpnd._check_init_shape

ValueError: different number of values and points

I have played around a lot with changing the core dimensions in apply_ufunc and the dimension along which to chunk. Also I have tried to manually change the order of dimensions of dataarray u which is 'fed to' griddata (in interp_to_grid).

Any advice is very welcome! Best Wishes, Luka

mathause commented 3 years ago

map_blocks may be the function you are looking for: http://xarray.pydata.org/en/stable/user-guide/dask.html?highlight=map_blocks#map-blocks

LJaksic commented 3 years ago

@mathause Thank you for your response! I believe that map_blocks only works for functions that consume and return xarray objects, right? Scipy.interpolate.griddata unfortunately does not return Xarray object.

However, if there are other (xarray) functions which I could use to interpolate my results to a regular grid - I'd be very interested.

LJaksic commented 3 years ago

### UPDATE 1: ### I have also tried to chunk over the nFlowElem dimension instead of 'time'. Then I define the input and output core dimensions as written below:

input_core_dims=[['time','laydim'],[],[],['dim_0','dim_1'],['dim_0','dim_1']],
output_core_dims=[['dim_0','dim_1','time','laydim']],

Now I "only" get an error concerning the size of my output (which is nolonger chunked). My output is nolonger chunked, because nFlowElem is lost after the interpolation. Consequently, the (600,560,1009,20) dataarray is too large for my work memory.

Is there perhaps a way to have apply_ufunc chunk your output along any of the other dimensions?

### UPDATE 2: ###

Alternatively, I have tried to chunk over the time dimension (50 time steps) and I have removed all input/output core dimensions.

And if I then define interp_to_grid as follows (to get the right input dimensions):

def interp_to_grid(u,xc,yc,xint,yint):

    xc = xc[:,0,0,0,0]
    yc = yc[:,0,0,0,0]
    u = u[:,:,:,0,0]
    print(u.shape,xc.shape,xint.shape) #input shape

    ug = griddata((xc,yc),u,(xint,yint), method='nearest', fill_value=np.nan)
    print(ug.shape) #output shape

    return ug 

I do get the right dimensions for my interp_to_grid-input (see first line) and output (see second line). However, I get the ValueError: axes don't match array:

(194988, 50, 20) (194988,) (600, 560)
(600, 560, 50, 20)
Traceback (most recent call last):

  File "<ipython-input-11-6b65fe3dba5b>", line 1, in <module>
    uxg.loc[dict(time=np.datetime64('2014-09-18'),laydim=19)].plot()

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\plot\plot.py", line 444, in __call__
    return plot(self._da, **kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\plot\plot.py", line 160, in plot
    darray = darray.squeeze().compute()

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\core\dataarray.py", line 899, in compute
    return new.load(**kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\core\dataarray.py", line 873, in load
    ds = self._to_temp_dataset().load(**kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\xarray\core\dataset.py", line 798, in load
    evaluated_data = da.compute(*lazy_data.values(), **kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\threaded.py", line 84, in get
    **kwargs

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\local.py", line 487, in get_async
    raise_exception(exc, tb)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\local.py", line 317, in reraise
    raise exc

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\local.py", line 222, in execute_task
    result = _execute_task(task, data)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\optimization.py", line 963, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 151, in get
    result = _execute_task(task, cache)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\dask\utils.py", line 35, in apply
    return func(*args, **kwargs)

  File "<__array_function__ internals>", line 6, in transpose

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\numpy\core\fromnumeric.py", line 658, in transpose
    return _wrapfunc(a, 'transpose', axes)

  File "C:\Users\920507\Anaconda3\envs\dfm_tools_env\lib\site-packages\numpy\core\fromnumeric.py", line 58, in _wrapfunc
    return bound(*args, **kwds)

ValueError: axes don't match array
TomNicholas commented 3 years ago

@LJaksic are you aware that passing dask_gufunc_kwargs={'allow_rechunk': True} to xr.apply_ufunc will allow it to accept an input which is chunked along the core dims? (See https://github.com/pydata/xarray/issues/1995#issuecomment-842556731) It does come with a warning though.