intake / intake-xarray

Intake plugin for xarray
https://intake-xarray.readthedocs.io/
BSD 2-Clause "Simplified" License

intake.open_zarr + multiple files from S3 #134

Closed Mikejmnez closed 1 year ago

Mikejmnez commented 1 year ago

I am interested in reading multiple zarr files from a .yaml file and opening them via intake (something like cat[].to_dask()). I found this link (https://discourse.pangeo.io/t/how-to-read-multiple-zarr-archives-at-once-from-s3/2564) very helpful when creating the dataset manually, but in my case I would like to do so through the intake catalog approach...

In the case of a single zarr file, I am able to create a yaml file which successfully opens a dataset. The .yaml entry looks like

  entry:
    description: sample (test) data fields
    args:
      engine: zarr
      storage_options:
        client_kwargs:
          endpoint_url: <url>
          region_name: "us-east-1"
        key: <aws_key>
        secret: <aws_secret_key>
      urlpath: 's3://bucket_name/file0'
    driver: intake_xarray.xzarr.ZarrSource

I tried passing a glob urlpath: 's3://bucket_name/file*', along with some additional parameters for xarray (like parallel=True, consolidated=True) but that didn't work.

The error I get is the same one that appears when intake.open_zarr is called directly: opening a single zarr store works, but opening multiple zarr stores through a URL defined as a glob fails. The following works:

import xarray as xr
import s3fs
import intake

# =============================================
# This works: a single file via intake
# =============================================

storage_options = {
    "key": "<key>",
    "secret": "<secret>",
    "client_kwargs": {"endpoint_url": "<url>", "region_name": "us-east-1"},
}
s3_path = "s3://bucket_name/file"
new_ds = intake.open_zarr(urlpath=s3_path, storage_options=storage_options).to_dask()

But, if s3_path is instead a glob like s3://bucket_name/file* referencing many files, I get the following:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
---> new_ds = intake.open_zarr(urlpath=s3_path, storage_options=storage_options).to_dask()

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/intake_xarray/base.py:69, in DataSourceMixin.to_dask(self)
     67 def to_dask(self):
     68     """Return xarray object where variables are dask arrays"""
---> 69     return self.read_chunked()

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/intake_xarray/base.py:44, in DataSourceMixin.read_chunked(self)
     42 def read_chunked(self):
     43     """Return xarray object (which will have chunks)"""
---> 44     self._load_metadata()
     45     return self._ds

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/intake/source/base.py:283, in DataSourceBase._load_metadata(self)
    281 """load metadata only if needed"""
    282 if self._schema is None:
--> 283     self._schema = self._get_schema()
    284     self.dtype = self._schema.dtype
    285     self.shape = self._schema.shape

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/intake_xarray/base.py:18, in DataSourceMixin._get_schema(self)
     15 self.urlpath = self._get_cache(self.urlpath)[0]
     17 if self._ds is None:
---> 18     self._open_dataset()
     20     metadata = {
     21         'dims': dict(self._ds.dims),
     22         'data_vars': {k: list(self._ds[k].coords)
     23                       for k in self._ds.data_vars.keys()},
     24         'coords': tuple(self._ds.coords.keys()),
     25     }
     26     if getattr(self, 'on_server', False):

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/intake_xarray/xzarr.py:44, in ZarrSource._open_dataset(self)
     42     kw.setdefault("backend_kwargs", {})["storage_options"] = self.storage_options
     43 if isinstance(self.urlpath, list) or "*" in self.urlpath:
---> 44     self._ds = xr.open_mfdataset(self.urlpath, **kw)
     45 else:
     46     self._ds = xr.open_dataset(self.urlpath, **kw)

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/xarray/backends/api.py:1038, in open_mfdataset(paths, chunks, concat_dim, compat, preprocess, engine, data_vars, coords, combine, parallel, join, attrs_file, combine_attrs, **kwargs)
   1035     open_ = open_dataset
   1036     getattr_ = getattr
-> 1038 datasets = [open_(p, **open_kwargs) for p in paths]
   1039 closers = [getattr_(ds, "_close") for ds in datasets]
   1040 if preprocess is not None:

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/xarray/backends/api.py:1038, in <listcomp>(.0)
   1035     open_ = open_dataset
   1036     getattr_ = getattr
-> 1038 datasets = [open_(p, **open_kwargs) for p in paths]
   1039 closers = [getattr_(ds, "_close") for ds in datasets]
   1040 if preprocess is not None:

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/xarray/backends/api.py:566, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
    554 decoders = _resolve_decoders_kwargs(
    555     decode_cf,
    556     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)
    562     decode_coords=decode_coords,
    563 )
    565 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 566 backend_ds = backend.open_dataset(
    567     filename_or_obj,
    568     drop_variables=drop_variables,
    569     **decoders,
    570     **kwargs,
    571 )
    572 ds = _dataset_from_backend_dataset(
    573     backend_ds,
    574     filename_or_obj,
   (...)
    584     **kwargs,
    585 )
    586 return ds

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/xarray/backends/zarr.py:934, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel, zarr_version)
    913 def open_dataset(  # type: ignore[override]  # allow LSP violation, not supporting **kwargs
    914     self,
    915     filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
   (...)
    931     zarr_version=None,
    932 ) -> Dataset:
    933     filename_or_obj = _normalize_path(filename_or_obj)
--> 934     store = ZarrStore.open_group(
    935         filename_or_obj,
    936         group=group,
    937         mode=mode,
    938         synchronizer=synchronizer,
    939         consolidated=consolidated,
    940         consolidate_on_close=False,
    941         chunk_store=chunk_store,
    942         storage_options=storage_options,
    943         stacklevel=stacklevel + 1,
    944         zarr_version=zarr_version,
    945     )
    947     store_entrypoint = StoreBackendEntrypoint()
    948     with close_on_error(store):

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/xarray/backends/zarr.py:452, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel, zarr_version)
    450     zarr_group = zarr.open_consolidated(store, **open_kwargs)
    451 else:
--> 452     zarr_group = zarr.open_group(store, **open_kwargs)
    453 return cls(
    454     zarr_group,
    455     mode,
   (...)
    459     safe_chunks,
    460 )

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/zarr/hierarchy.py:1416, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, meta_array)
   1365 """Open a group using file-mode-like semantics.
   1366 
   1367 Parameters
   (...)
   1412 
   1413 """
   1415 # handle polymorphic store arg
-> 1416 store = _normalize_store_arg(
   1417     store, storage_options=storage_options, mode=mode,
   1418     zarr_version=zarr_version)
   1419 if zarr_version is None:
   1420     zarr_version = getattr(store, '_store_version', DEFAULT_ZARR_VERSION)

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/zarr/hierarchy.py:1288, in _normalize_store_arg(store, storage_options, mode, zarr_version)
   1286 if store is None:
   1287     return MemoryStore() if zarr_version == 2 else MemoryStoreV3()
-> 1288 return normalize_store_arg(store,
   1289                            storage_options=storage_options, mode=mode,
   1290                            zarr_version=zarr_version)

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/zarr/storage.py:182, in normalize_store_arg(store, storage_options, mode, zarr_version)
    180 else:
    181     raise ValueError("zarr_version must be either 2 or 3")
--> 182 return normalize_store(store, storage_options, mode)

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/zarr/storage.py:146, in _normalize_store_arg_v2(store, storage_options, mode)
    144     import fsspec
    145     if isinstance(store, fsspec.FSMap):
--> 146         return FSStore(store.root,
    147                        fs=store.fs,
    148                        mode=mode,
    149                        check=store.check,
    150                        create=store.create,
    151                        missing_exceptions=store.missing_exceptions,
    152                        **(storage_options or {}))
    153 if isinstance(store, str):
    154     if "://" in store or "::" in store:

File ~/anaconda3/envs/Oceanography/lib/python3.10/site-packages/zarr/storage.py:1351, in FSStore.__init__(self, url, normalize_keys, key_separator, mode, exceptions, dimension_separator, fs, check, create, missing_exceptions, **storage_options)
   1349 else:
   1350     if storage_options:
-> 1351         raise ValueError("Cannot specify both fs and storage_options")
   1352     self.fs = fs
   1353     self.path = self.fs._strip_protocol(url)

ValueError: Cannot specify both fs and storage_options
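
The last frames of the traceback suggest where the conflict comes from: by the time zarr receives the store it is already an fsspec.FSMap (so it carries its own filesystem), yet storage_options is still forwarded alongside it. A minimal, self-contained sketch of that check follows. FakeMapper and open_store are hypothetical stand-ins written for illustration, not zarr or fsspec APIs; only the error message is taken from the traceback.

```python
class FakeMapper:
    """Hypothetical stand-in for fsspec.FSMap: a root path bound to a filesystem."""

    def __init__(self, root, fs):
        self.root = root
        self.fs = fs


def open_store(store, storage_options=None):
    """Simplified model of the FSStore check shown at the end of the traceback."""
    if isinstance(store, FakeMapper):
        # A mapper already knows its filesystem, so extra options are ambiguous.
        if storage_options:
            raise ValueError("Cannot specify both fs and storage_options")
        return {"root": store.root, "fs": store.fs}
    # A plain URL string has no filesystem yet; the options are used to build one.
    return {"root": store, "fs": ("built-from", dict(storage_options or {}))}


options = {"key": "<key>", "secret": "<secret>"}

# Plain string plus options: accepted (the single-store case).
single = open_store("s3://bucket_name/file0", options)

# Mapper plus options: rejected (what the glob case appears to turn into).
try:
    open_store(FakeMapper("bucket_name/file0", fs="s3-filesystem"), options)
    outcome = "opened"
except ValueError as err:
    outcome = str(err)
```

In this simplified model, anything that reaches the store as a plain string takes the second branch, so the options are only rejected when a ready-made mapper arrives together with them.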

Is there a workaround for this? As I mentioned at the beginning, I would like to put the correct arguments into a .yaml file entry so that it simply opens multiple zarr files...

The package versions I am using are:

python:  3.10

intake.__version__
'0.7.0'

s3fs.__version__
'2023.6.0'

zarr.__version__
'2.15.0'

xr.__version__
'2023.6.0'

martindurant commented 1 year ago

My memory fails on this one - you may need to pass your URL as a list of strings rather than a glob. One would think it should be simple enough to expand the glob path here if xarray doesn't.

Alternatively, since the error is specifically about passing storage_options, all of the arguments you have there could have been passed by environment variable instead (or fsspec config), so it's worth checking whether that works. That might not be a sufficient workaround, since you don't want to ask other users of the catalog to have to configure their systems, but it might point to what is going wrong within xarray.
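
One way to try the fsspec-config route mentioned above is to write the S3 options into a JSON file in fsspec's configuration directory, so that they no longer need to be passed as storage_options. The sketch below uses only the standard library. The helper name write_fsspec_s3_config and the file name s3.json are made up for illustration; the layout (a top-level key named after the protocol, read from ~/.config/fsspec or from the directory named by FSSPEC_CONFIG_DIR) follows fsspec's configuration documentation to the best of my understanding. Whether this avoids the error above has not been verified in this thread.

```python
import json
import tempfile
from pathlib import Path


def write_fsspec_s3_config(config_dir, endpoint_url, region_name):
    """Write S3 defaults for fsspec into <config_dir>/s3.json and return the path.

    Hypothetical helper: credentials are deliberately left out here and can be
    supplied through the usual AWS environment variables instead.
    """
    config = {
        "s3": {
            "client_kwargs": {
                "endpoint_url": endpoint_url,
                "region_name": region_name,
            }
        }
    }
    path = Path(config_dir) / "s3.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(config, indent=2))
    return path


# Demonstration against a temporary directory; for real use, pass
# Path.home() / ".config" / "fsspec" or the directory named by FSSPEC_CONFIG_DIR.
with tempfile.TemporaryDirectory() as tmp:
    written = write_fsspec_s3_config(tmp, "<url>", "us-east-1")
    loaded = json.loads(written.read_text())
```

fsspec loads its configuration when it is imported, so the file has to exist before the Python session starts. The key and secret can be left to AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, which keeps credentials out of both the catalog and the config file.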

Mikejmnez commented 1 year ago

Thanks @martindurant. Passing a list of all entries worked. Something like:

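# Assumed construction: s3_ceph is the filesystem built from the same storage_options
s3_ceph = s3fs.S3FileSystem(**storage_options)
s3_paths = 's3://bucket_name/file*'
# glob() returns paths without the protocol, so "s3://" is added back
fileset = [f"s3://{filedb}" for filedb in s3_ceph.glob(s3_paths)]
new_ds = intake.open_zarr(urlpath=fileset, storage_options=storage_options, parallel=True, consolidated=True).to_dask()

where s3_ceph is the s3fs.S3FileSystem associated with the storage_options.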
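
The glob-expansion step can be wrapped in a small helper so that it is easy to reuse. This is a sketch, not part of intake or s3fs: expand_glob is a hypothetical name, and it only assumes that the filesystem object has a glob() method returning paths without the protocol, as the list comprehension above implies for s3fs. The StubFS class exists purely so the example runs without network access.

```python
from fnmatch import fnmatch


def expand_glob(fs, pattern, protocol="s3"):
    """Return a sorted list of full URLs matching a glob pattern."""
    prefix = f"{protocol}://"
    urls = []
    for path in fs.glob(pattern):
        # Re-attach the protocol only if glob() stripped it.
        urls.append(path if path.startswith(prefix) else prefix + path)
    return sorted(urls)


class StubFS:
    """Hypothetical in-memory stand-in for s3fs.S3FileSystem."""

    def __init__(self, paths):
        self._paths = paths

    def glob(self, pattern):
        bare = pattern.split("://", 1)[-1]  # mimic protocol stripping
        return [p for p in self._paths if fnmatch(p, bare)]


fs = StubFS(["bucket_name/file1", "bucket_name/file0", "bucket_name/other"])
fileset = expand_glob(fs, "s3://bucket_name/file*")
# fileset == ["s3://bucket_name/file0", "s3://bucket_name/file1"]
```

With a real s3fs.S3FileSystem in place of StubFS, the resulting list can be passed as urlpath exactly as in the snippet above. Since the traceback shows ZarrSource accepting a list for urlpath, the same explicit list could presumably be written as a YAML sequence under urlpath in the catalog entry, though that was not confirmed in this thread.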