intake / intake-esm

An intake plugin for parsing an Earth System Model (ESM) catalog and loading assets into xarray datasets.
https://intake-esm.readthedocs.io
Apache License 2.0
135 stars 44 forks source link

Is it possible to use an esm datastore with OPeNDAP urls? #175

Closed — naomi-henderson closed this issue 4 years ago

naomi-henderson commented 4 years ago

Here at LDEO, we have an OPeNDAP server on one of our local machines with CMIP6 Amon data and would like remote users (students in a class, for example) to be able to use intake-esm with this data. You can see the collection here:

col = intake.open_esm_datastore("http://haden.ldeo.columbia.edu/catalogs/hyrax_cmip6.json")

Has anyone been using OPeNDAP urls in their collections? Any words of wisdom?

It looks like we are really close to getting this to work (searches are fine, etc), but to_dataset_dict is having trouble with fsspec.mapping.FSMap.

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-22-a88136df92d2> in <module>
----> 1 dset_dict = cat.to_dataset_dict(cdf_kwargs={'chunks': {'time': 36}})

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/core.py in to_dataset_dict(self, zarr_kwargs, cdf_kwargs, preprocess, aggregate)
    341         self.preprocess = preprocess
    342 
--> 343         return self.to_dask()
    344 
    345     def _get_schema(self):

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_xarray/base.py in to_dask(self)
     67     def to_dask(self):
     68         """Return xarray object where variables are dask arrays"""
---> 69         return self.read_chunked()
     70 
     71     def close(self):

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_xarray/base.py in read_chunked(self)
     42     def read_chunked(self):
     43         """Return xarray object (which will have chunks)"""
---> 44         self._load_metadata()
     45         return self._ds
     46 

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
    115         """load metadata only if needed"""
    116         if self._schema is None:
--> 117             self._schema = self._get_schema()
    118             self.datashape = self._schema.datashape
    119             self.dtype = self._schema.dtype

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/core.py in _get_schema(self)
    346         from intake.source.base import Schema
    347 
--> 348         self._open_dataset()
    349         self._schema = Schema(
    350             datashape=None, dtype=None, shape=None, npartitions=None, extra_metadata={}

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/core.py in _open_dataset(self)
    429         ]
    430 
--> 431         dsets = dask.compute(*dsets)
    432 
    433         self._ds = {group_id: ds for (group_id, ds) in dsets}

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     78         get_id=_thread_get_id,
     79         pack_exception=pack_exception,
---> 80         **kwargs
     81     )
     82 

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/core.py in _load_group_dataset(key, df, col_data, agg_columns, aggregation_dict, path_column_name, variable_column_name, use_format_column, mapper_dict, zarr_kwargs, cdf_kwargs, preprocess)
    506         zarr_kwargs,
    507         cdf_kwargs,
--> 508         preprocess,
    509     )
    510 

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in _aggregate(aggregation_dict, agg_columns, n_agg, v, lookup, mapper_dict, zarr_kwargs, cdf_kwargs, preprocess)
    124             return ds
    125 
--> 126     return apply_aggregation(v)
    127 
    128 

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in apply_aggregation(v, agg_column, key, level)
     87             dsets = [
     88                 apply_aggregation(value, agg_column, key=key, level=level + 1)
---> 89                 for key, value in v.items()
     90             ]
     91             keys = list(v.keys())

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in <listcomp>(.0)
     87             dsets = [
     88                 apply_aggregation(value, agg_column, key=key, level=level + 1)
---> 89                 for key, value in v.items()
     90             ]
     91             keys = list(v.keys())

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in apply_aggregation(v, agg_column, key, level)
     87             dsets = [
     88                 apply_aggregation(value, agg_column, key=key, level=level + 1)
---> 89                 for key, value in v.items()
     90             ]
     91             keys = list(v.keys())

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in <listcomp>(.0)
     87             dsets = [
     88                 apply_aggregation(value, agg_column, key=key, level=level + 1)
---> 89                 for key, value in v.items()
     90             ]
     91             keys = list(v.keys())

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in apply_aggregation(v, agg_column, key, level)
     87             dsets = [
     88                 apply_aggregation(value, agg_column, key=key, level=level + 1)
---> 89                 for key, value in v.items()
     90             ]
     91             keys = list(v.keys())

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in <listcomp>(.0)
     87             dsets = [
     88                 apply_aggregation(value, agg_column, key=key, level=level + 1)
---> 89                 for key, value in v.items()
     90             ]
     91             keys = list(v.keys())

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in apply_aggregation(v, agg_column, key, level)
     71                 zarr_kwargs=zarr_kwargs,
     72                 cdf_kwargs=cdf_kwargs,
---> 73                 preprocess=preprocess,
     74             )
     75 

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/intake_esm/merge_util.py in _open_asset(path, varname, data_format, zarr_kwargs, cdf_kwargs, preprocess)
    134     else:
    135         print(path)
--> 136         ds = xr.open_dataset(path, **cdf_kwargs)
    137 
    138     if preprocess is None:

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime)
    524                 "with engine='scipy' or 'h5netcdf'"
    525             )
--> 526         engine = _get_engine_from_magic_number(filename_or_obj)
    527         if engine == "scipy":
    528             store = backends.ScipyDataStore(filename_or_obj, **backend_kwargs)

