bradyrx / esmtools

:octopus: a toolbox for Earth System Model analysis :octopus:
https://esmtools.readthedocs.io/en/latest/
MIT License
27 stars 4 forks source link

Unexpected behavior: eager evaluation when using `dask` arrays #65

Closed andersy005 closed 4 years ago

andersy005 commented 5 years ago

While working with @gelsworth, I learned that when given input containing dask arrays, fit_poly(ds, order, dim="time") and linear_regression(da, dim="time", interpolate_na=False, compact=True, psig=None) automatically cast the dask array into numpy arrays and eagerly evaluate the computation.

https://github.com/bradyrx/esmtools/blob/921425b22fdc8143f2ef63cd069776395a257cc0/esmtools/stats.py#L236-L238

My intuition/expectation is that these implementations would return a lazy object (dask array) when the input is a dask array, and as a user I can call .compute() or .load() on the returned object.

# Assuming that ds is a dataset with dask array
p = poly_fit(ds) # Return dask array
p = p.compute() # or p.load()

I was wondering if the decision to cast the input arrays into numpy arrays was intentional?

bradyrx commented 5 years ago

Thanks for opening this up @andersy005 . I really need to update these functions since they're pretty classic computations for climate data. I learned a lot about apply_ufunc functionality from developing code with @gelsworth and looking at what you guys put together.

I was wondering if the decision to cast the input arrays into numpy arrays was intentional?

No this wasn't intended. I'm not sure exactly why we did this originally. But this attempts to load things into memory as you mentioned and causes memory crashes if you're trying to do this with a large dataset.

I realized that apply_ufunc just passes in pure numpy arrays so you shouldn't need to do .values in the pure numpy portion of the code and definitely shouldn't load it into memory in the pre-processing.

The solution I came up with is similar to yours and Genevieve's. I'm going to paste it here for when I update esmtools for this:

from scipy.stats import linregress as lreg

def linear_regression(da, dim="time",):
    """
    Much more trimmed down version than what's in my package

    https://github.com/bradyrx/esmtools/blob/master/esmtools/stats.py
    """
    # Chunk over core dimension.
    da = da.chunk({dim: -1})
    x = da[dim]

    # ADD CODE HERE FOR INTERPOLATION (or private function)
    # This should be handled on the `xarray` level. Need to linearly interpolate
    # missing values and frontfill/backfill or else all NaNs would be returned.

    results = xr.apply_ufunc(
        lreg,
        x,
        da,
        input_core_dims=[[dim], [dim]],
        output_core_dims=[[], [], [], [], []],
        vectorize=True,
        dask="parallelized",
        # I think output_dtypes is needed here due to 
        # dask='parallelized' but lreg returns an object unique
        # to scipy.
    )
    return results

def linear_trend(da, dim='time'):
    # Want to keep time dimension chunked as one chunk.
    da_chunk = da.chunk({dim:-1})

    # ADD CODE HERE FOR INTERPOLATION (or private function)
    # This should be handled on the `xarray` level. Need to linearly interpolate
    # missing values and frontfill/backfill or else all NaNs would be returned.

    trend = xr.apply_ufunc(
    compute_slope, 
    da_chunk, 
    vectorize=True, 
    input_core_dims=[[dim]], 
    output_core_dims=[[]], 
    dask="parallelized", 
    output_dtypes = [np.float]
    )
    return trend

def compute_slope(y):
    x = np.arange(len(y))
    return np.polyfit(x, y, 1)[0] # return only slope
bradyrx commented 5 years ago

Another interesting design decision is that if you can leverage raw numpy funcs that are already vectorizable, it makes things a lot easier. E.g. polyfit can be applied to a grid without using dask or apply_ufunc (and thus without having to loop through). So in theory you don't need to use vectorize=True here which slows things down.

There are other functions that can't be used in this case. If you're using np.cov you really can only do this for two time series at once to get the expected behavior. I.e. you can't do np.cov at each grid cell in a vectorized fashion. So this is the case where you need to use vectorize=True on apply_ufunc which slows things down.

Also, @andersy005 , do you have any sense of if the new map_blocks functionality for xarray is more useful here? I've cracked the apply_ufunc code finally I think but am not sure if map_blocks is faster or preferred for any reason.

andersy005 commented 5 years ago

Thank you for your prompt reply.

Another interesting design decision is that if you can leverage raw numpy funcs that are already vectorizable, it makes things a lot easier

I concur

Also, @andersy005 , do you have any sense of if the new map_blocks functionality for xarray is more useful here? I've cracked the apply_ufunc code finally I think but am not sure if map_blocks is faster or preferred for any reason.

For dask, you could simply require that the dimension over which the computation is calculated be contiguous, and then call xr.map_blocks:

The linear_trend function could be re-written with map_blocks:

Note: I haven't tested the following yet :)

def linear_trend(da, dim='time'):
    # Make sure that the dimension over which the fit is calculated is contiguous
    da_dim_contiguous = da.chunk({dim:-1})
    trend = xr.map_blocks(compute_slope, da_dim_contiguous)
    return trend

def compute_slope(y):
    x = np.arange(len(y))
    return np.polyfit(x, y, 1)[0] # return only slope

I find the xr.apply_ufunc to be a bit complex, and with the new map_blocks()

bradyrx commented 4 years ago

Addressed in https://github.com/bradyrx/esmtools/pull/70.