pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0

Rolling window operation does not work with dask arrays #1279

Closed (vnoel closed this issue 6 years ago)

vnoel commented 7 years ago

As the title says :-)

This would be very useful to downsample long time series read from multiple consecutive netcdf files.

Note that I was able to apply the rolling window by converting my variable to a pandas series with to_series(). I could then use pandas' own rolling window methods. I guess that when converting to a pandas series the dask array is read into memory?

rabernat commented 7 years ago

It seems like the most efficient way to handle this would be to use ghost cells with a depth equal to the window length.

shoyer commented 7 years ago

> Note that I was able to apply the rolling window by converting my variable to a pandas series with to_series(). I could then use pandas' own rolling window methods. I guess that when converting to a pandas series the dask array is read into memory?

Yes, this is correct -- we automatically compute dask arrays when converting to pandas, because pandas does not have any notion of lazy arrays.

Note that we currently have two versions of rolling window operations:

  1. Implemented with bottleneck. These are fast, but only work in memory. Something like ghost cells would be necessary to extend them to dask.
  2. Implemented with a nested loop written in Python. These are much slower, both because of the algorithm (time O(dim_size * window_size) instead of time O(dim_size)) and implementation of the inner loop in Python instead of C, but there's no fundamental reason why they shouldn't be able to work for dask arrays basically as is.
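For concreteness, here is a rough sketch of the two approaches for a simple rolling mean (illustrative only, not xarray's actual code; bottleneck.move_mean is the real bottleneck function, the rest is made up for the example):

import bottleneck as bn
import numpy as np

x = np.random.rand(1000)
window = 10

# 1. bottleneck: C implementation, O(dim_size), but it needs the data in memory
fast = bn.move_mean(x, window)

# 2. nested Python loop: O(dim_size * window_size) with an interpreted inner
#    loop, which is why it is much slower
slow = np.full(x.shape, np.nan)
for i in range(window - 1, len(x)):
    slow[i] = x[i - window + 1:i + 1].mean()

np.testing.assert_allclose(fast[window - 1:], slow[window - 1:])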

jhamman commented 7 years ago

An idea...since we only have 1-D rolling methods in xarray, couldn't we just use map_blocks with numpy/bottleneck functions when the rolling dimension is completely contained in a dask chunk?

shoyer commented 7 years ago

> An idea...since we only have 1-D rolling methods in xarray, couldn't we just use map_blocks with numpy/bottleneck functions when the rolling dimension is completely contained in a dask chunk?

Yes, that would work for such cases.
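To illustrate the single-chunk case (a hedged sketch with made-up shapes, not what was eventually merged): if the array is chunked so that the rolling dimension is never split, every block already contains all the windows it needs, and a plain map_blocks call over bottleneck gives the exact result:

import bottleneck as bn
import dask.array as da

# the time axis is kept in a single chunk; only the spatial axes are chunked
x = da.random.random((8736, 96, 144), chunks=(8736, 48, 48))

# bottleneck runs independently on each block, which is exact here because no
# window ever crosses a chunk boundary along axis 0
rolled = x.map_blocks(bn.move_mean, 8, min_count=6, axis=0, dtype=x.dtype)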

darothen commented 7 years ago

Dask dataframes have recently been updated so that rolling operations work (dask/dask#2198). Does this open a pathway to enable rolling on dask arrays within xarray?

shoyer commented 7 years ago

@darothen we would need to add xarray -> dask dataframe conversion functions, which don't currently exist. Otherwise I think we would still need to rewrite this (but of course the dataframe implementation could be a useful reference point).

darothen commented 6 years ago

In light of #1489 is there a way to move forward here with rolling on dask-backed data structures?

In soliciting the atmospheric chemistry community for a few illustrative examples for gcpy, it's become apparent that indices computed from re-sampled timeseries would be killer, attention-grabbing functionality. For instance, the EPA air quality standard we use for ozone involves taking hourly data, computing 8-hour rolling means for each day of your dataset, and then picking the maximum of those means for each day ("MDA8 ozone"). Similar metrics exist for other pollutants.

With traditional xarray data-structures, it's trivial to compute this quantity (assuming we have hourly data and using the new resample API from #1272):

import xarray as xr

ds = xr.open_dataset("hourly_ozone_data.nc")
mda8_o3 = (
    ds['O3']
    .rolling(time=8, min_periods=6)   # 8-hour windows, at least 6 valid hours
    .mean()                           # rolling mean over the time window
    .resample(time='D').max()         # daily maximum of the 8-hour means
)

There's one quirk relating to timestamping the rolled data (by default rolling labels each window with its last timestamp, whereas in my application I want to label the data with the first one), which makes that chained method a bit impractical, but the fix only adds about one line of code and it is totally dask-friendly.
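For illustration, the re-labelling step mentioned above could look something like this (a sketch only; it shifts the time coordinate back by 7 hours so each 8-hour mean carries the first timestamp of its window, and is not necessarily the exact line used here):

import numpy as np

avg8 = ds['O3'].rolling(time=8, min_periods=6).mean()
mda8_o3 = (
    avg8
    .assign_coords(time=avg8['time'].values - np.timedelta64(7, 'h'))
    .resample(time='D').max()
)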

shoyer commented 6 years ago

@darothen Can you give an example of typical shape and chunks for your data when you load it with dask?

My sense is that we would do better to keep everything in the form of (dask) arrays, rather than converting into dataframes. For the highest performance, I would write a dask array routine that combines ghosting, map_blocks and bottleneck's rolling window functions. Then it should be straightforward to plug it into rolling in place of the existing bottleneck routine.

jhamman commented 6 years ago

@darothen and @shoyer -

Here's a little wrapper function that does the dask and bottleneck piece...

import dask.array as da
import numpy as np

def dask_rolling_wrapper(moving_func, a, window, min_count=None, axis=-1):
    '''wrapper to apply bottleneck moving window funcs on dask arrays'''
    # inputs for ghost
    if axis < 0:
        axis = a.ndim + axis
    depth = {d: 0 for d in range(a.ndim)}
    depth[axis] = window - 1
    boundary = {d: np.nan for d in range(a.ndim)}
    # create ghosted arrays
    ag = da.ghost.ghost(a, depth=depth, boundary=boundary)
    # apply rolling func
    out = ag.map_blocks(moving_func, window, min_count=min_count,
                        axis=axis, dtype=a.dtype)
    # trim array
    result = da.ghost.trim_internal(out, depth)
    return result

I don't think this would be all that difficult to drop into our current Rolling class.
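For illustration only, calling the wrapper above might look like the following (hypothetical shapes and chunks; it relies on the dask API of the time, where da.ghost was later renamed da.overlap, and the chunks along the rolling axis must be larger than the window - 1 overlap):

import bottleneck as bn
import dask.array as da

x = da.random.random((8736, 96, 144), chunks=(24, 96, 144))  # hourly data, daily chunks
mean8 = dask_rolling_wrapper(bn.move_mean, x, window=8, min_count=6, axis=0)
result = mean8.compute()  # everything stays lazy until this point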

darothen commented 6 years ago

@shoyer - This output is usually provided as a sequence of daily netCDF files, each on a ~2 degree global grid with 24 timesteps per file (so shape 24 x 96 x 144). For convenience, I usually concatenate these files into yearly datasets, so they'll have a shape (8736 x 96 x 144). I haven't played too much with how to chunk the data, but it's not uncommon for me to load 20-50 of these files simultaneously (each holding a year's worth of data) and treat each year as an "ensemble member" dimension, so my data has shape (50 x 8736 x 96 x 144). Yes, keeping everything in dask array land is preferable, I suppose.

@jhamman - Wow, that worked pretty much perfectly! There are a handful of typos (you switch from "a" to "x" halfway through), and there's a lot of room for optimization by chunk size. But it just works, which is absolutely ridiculous. I just pushed a ~200 GB dataset through my cluster with ~50 cores and it screamed through the calculation.

Is there any way this could be pushed in before 0.10.0? It's a killer enhancement.

jhamman commented 6 years ago

@darothen - I'll open a PR in a few minutes. I'll fix the typos.

jhamman commented 6 years ago

See #1568 for the PR that adds this.