spencerahill / aospy

Python package for automated analysis and management of gridded climate data
Apache License 2.0
84 stars 13 forks source link

Potential bottlenecks for large input datasets #171

Open spencerahill opened 7 years ago

spencerahill commented 7 years ago

A colleague wants to use aospy on 0.1 degree ocean data; see /archive/hmz/CM2.6/ocean/ on GFDL's filesystem. This is GFDL 'av' data organized as annual means, one year per file, for 200 years: ocean.0001.ann.nc, ..., ocean.0200.ann.nc. Each file is ~14GB, and in total it's ~3.1TB

While we generally use xarray.open_mfdataset and hence lazily-load, there are three places where data explicitly gets loaded into memory via load():

In this particular case, the grid attributes can come from the smaller /archive/hmz/CM2.6/ocean.static.nc file, but that itself isn't trivially small, at 371 MB.

@spencerkclark, do you recall the nature of the bugs when we didn't force loading? Any thoughts more generally about making all of the above logic more performant with large datasets? Ideally we never call load() on a full dataset; rather we take individual variables, reduce them as much as possible (in space and time), and then load.

spencerkclark commented 7 years ago

@spencerahill this is really good motivating case for looking deeper into how we integrate with dask! I've done a little bit of further investigation.

I have found that we are also implicitly loading data into memory when we call xr.decode_cf here: https://github.com/spencerahill/aospy/blob/develop/aospy/data_loader.py#L136. One way to avoid this is to separate out the time variables into a temporary standalone Dataset, and decode that (loading just the time-related variables into memory via xr.decode_cf), and then replace them in the Dataset containing data variables as dask arrays in data_loader._prep_time_data. This is kind of messy, but I'm not sure if there is a better way (it seems that this is a property of xr.decode_cf)?

Offline, I've experimented with doing that and removing all the explicit calls to load you listed above and was successful in having dask arrays propagated all the way through to the to_netcdf step (what we'd ultimately want).

Does that solve the problem here? Not completely: I think this is where this discussion connects back with https://github.com/spencerahill/aospy/issues/169#issuecomment-291121574. When doing this, and using either a dask.Bag or joblib setup with a dask.distributed cluster, array operations (e.g. mean reductions) are not done in a distributed context (instead all array operations in a single Calc object are computed in a single task). When working with larger-data, I think we would want to make distributed array operations possible. It's possible it would be a lot of work to enable that, but I'm still pretty new to doing this, so maybe it's not as intimidating as I think.

Lastly, if we do this we'd also want to either come up with some heuristics for automatically chunking Datasets we load in for best performance, or perhaps enable the user to set the default chunk arrangement in a DataLoader constructor.

Those are my (somewhat rambling) thoughts; I'm happy to talk about this more offline if it would help.

spencerahill commented 7 years ago

Thanks for digging into this!

I have found that we are also implicitly loading data into memory when we call xr.decode_cf here: https://github.com/spencerahill/aospy/blob/develop/aospy/data_loader.py#L136.

And also a few lines above via times.numpy_datetime_workaround_encode_cf, although in that case if I'm understanding correctly we do pull out the time array already, so it's not called on the full Dataset.

One way to avoid this is to separate out the time variables into a temporary standalone Dataset, and decode that (loading just the time-related variables into memory via xr.decode_cf), and then replace them in the Dataset containing data variables as dask arrays in data_loader._prep_time_data.

Sorry, how does that differ from our current use of numpy_datetime_workaround_encode_cf? (I'm probably misreading our code here.)

Offline, I've experimented with doing that and removing all the explicit calls to load you listed above and was successful in having dask arrays propagated all the way through to the to_netcdf step (what we'd ultimately want).

Nice! Do you recall the nature of the problems before though that led us to adding the load() calls? Wondering if those could potentially resurface.

I think this is where this discussion connects back with #169 (comment). When doing this, and using either a dask.Bag or joblib setup with a dask.distributed cluster, array operations (e.g. mean reductions) are not done in a distributed context (instead all array operations in a single Calc object are computed in a single task). When working with larger-data, I think we would want to make distributed array operations possible.

Exactly. Our joblib-based step, though very exciting, doesn't change this one-task-per-Calc paradigm. And that does seem like a major undertaking. I've got some vague ideas that I'll try to flesh out. But this doesn't preclude proceeding with #169 nor cleaning up these load() calls to the extent possible.

spencerkclark commented 7 years ago