/usr/local/python/anaconda3/envs/pangeo-Oct2019/lib/python3.7/site-packages/xarray/backends/api.py in _get_engine_from_magic_number(filename_or_obj)
    112         magic_number = filename_or_obj[:8]
    113     else:
--> 114         if filename_or_obj.tell() != 0:
    115             raise ValueError(
    116                 "file-like object read/write pointer not at zero "

AttributeError: 'FSMap' object has no attribute 'tell'
andersy005 commented 4 years ago

@naomi-henderson, I just fixed this in #176 (the way we were manipulating the path was causing confusion to xarray).

In [1]:  import intake                                                                                                         

In [2]: col = intake.open_esm_datastore("http://haden.ldeo.columbia.edu/catalogs/hyrax_cmip6.json")  

In [4]: cat = col.search(source_id="CAMS-CSM1-0", experiment_id="historical", member_id="r1i1p1f1", table_id="Amon", grid_label
   ...: ="gn", version="v1")  

In [5]: cat.df                                                                                                                 
Out[5]: 
  activity_id  ...                                               path
0        CMIP  ...  /m2/haibo/CMIP6mon/CMIP/CAMS/CAMS-CSM1-0/histo...
1        CMIP  ...  /m2/haibo/CMIP6mon/CMIP/CAMS/CAMS-CSM1-0/histo...

[2 rows x 13 columns]

In [6]: dsets = cat.to_dataset_dict(cdf_kwargs = {"chunks": {"time": 36}})                                                     
--> The keys in the returned dictionary of datasets are constructed as follows:
        'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'

--> There will be 1 group(s)

In [7]: dsets                                                                                                                  
Out[7]: 
{'CMIP.CAMS.CAMS-CSM1-0.historical.Amon.gn': <xarray.Dataset>
 Dimensions:    (bnds: 2, lat: 160, lon: 320, member_id: 1, time: 1980)
 Coordinates:
   * lon        (lon) float64 0.0 1.125 2.25 3.375 ... 355.5 356.6 357.8 358.9
   * time       (time) object 1850-01-16 12:00:00 ... 2014-12-16 12:00:00
   * lat        (lat) float64 -89.14 -88.03 -86.91 -85.79 ... 86.91 88.03 89.14
   * member_id  (member_id) <U8 'r1i1p1f1'
 Dimensions without coordinates: bnds
 Data variables:
     lat_bnds   (time, lat, bnds) float64 dask.array<chunksize=(1980, 160, 2), meta=np.ndarray>
     lon_bnds   (time, lon, bnds) float64 dask.array<chunksize=(1980, 320, 2), meta=np.ndarray>
     time_bnds  (time, bnds) object dask.array<chunksize=(36, 2), meta=np.ndarray>
     ps         (member_id, time, lat, lon) float32 dask.array<chunksize=(1, 36, 160, 320), meta=np.ndarray>
     ts         (member_id, time, lat, lon) float32 dask.array<chunksize=(1, 36, 160, 320), meta=np.ndarray>
 Attributes:
     DODS_EXTRA.Unlimited_Dimension:  time
     Conventions:                     CF-1.7 CMIP-6.2
     external_variables:              areacella
     grid_label:                      gn
     mip_era:                         CMIP6
     data_specs_version:              01.00.30
     institution_id:                  CAMS
     cmor_version:                    3.4.0
     variant_label:                   r1i1p1f1
     parent_variant_label:            r1i1p1f1
     parent_source_id:                CAMS-CSM1-0
     branch_method:                   Standard
     run_variant:                     forcing: greenhouse gases,aerosol,solar ...
     comment:                         The model integration starts from the pi...
     branch_time_in_child:            0.0
     nominal_resolution:              100 km
     product:                         model-output
     activity_id:                     CMIP
     parent_experiment_id:            piControl
     history:                         2019-06-11T11:01:52Z ;rewrote data to be...
     source_id:                       CAMS-CSM1-0
     title:                           CAMS-CSM1-0 output prepared for CMIP6
     branch_time_in_parent:           3025.0
     tracking_id:                     hdl:21.14100/5dc697e7-4e9e-4c16-838e-c01...
     realm:                           atmos
     experiment_id:                   historical
     parent_activity_id:              CMIP
     experiment:                      all-forcing simulation of the recent past
     grid:                            T106
     further_info_url:                https://furtherinfo.es-doc.org/CMIP6.CAM...
     contact:                         Dr. Xinyao Rong (rongxy@cma.gov.cn)
     source:                          CAMS_CSM 1.0 (2016): \naerosol: none\nat...
     table_info:                      Creation Date:(30 April 2019) MD5:cc2ae5...
     physics_index:                   1
     references:                      Model described by Rong et al (J. Meteor...
     table_id:                        Amon
     realization_index:               1
     forcing_index:                   1
     parent_time_units:               days since 1850-01-01
     description:                     DECK: historical
     source_type:                     AOGCM
     institution:                     Chinese Academy of Meteorological Scienc...
     frequency:                       mon
     initialization_index:            1
     sub_experiment_id:               none
     parent_mip_era:                  CMIP6
     license:                         CMIP6 model data produced by Lawrence Li...
     sub_experiment:                  none}
naomi-henderson commented 4 years ago

Fantastic! Many thanks. I will close this issue.

andersy005 commented 4 years ago

You are welcome, and thank you for bringing attention to this issue.