rabernat opened this issue 5 years ago
What I would like to see is a standalone package called something like xarray-mergetool.
I think we should postpone effort on developing a separate package like xarray-mergetool. There are likely to be relatively few implications for the user-facing API, so we should focus first on functionality. I would be happy with a separate package eventually, but note that constructing the input that controls merging is not trivial; that, in part, is the value-added piece here.
@andersy005 and I propose the following aggregation_control as an extension of the catalog spec definition:
"aggregation_control": {
"type": "object",
"required": [
"variable_column_name"
],
"properties": {
"variable_column_name": {
"title": "Variable Column Name",
"description": "Name of the attribute column in csv file that contains the variable name.",
"type": "string"
},
"join_newdim_column_names": {
"title": "joinNewDim column names",
"description": "Column names across which join datasets along a new dimension.",
"type": "array",
"dset_groupby_column_names": {
"title": "Compatible dataset column names",
"description": "Column names for which common values indicate datasets that can be joined.",
"type": "array"
}
}
}
}
For CMIP6, this would look like this:
"aggregation_control": {
"dset_groupby_column_names": [
"activity_id",
"institution_id",
"source_id",
"experiment_id",
"table_id",
"grid_label"],
"join_newdim_column_names": ["member_id"],
"variable_column_name": "variable_id"
}
variable_column_name is currently required to prevent xarray from doing the wrong thing with static variables.
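As a rough illustration of why that matters, here is a hedged sketch (the helper name open_asset and its arguments are mine, not intake-esm's) of using the variable column to keep only the requested variable as a data variable, so static fields do not get aggregated as data:

import xarray as xr

def open_asset(path, varname, zarr_kwargs=None):
    """Hypothetical helper: open one zarr store and keep only `varname`
    as a data variable; everything else (e.g. static cell-area fields)
    is demoted to a coordinate so later merge/concat steps ignore it."""
    ds = xr.open_zarr(path, **(zarr_kwargs or {}))
    return ds.set_coords([v for v in ds.data_vars if v != varname])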
The code that accomplishes the aggregation works as follows:
1. Groupby dset_groupby_column_names to create compatible_groups.
2. Loop over compatible_groups and groupby join_newdim_column_names to create join_newdim_groups.
3. Loop over join_newdim_groups, open the assets in this group, apply expand_dims as necessary, and append to dset_list.
4. Apply combine_by_coords to dset_list and return.
I think we have a working prototype in hand.
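A rough, self-contained sketch of steps 1–2 with pandas (the tiny DataFrame and its paths are made up for illustration):

import pandas as pd

# toy stand-in for the catalog csv
df = pd.DataFrame({
    'source_id': ['BCC-CSM2-MR'] * 3,
    'experiment_id': ['amip'] * 3,
    'member_id': ['r1i1p1f1', 'r2i1p1f1', 'r3i1p1f1'],
    'variable_id': ['pr'] * 3,
    'path': ['store_a', 'store_b', 'store_c'],
})

dset_groupby_column_names = ['source_id', 'experiment_id']
join_newdim_column_names = ['member_id']

for compat_key, compatible_group in df.groupby(dset_groupby_column_names):
    for newdim_key, join_newdim_group in compatible_group.groupby(join_newdim_column_names):
        # step 3 would open each asset here, expand_dims with newdim_key,
        # and append to dset_list; step 4 applies combine_by_coords
        print(compat_key, newdim_key, join_newdim_group['path'].tolist())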
It is a requirement that successive "groupby" operations applied to dset_groupby_column_names and then to join_newdim_column_names yield a collection of datasets that can be merged.
aggregation_control is a property of the collection and is immutable. If a user is interested in avoiding merge behavior, returning, for example, a single ensemble member, they must do this by adding specificity to their query.
We are automatically concatenating multiple files in time, should multiple files exist after successive grouping, but this is happening under the hood in xarray using the appropriate coordinate information. Again, the search API provides a mechanism to avoid this, but it might be desirable to have attributes in some collections, like epoch or date_range, that enable appropriate searching.
Another use case we considered is the Decadal Prediction experiments. These might look similar to the CMIP6 example above, except for the following.
"join_newdim_column_names": ["member_id", "start_year"],
@rabernat, what do you think?
Where does the time concatenation come in?
It's handled implicitly by xarray.
And here's the implementation that I've come up with:
def _open_dataset(self):
    # Determine which column holds the asset path, and how the data format is specified
    path_column_name = self._col_data['assets']['column_name']
    if 'format' in self._col_data['assets']:
        data_format = self._col_data['assets']['format']
        use_format_column = False
    else:
        format_column_name = self._col_data['assets']['format_column_name']
        use_format_column = True

    compatible_attrs = self._col_data["aggregation_control"].get("dset_groupby_column_names", [])
    concat_newdim_attrs = self._col_data["aggregation_control"].get("join_newdim_column_names", [])
    variable_column_name = self._col_data["aggregation_control"]["variable_column_name"]

    # Group rows into sets of compatible datasets
    if compatible_attrs:
        groups = self.df.groupby(compatible_attrs)
    else:
        groups = self.df.groupby(self.df.columns.tolist())

    dsets = {}
    for compat_key, compatible_group in groups:
        # Within each compatible group, group again by the new-dimension columns
        if concat_newdim_attrs:
            concat_groups = compatible_group.groupby(concat_newdim_attrs)
        else:
            concat_groups = compatible_group.groupby(compatible_group.columns.tolist())

        datasets = []
        for concat_key, concat_group in concat_groups:
            temp_ds = []
            for _, row in concat_group.iterrows():
                if use_format_column:
                    data_format = row[format_column_name]
                expand_dims = {}
                if concat_newdim_attrs:
                    if isinstance(concat_key, str):
                        concat_key = [concat_key]
                    expand_dims = dict(zip(concat_newdim_attrs, concat_key))
                    expand_dims = {dim_name: [dim_value] for dim_name, dim_value in expand_dims.items()}
                varname = [row[variable_column_name]]
                # module-level helper that opens a single asset
                temp_ds.append(
                    _open_dataset(
                        row, path_column_name, varname, data_format,
                        expand_dims=expand_dims,
                        zarr_kwargs=self.zarr_kwargs, cdf_kwargs=self.cdf_kwargs,
                    )
                )
            datasets.extend(temp_ds)

        # Merge/concatenate everything in this compatible group
        attrs = dict_union(*[ds.attrs for ds in datasets])
        dset = xr.combine_by_coords(datasets)
        dset = _restore_non_dim_coords(dset)
        dset.attrs = attrs
        group_id = '.'.join(compat_key)
        dsets[group_id] = dset

    self._ds = dsets
It's handled implicitly by xarray.
Explicit is better than implicit.
We should try to leverage the vocabulary from ncml rather than inventing something new.
I still stand by this. They have thought this problem through.
Why can't we have something like this?
"aggregations": [
{
"type": "union",
"attribute_name": "variable_id"
},
{
"type": "join_new",
"attribute_name": "member_id"
}.
{
"type": "join_existing",
"attribute_name": "time_range",
"dim_name": "time"
}
]
I think that approach could work and I like its clarity and simplicity.
I could see @andersy005's implementation essentially working as is, but first deriving dset_groupby_column_names from the columns not specified in aggregations and proceeding along a similar path... with the exception of join_existing, for which we're relying on xarray's coordinate-aware combine. This is explicit in a sense.
I'd have to think more about an alternative implementation.
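For what it's worth, here is a small sketch of that derivation (variable names are mine); the groupby columns fall out as everything not claimed by an aggregation or by the asset path column:

aggregations = [
    {'type': 'union', 'attribute_name': 'variable_id'},
    {'type': 'join_new', 'attribute_name': 'member_id'},
    {'type': 'join_existing', 'attribute_name': 'time_range', 'dim_name': 'time'},
]

# all catalog columns, including the asset path column (illustrative)
all_columns = ['activity_id', 'institution_id', 'source_id', 'experiment_id',
               'table_id', 'grid_label', 'variable_id', 'member_id',
               'time_range', 'path']

non_groupby = {agg['attribute_name'] for agg in aggregations} | {'path'}
dset_groupby_column_names = [c for c in all_columns if c not in non_groupby]
print(dset_groupby_column_names)
# ['activity_id', 'institution_id', 'source_id', 'experiment_id', 'table_id', 'grid_label']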
I wasn't trying to provide a complete example, just referring to the aggregation part. I agree that we should still explicitly have something like dset_groupby_column_names. In my version, the complete example would look like this:
"suggested_aggregations": {
"groupby_attributes": [
"activity_id",
"institution_id",
"source_id",
"experiment_id",
"table_id",
"grid_label"],
"aggregations": [
{
"type": "union",
"attribute_name": "variable_id"
},
{
"type": "join_new",
"attribute_name": "member_id",
"options": {"coords": "minimal"}
}.
{
"type": "join_existing",
"attribute_name": "time_range",
"dim_name": "time"
}
]
}
An implementation would do something like this for each group.
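For concreteness, a hand-rolled (non-general) sketch of what that per-group work could look like for the CMIP6 example above; the helper names are mine:

import xarray as xr

def open_assets_for_group(group_df):
    """Hypothetical: open every asset (row) in one compatible group into a
    nested dict keyed by variable_id, then member_id."""
    nested = {}
    for _, row in group_df.iterrows():
        nested.setdefault(row['variable_id'], {})[row['member_id']] = xr.open_zarr(row['path'])
    return nested

def aggregate_group(nested):
    """Hard-coded version of the suggested aggregations: join_new over
    member_id, then union over variable_id."""
    per_variable = []
    for members in nested.values():
        member_ids = list(members)
        concat_dim = xr.DataArray(member_ids, dims='member_id', name='member_id')
        per_variable.append(xr.concat(list(members.values()), dim=concat_dim))
    return xr.merge(per_variable)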
Doing the aggregations in a general way requires a recursive algorithm.
I hacked up a little demo of how this sort of recursive algorithm might work:
https://nbviewer.jupyter.org/gist/rabernat/eb15709b2b7cac33b64f1164dfd49992
Hope this is useful. I would eventually like to implement this in the hypothetical xarray-mergetool package. For now, feel free to use it in intake-esm if it fits.
@rabernat, in apply_aggregation you pointed out that we don't actually care about the keys, and I was wondering how we are going to know the values for the join_new dimension?
def apply_aggregation(v, level=0):
    """Recursively descend into nested dictionary and aggregate items.
    level tells how deep we are."""
    assert level <= n_agg
    if level == n_agg:
        # bottom of the hierarchy - should be an actual path at this point
        return open_dataset(v)
    else:
        agg_column = agg_columns[level]
        agg_function = agg_function_map[aggregation_dict[agg_column]['type']]
        # we don't actually care about the keys
        dsets = [apply_aggregation(w, level=level+1)
                 for w in v.values()]
        return agg_function(*dsets)
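(For context, the notebook's demo relies on module-level setup roughly like the following; this is my reconstruction, not the gist verbatim, with aggregation functions that just build strings:)

aggregation_dict = {
    'variable_id': {'type': 'union'},
    'member_id': {'type': 'join_new', 'options': {'coords': 'minimal'}},
}
agg_columns = list(aggregation_dict)
n_agg = len(agg_columns)

def open_dataset(path):
    return f'OPEN({path})'

def union(*dsets):
    return 'union(' + ', '.join(dsets) + ')'

def join_new(*dsets):
    return 'join_new(' + ', '.join(dsets) + ')'

agg_function_map = {'union': union, 'join_new': join_new}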
For instance, with the following group:
{'pr': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}
I end up with three datasets from the three ensemble members:
'union(join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/)))'
When join_new() is called, it gets a list of three datasets, but it is not aware of the r1i1p1f1, r2i1p1f1, r3i1p1f1 values for the new member_id. I am not sure how the join_new() operation is going to work without knowing the actual values for member_id.
In my previous implementation, I was expanding every single asset (zarr store, netcdf file) with a new dim after the open_dataset operation.
Good observation.
So apply_aggregation does need to care about the keys. A few options:
- change the signature to apply_aggregation(keys, values, level=0)
- have each aggregation function accept a keys keyword (but maybe do nothing with it) other than assert len(dsets) == len(keys); join_new would actually use the keys
Alternatively, we could make all functions just accept a dictionary of keys: dsets. More concise but also maybe more confusing to read.
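A quick sketch of that dictionary-flavored variant (my own illustration): join_new can then read the new coordinate values straight from the keys.

import xarray as xr

def join_new(dset_dict, dim_name):
    """Accept {key: dataset} so the keys become the new coordinate values."""
    keys, dsets = zip(*dset_dict.items())
    concat_dim = xr.DataArray(list(keys), dims=dim_name, name=dim_name)
    return xr.concat(list(dsets), dim=concat_dim)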
Awesome! Thank you for the prompt reply! I am going to try this out.
to understand recursion you must first understand recursion. :)
Well, I am having a hard time getting it to work :)
@rabernat, I was having a hard time wrapping my head around your approach, and I made a few changes:
def open_dataset(d, agg_column, key):
    return f'OPEN({d}.EXPAND_DIMS({{{agg_column} : {[key]}}})'

def join_new(*d):
    return 'join_new(' + ', '.join(d) + ')'

def join_existing(*d):
    return 'join_existing(' + ', '.join(d) + ')'

def union(*d):
    return 'union(' + ', '.join(d) + ')'

def apply_aggregation(v, agg_column=None, k=None, level=0):
    """Recursively descend into nested dictionary and aggregate items.
    level tells how deep we are."""
    assert level <= n_agg
    if level == n_agg:
        # bottom of the hierarchy - should be an actual path at this point
        return open_dataset(v, agg_column, k)
    else:
        agg_column = agg_columns[level]
        agg_type = aggregation_dict[agg_column]['type']
        agg_function = agg_function_map[agg_type]
        dsets = [apply_aggregation(value, agg_column=agg_column, k=key, level=level+1)
                 for key, value in v.items()]
        return agg_function(*dsets)
Everything seemed to be working when there's only one new dimension to add in join_new():
nd = {'pr': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}
apply_aggregation(nd)
"union(join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/.EXPAND_DIMS({member_id : ['r1i1p1f1']}), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/.EXPAND_DIMS({member_id : ['r2i1p1f1']}), OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/.EXPAND_DIMS({member_id : ['r3i1p1f1']})))"
I tried simulating one other use case, for Decadal Prediction datasets. This case requires adding two new dimensions (start_year, member_id), with:
nd={'pr': {'r1i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/'},
'r2i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/'},
'r3i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}
aggregation_dict = {'variable_id': {'type': 'union'},
'member_id': {'type': 'join_new', 'options': {'coords': 'minimal'}},
'start_year': {'type': 'join_new', 'options': {'coords': 'minimal'}}}
apply_aggregation produces the wrong results
apply_aggregation(nd)
"union(join_new(join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/.EXPAND_DIMS({start_year : ['1960']})), join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/.EXPAND_DIMS({start_year : ['1960']})), join_new(OPEN(gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/.EXPAND_DIMS({start_year : ['1960']}))))"
Any ideas on how to do this the right way are appreciated.
Cc @matt-long
I see a few issues in what you posted.
1. _How can you apply an aggregation over start_year when you only have a single item for each year?_ Aggregation only makes sense when you have multiple datasets to combine, for example:

   'r2i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/1960',
                '1980': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/1980'}

   Either remove the start_year aggregation or else provide more values to operate on. (This error shows that some input validation / consistency checks would be very useful here.)

2. ~_Why are you calling join_new on a timeseries?_ You should be doing join_existing, possibly specifying dim='time' as part of the join option.~

3. I would not call expand_dims in open_dataset. Expand dims is only necessary in join_new, so it should be called there. Moreover, I'm not sure expand_dims is necessary at all. Note that xarray.concat is meant to "Concatenate xarray objects along a new or existing dimension." You can use a DataArray as the concat dim, like this:

   concat_dim = xr.DataArray(member_ids, dims=['member_id'], name='member_id')
   return xr.concat(dsets, dim=concat_dim)
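A self-contained check of that pattern with dummy data (my example, not CMIP output):

import numpy as np
import xarray as xr

member_ids = ['r1i1p1f1', 'r2i1p1f1', 'r3i1p1f1']
dsets = [xr.Dataset({'pr': ('time', np.random.rand(4))}, coords={'time': range(4)})
         for _ in member_ids]

# passing a named DataArray as `dim` creates the new dimension and its coordinate
concat_dim = xr.DataArray(member_ids, dims='member_id', name='member_id')
combined = xr.concat(dsets, dim=concat_dim)
print(combined.pr.dims)  # ('member_id', 'time')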
One implication of what I said above is that the catalog needs to be fully explicit about each data granule. I don't know if you are currently relying on globbing / open_mfdataset to discover time granules--that should not be done anymore.
- _Why are you calling join_new on a timeseries?_ You should be doing join_existing, possibly specifying dim='time' as part of the join option
Ok, I see how I misunderstood this. In decadal prediction, the start_date IS clearly a new dimension. However, I think my points 1 and 3 still stand.
Aggregation only makes sense when you have multiple datasets to combine
The right representation for my example
nd={'pr': {'r1i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/'},
'r2i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/'},
'r3i1p1f1': {'1960': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}
should have been
nd={'pr': {'1960': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
                    'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
                    'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}
However, I think my points 1 and 3 still stand.
I concur... Let me take another stab at it.
I don't know if you are currently relying on globbing / open_mfdataset to discover time granules
I wasn't relying on globbing or open_mfdataset. I was opening each file independently with open_dataset() and using xr.combine_by_coords() to do the magic time concatenation.
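(As a tiny illustration of that coordinate-aware concatenation, with dummy data:)

import numpy as np
import xarray as xr

chunk1 = xr.Dataset({'pr': ('time', np.random.rand(2))}, coords={'time': [0, 1]})
chunk2 = xr.Dataset({'pr': ('time', np.random.rand(2))}, coords={'time': [2, 3]})
combined = xr.combine_by_coords([chunk1, chunk2])
print(combined.time.values)  # [0 1 2 3]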
You have changed the nesting order, but I still don't see the point. Why are you trying to do join_new over start_date when you only have one key for start_date? Your example would make more sense to me if you had multiple start dates to concatenate.
I wasn't relying on globbing or open_mfdataset. I was opening each file independently with open_dataset() and using xr.combine_by_coords() to do the magic time concatenation.
But you are still assuming that you will "list" the directory contents to discover the files to open, correct? What I am suggesting is that, going forward, every single individual file must appear explicitly in the CSV catalog.
You have changed the nesting order, but I still don't see the point. Why are you trying to do join_new over start_date when you only have one key for start_date? Your example would make more sense to me if you had multiple start dates to concatenate.
Assuming that every zarr store in nd ends up having the following dimensions:

pr (time, lat, lon) float32 dask.array<shape=(432, 160, 320), chunksize=(432, 160, 320)>
nd={'pr': {'1960': {'r1i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r1i1p1f1/Amon/pr/gn/',
                    'r2i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r2i1p1f1/Amon/pr/gn/',
                    'r3i1p1f1': 'gs://cmip6/CMIP/BCC/BCC-CSM2-MR/amip/r3i1p1f1/Amon/pr/gn/'}}}
What I am hoping to achieve after all join_new() calls is a dataset with:

pr (start_year, member_id, time, lat, lon) float32 dask.array<shape=(1, 3, 432, 160, 320), chunksize=(1, 1, 432, 160, 320)>
where start_year=1960 and member_id=['r1i1p1f1', 'r2i1p1f1', 'r3i1p1f1']. I might be missing something here.
But you are still assuming that you will "list" the directory contents to discover the files to open, correct?
Yes
Now I am realizing that this process might be a little different for netcdf files, where the time axis is split across multiple netcdf files which appear as multiple entries in the csv catalog.
Let me revisit your approach and stop relying on expand_dims (and use xarray.concat explicitly). I will get back to you if I have more questions.
Now I am realizing that this process might be a little different for netcdf files, where the time axis is split across multiple netcdf files which appear as multiple entries in the csv catalog.
In this case, you would just have to include an explicit join, like
'time_granule': {'type': 'join_existing', 'options': {'dim': 'time'}}
I think this is really preferable to what you are currently doing in intake-esm, where time concatenation is implicit.
In the end, it will feel the same to the user (data is provided as a unified xarray dataset). But under the hood, the architecture will be much cleaner.
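Spelled out a bit more (a sketch of mine; the time_range column name is illustrative), the suggested aggregations for such a netCDF collection might be:

aggregation_dict = {
    'variable_id': {'type': 'union'},
    'member_id': {'type': 'join_new', 'options': {'coords': 'minimal'}},
    'time_range': {'type': 'join_existing', 'options': {'dim': 'time'}},
}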
@rabernat, first of all, this is a very impressive bit of thinking on your part.
We have it working with real data and actual calls to xarray. We have tested on Decadal Prediction runs, adding 2 new dims, and the vanilla CMIP data. It works!
The aggregation functions have different signatures (they may not be fully developed here).
import xarray as xr

def open_dataset(path, user_kwargs={'chunks': {'time': 36}}):  # hardwired chunking for example
    return xr.open_dataset(path, **user_kwargs)

def join_new(dsets, dim_name, coord_value, options={}):
    concat_dim = xr.DataArray(coord_value, dims=(dim_name,), name=dim_name)
    return xr.concat(dsets, dim=concat_dim, **options)

def join_existing(dsets, options={}):
    return xr.concat(dsets, dim='time')

def union(dsets, options={}):
    return xr.merge(dsets, **options)
apply_aggregation is modified as follows.
def apply_aggregation(v, agg_column=None, key=None, level=0):
    """Recursively descend into nested dictionary and aggregate items.
    level tells how deep we are."""
    assert level <= n_agg
    if level == n_agg:
        # bottom of the hierarchy - should be an actual path at this point
        return open_dataset(v)
    else:
        agg_column = agg_columns[level]
        agg_info = aggregation_dict[agg_column]
        agg_type = agg_info['type']
        if 'options' in agg_info:
            agg_options = agg_info['options']
        else:
            agg_options = {}
        dsets = [apply_aggregation(value, agg_column, key=key, level=level+1)
                 for key, value in v.items()]
        keys = list(v.keys())
        if agg_type == 'join_new':
            return join_new(dsets, dim_name=agg_column, coord_value=keys, options=agg_options)
        elif agg_type == 'join_existing':
            return join_existing(dsets, options=agg_options)
        elif agg_type == 'union':
            return union(dsets, options=agg_options)
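As a quick in-memory sanity check of the recursion (my own test, stubbing open_dataset so no real files are needed), the Decadal Prediction nesting comes out with the expected dimensions:

import numpy as np
import xarray as xr

aggregation_dict = {
    'variable_id': {'type': 'union'},
    'start_year': {'type': 'join_new'},
    'member_id': {'type': 'join_new'},
}
agg_columns = list(aggregation_dict)
n_agg = len(agg_columns)

def open_dataset(path):  # stub: ignore the path, return a small in-memory dataset
    return xr.Dataset({'pr': ('time', np.random.rand(3))}, coords={'time': range(3)})

nd = {'pr': {'1960': {'r1i1p1f1': 'a', 'r2i1p1f1': 'b', 'r3i1p1f1': 'c'}}}
ds = apply_aggregation(nd)
print(ds.pr.dims)  # ('start_year', 'member_id', 'time')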
@andersy005 will clean this up and implement for real; hopefully we'll have it in intake-esm tonight or tomorrow.
This approach requires that the variable "time" is universally present in the collection; i.e., you couldn't mix datasets with variables named "Time" and "time" in the same collection. We could work around this (adding a time_coord_name column, for instance), but could probably just live with multiple collections to handle that use case.
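If that workaround were ever needed, a sketch (mine, not implemented) of a join_existing that reads the concat dimension from the aggregation options instead of hard-coding 'time':

import xarray as xr

def join_existing(dsets, options=None):
    options = dict(options or {})
    dim = options.pop('dim', 'time')  # e.g. supplied from a hypothetical time_coord_name column
    return xr.concat(dsets, dim=dim, **options)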
I am curious if there are any performance implications associated with the order of aggregation operations. I don't see any a priori reason for there to be, but maybe I am naive.
cc @dcherian
It should always be an option for an application to open the data assets (the rows of the CSV file) one-by-one. However, as discussed in the design document, we also may want to "suggest" how the assets can be merged / aggregated.
@andersy005 originally had this in yaml.
I'm not sure exactly how the new syntax would look. We should try to leverage the vocabulary from ncml rather than inventing something new.
This issue is also discussed in https://github.com/pydata/xarray/issues/2697.