Sorry, how does that differ from our current use of numpy_datetime_workaround_encode_cf? (I'm probably misreading our code here.)

That particular step uses decode_cf only to determine the minimum year included in the Dataset (as it was read in). If that year is less than the minimum pd.Timestamp year, we modify the units attribute of all time-related variables in the actual Dataset (but importantly do not decode those time variables yet). In data_loader._prep_time_data we actually decode the time variables (based on those potentially-modified units) that we append as coordinates to every DataArray we pass around in aospy:

This is the crude re-write the sidesteps loading the Dataset into memory in _prep_time_data; it would probably be best to write a helper function called something along the lines of _decode_times_no_load to do everything between the end of the else block and the return statement. internal_names.ALL_TIME_VARS is a list of the variable names listed above.

def _prep_time_data(ds):
    """Prepare time coord. information in Dataset for use in aospy.

    1. Edit units attribute of time variable if it contains
    a Timestamp invalid date
    2. If the Dataset contains a time bounds coordinate, add
    attributes representing the true beginning and end dates of
    the time interval used to construct the Dataset
    3. Decode the times into np.datetime64 objects for time
    indexing

    Parameters
    ----------
    ds : Dataset
        Pre-processed Dataset with time coordinate renamed to
        internal_names.TIME_STR

    Returns
    -------
    Dataset, int
        The processed Dataset and minimum year in the loaded data
    """
    ds = times.ensure_time_as_dim(ds)
    ds, min_year = times.numpy_datetime_workaround_encode_cf(ds)
    if internal_names.TIME_BOUNDS_STR in ds:
        ds = times.ensure_time_avg_has_cf_metadata(ds)
    else:
        logging.warning("dt array not found.  Assuming equally spaced "
                        "values in time, even though this may not be "
                        "the case")
        ds = times.add_uniform_time_weights(ds)
    ds_temp = xr.Dataset()
    for v in internal_names.ALL_TIME_VARS:
        if v in ds:
            ds_temp[v] = ds[v]
    ds_temp = xr.decode_cf(ds_temp, decode_times=True,
                           decode_coords=False, mask_and_scale=False)
    for v in internal_names.ALL_TIME_VARS:
        if v in ds:
            ds[v] = ds_temp[v]
    return ds, min_year

Do you recall the nature of the problems before though that led us to adding the load() calls? Wondering if those could potentially resurface.

The motivation for eagerly loading data into memory was originally rooted in the concern that data might not be chunked well by default for performance. As long as we come up with a reasonable solution to that issue, I see no problem in using dask by default (rather than numpy) in most calculations. That being said, if one has more complicated operations hidden in a function defined in one's object library (for example, things in infinite-diff) that are not dask-array compatible, one will run into issues. So perhaps we should support both pathways with a flag in compute_kwargs?

spencerahill commented 7 years ago

it would probably be best to write a helper function called something along the lines of _decode_times_no_load to do everything between the end of the else block and the return statement.

agreed

That being said, if one has more complicated operations hidden in a function defined in one's object library (for example, things in infinite-diff) that are not dask-array compatible, one will run into issues. So perhaps we should support both pathways with a flag in compute_kwargs?

No need to shame me for infinite-diff's shortcomings 😉 . Yes I think it would be prudent to have a compute_kwargs option. Would modifying Calc._local_ts as follows suffice:

    def _local_ts(self, *data):
        """Perform the computation at each gridpoint and time index."""
        if self._do_force_load:  # or however we ultimately choose to encode that option
            [d.load() for d in data]
        arr = self.function(*data)
        ...
spencerahill commented 7 years ago

Also thanks for clarifying the distinction w/ numpy_datetime_workaround_encode_cf...that makes sense now.

spencerkclark commented 7 years ago

No need to shame me for infinite-diff's shortcomings 😉 .

Sorry, I didn't mean to single it out! I'm sure there are plenty of other examples; this was just the first that came to mind that I had a link to.

Would modifying Calc._local_ts as follows suffice?

Nice! Yes, it does seem like something as simple as that could work. When I get a chance I'll give it a try.

Yes I think it would be prudent to have a compute_kwargs option.

Upon a little more reflection on this, what do you think about making this an attribute of a Var object that one could set in its constructor instead? For example:

dask_incompatible_var = Var(
    name='dask_incompatible',
    ...
    func=calcs.dask_incompatible_func,
    dask_compatible=False
)

I think we could make the default value True, and let users flag the incompatible cases.

spencerahill commented 7 years ago

Sorry, I didn't mean to single it out!

Haha sorry, I was totally kidding!

making this an attribute of a Var object that one could set in its constructor

Great call. The point of this is to be Var-specific anyways. Maybe it's good to keep the compute_kwargs route too though to turn it off for a whole CalcSuite, e.g. for debugging purposes. What do you think?

spencerahill commented 7 years ago

I have found that we are also implicitly loading data into memory when we call xr.decode_cf here

See https://github.com/pydata/xarray/issues/1372, which implies that this eager loading is a bug that will eventually be fixed in xarray