lsterzinger / 2022-esip-kerchunk-tutorial


to_dask failed with s3fs (Issue #7)

Closed: ZihengSun closed this issue 1 year ago

ZihengSun commented 1 year ago

Description

I tried to run the notebook nwd_hindcast_intake and got stuck at the step `ds = cat['nwm-reanalysis'].to_dask()`, which fails with the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[5], line 1
----> 1 ds = cat['nwm-reanalysis'].to_dask()

File ~/miniconda3/lib/python3.11/site-packages/intake_xarray/base.py:69, in DataSourceMixin.to_dask(self)
     67 def to_dask(self):
     68     """Return xarray object where variables are dask arrays"""
---> 69     return self.read_chunked()

File ~/miniconda3/lib/python3.11/site-packages/intake_xarray/base.py:44, in DataSourceMixin.read_chunked(self)
     42 def read_chunked(self):
     43     """Return xarray object (which will have chunks)"""
---> 44     self._load_metadata()
     45     return self._ds

File ~/miniconda3/lib/python3.11/site-packages/intake/source/base.py:283, in DataSourceBase._load_metadata(self)
    281 """load metadata only if needed"""
    282 if self._schema is None:
--> 283     self._schema = self._get_schema()
    284     self.dtype = self._schema.dtype
    285     self.shape = self._schema.shape

File ~/miniconda3/lib/python3.11/site-packages/intake_xarray/base.py:18, in DataSourceMixin._get_schema(self)
     15 self.urlpath = self._get_cache(self.urlpath)[0]
     17 if self._ds is None:
---> 18     self._open_dataset()
     20     metadata = {
     21         'dims': dict(self._ds.dims),
     22         'data_vars': {k: list(self._ds[k].coords)
     23                       for k in self._ds.data_vars.keys()},
     24         'coords': tuple(self._ds.coords.keys()),
     25     }
     26     if getattr(self, 'on_server', False):

File ~/miniconda3/lib/python3.11/site-packages/intake_xarray/xzarr.py:46, in ZarrSource._open_dataset(self)
     44     self._ds = xr.open_mfdataset(self.urlpath, **kw)
     45 else:
---> 46     self._ds = xr.open_dataset(self.urlpath, **kw)

File ~/miniconda3/lib/python3.11/site-packages/xarray/backends/api.py:570, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
    558 decoders = _resolve_decoders_kwargs(
    559     decode_cf,
    560     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)
    566     decode_coords=decode_coords,
    567 )
    569 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 570 backend_ds = backend.open_dataset(
    571     filename_or_obj,
    572     drop_variables=drop_variables,
    573     **decoders,
    574     **kwargs,
    575 )
    576 ds = _dataset_from_backend_dataset(
    577     backend_ds,
    578     filename_or_obj,
   (...)
    588     **kwargs,
    589 )
    590 return ds

File ~/miniconda3/lib/python3.11/site-packages/xarray/backends/zarr.py:934, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel, zarr_version)
    913 def open_dataset(  # type: ignore[override]  # allow LSP violation, not supporting **kwargs
    914     self,
    915     filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
   (...)
    931     zarr_version=None,
    932 ) -> Dataset:
    933     filename_or_obj = _normalize_path(filename_or_obj)
--> 934     store = ZarrStore.open_group(
    935         filename_or_obj,
    936         group=group,
    937         mode=mode,
    938         synchronizer=synchronizer,
    939         consolidated=consolidated,
    940         consolidate_on_close=False,
    941         chunk_store=chunk_store,
    942         storage_options=storage_options,
    943         stacklevel=stacklevel + 1,
    944         zarr_version=zarr_version,
    945     )
    947     store_entrypoint = StoreBackendEntrypoint()
    948     with close_on_error(store):

File ~/miniconda3/lib/python3.11/site-packages/xarray/backends/zarr.py:452, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel, zarr_version)
    450     zarr_group = zarr.open_consolidated(store, **open_kwargs)
    451 else:
--> 452     zarr_group = zarr.open_group(store, **open_kwargs)
    453 return cls(
    454     zarr_group,
    455     mode,
   (...)
    459     safe_chunks,
    460 )

File ~/miniconda3/lib/python3.11/site-packages/zarr/hierarchy.py:1416, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, meta_array)
   1365 """Open a group using file-mode-like semantics.
   1366 
   1367 Parameters
   (...)
   1412 
   1413 """
   1415 # handle polymorphic store arg
-> 1416 store = _normalize_store_arg(
   1417     store, storage_options=storage_options, mode=mode,
   1418     zarr_version=zarr_version)
   1419 if zarr_version is None:
   1420     zarr_version = getattr(store, '_store_version', DEFAULT_ZARR_VERSION)

File ~/miniconda3/lib/python3.11/site-packages/zarr/hierarchy.py:1288, in _normalize_store_arg(store, storage_options, mode, zarr_version)
   1286 if store is None:
   1287     return MemoryStore() if zarr_version == 2 else MemoryStoreV3()
-> 1288 return normalize_store_arg(store,
   1289                            storage_options=storage_options, mode=mode,
   1290                            zarr_version=zarr_version)

File ~/miniconda3/lib/python3.11/site-packages/zarr/storage.py:182, in normalize_store_arg(store, storage_options, mode, zarr_version)
    180 else:
    181     raise ValueError("zarr_version must be either 2 or 3")
--> 182 return normalize_store(store, storage_options, mode)

File ~/miniconda3/lib/python3.11/site-packages/zarr/storage.py:155, in _normalize_store_arg_v2(store, storage_options, mode)
    153 if isinstance(store, str):
    154     if "://" in store or "::" in store:
--> 155         return FSStore(store, mode=mode, **(storage_options or {}))
    156     elif storage_options:
    157         raise ValueError("storage_options passed with non-fsspec path")

File ~/miniconda3/lib/python3.11/site-packages/zarr/storage.py:1346, in FSStore.__init__(self, url, normalize_keys, key_separator, mode, exceptions, dimension_separator, fs, check, create, missing_exceptions, **storage_options)
   1344 if protocol in (None, "file") and not storage_options.get("auto_mkdir"):
   1345     storage_options["auto_mkdir"] = True
-> 1346 self.map = fsspec.get_mapper(url, **{**mapper_options, **storage_options})
   1347 self.fs = self.map.fs  # for direct operations
   1348 self.path = self.fs._strip_protocol(url)

File ~/miniconda3/lib/python3.11/site-packages/fsspec/mapping.py:237, in get_mapper(url, check, create, missing_exceptions, alternate_root, **kwargs)
    206 """Create key-value interface for given URL and options
    207 
    208 The URL will be of the form "protocol://location" and point to the root
   (...)
    234 ``FSMap`` instance, the dict-like key-value store.
    235 """
    236 # Removing protocol here - could defer to each open() on the backend
--> 237 fs, urlpath = url_to_fs(url, **kwargs)
    238 root = alternate_root if alternate_root is not None else urlpath
    239 return FSMap(root, fs, check, create, missing_exceptions=missing_exceptions)

File ~/miniconda3/lib/python3.11/site-packages/fsspec/core.py:375, in url_to_fs(url, **kwargs)
    373     inkwargs["fo"] = urls
    374 urlpath, protocol, _ = chain[0]
--> 375 fs = filesystem(protocol, **inkwargs)
    376 return fs, urlpath

File ~/miniconda3/lib/python3.11/site-packages/fsspec/registry.py:267, in filesystem(protocol, **storage_options)
    260     warnings.warn(
    261         "The 'arrow_hdfs' protocol has been deprecated and will be "
    262         "removed in the future. Specify it as 'hdfs'.",
    263         DeprecationWarning,
    264     )
    266 cls = get_filesystem_class(protocol)
--> 267 return cls(**storage_options)

File ~/miniconda3/lib/python3.11/site-packages/fsspec/spec.py:79, in _Cached.__call__(cls, *args, **kwargs)
     77     return cls._cache[token]
     78 else:
---> 79     obj = super().__call__(*args, **kwargs)
     80     # Setting _fs_token here causes some static linters to complain.
     81     obj._fs_token_ = token

File ~/miniconda3/lib/python3.11/site-packages/fsspec/implementations/reference.py:606, in ReferenceFileSystem.__init__(self, fo, target, ref_storage_args, target_protocol, target_options, remote_protocol, remote_options, fs, template_overrides, simple_templates, max_gap, max_block, cache_size, **kwargs)
    603     else:
    604         # Lazy parquet refs
    605         logger.info("Open lazy reference dict from URL %s", fo)
--> 606         self.references = LazyReferenceMapper(
    607             fo2,
    608             fs=ref_fs,
    609             cache_size=cache_size,
    610         )
    611 else:
    612     # dictionaries
    613     self._process_references(fo, template_overrides)

File ~/miniconda3/lib/python3.11/site-packages/fsspec/implementations/reference.py:120, in LazyReferenceMapper.__init__(self, root, fs, out_root, cache_size, categorical_threshold)
    118 self.dirs = None
    119 self.fs = fsspec.filesystem("file") if fs is None else fs
--> 120 with self.fs.open("/".join([self.root, ".zmetadata"]), "rb") as f:
    121     self._items[".zmetadata"] = f.read()
    122 met = json.loads(self._items[".zmetadata"])

File ~/miniconda3/lib/python3.11/site-packages/fsspec/spec.py:1241, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1239 else:
   1240     ac = kwargs.pop("autocommit", not self._intrans)
-> 1241     f = self._open(
   1242         path,
   1243         mode=mode,
   1244         block_size=block_size,
   1245         autocommit=ac,
   1246         cache_options=cache_options,
   1247         **kwargs,
   1248     )
   1249     if compression is not None:
   1250         from fsspec.compression import compr

File ~/miniconda3/lib/python3.11/site-packages/s3fs/core.py:659, in S3FileSystem._open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, cache_options, **kwargs)
    656 if cache_type is None:
    657     cache_type = self.default_cache_type
--> 659 return S3File(
    660     self,
    661     path,
    662     mode,
    663     block_size=block_size,
    664     acl=acl,
    665     version_id=version_id,
    666     fill_cache=fill_cache,
    667     s3_additional_kwargs=kw,
    668     cache_type=cache_type,
    669     autocommit=autocommit,
    670     requester_pays=requester_pays,
    671     cache_options=cache_options,
    672 )

File ~/miniconda3/lib/python3.11/site-packages/s3fs/core.py:2066, in S3File.__init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays, cache_options)
   2064         self.details = s3.info(path)
   2065         self.version_id = self.details.get("VersionId")
-> 2066 super().__init__(
   2067     s3,
   2068     path,
   2069     mode,
   2070     block_size,
   2071     autocommit=autocommit,
   2072     cache_type=cache_type,
   2073     cache_options=cache_options,
   2074 )
   2075 self.s3 = self.fs  # compatibility
   2077 # when not using autocommit we want to have transactional state to manage

File ~/miniconda3/lib/python3.11/site-packages/fsspec/spec.py:1597, in AbstractBufferedFile.__init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, size, **kwargs)
   1595         self.size = size
   1596     else:
-> 1597         self.size = self.details["size"]
   1598     self.cache = caches[cache_type](
   1599         self.blocksize, self._fetch_range, self.size, **cache_options
   1600     )
   1601 else:

File ~/miniconda3/lib/python3.11/site-packages/fsspec/spec.py:1610, in AbstractBufferedFile.details(self)
   1607 @property
   1608 def details(self):
   1609     if self._details is None:
-> 1610         self._details = self.fs.info(self.path)
   1611     return self._details

File ~/miniconda3/lib/python3.11/site-packages/fsspec/asyn.py:121, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    118 @functools.wraps(func)
    119 def wrapper(*args, **kwargs):
    120     self = obj or args[0]
--> 121     return sync(self.loop, func, *args, **kwargs)

File ~/miniconda3/lib/python3.11/site-packages/fsspec/asyn.py:106, in sync(loop, func, timeout, *args, **kwargs)
    104     raise FSTimeoutError from return_result
    105 elif isinstance(return_result, BaseException):
--> 106     raise return_result
    107 else:
    108     return return_result

File ~/miniconda3/lib/python3.11/site-packages/fsspec/asyn.py:61, in _runner(event, coro, result, timeout)
     59     coro = asyncio.wait_for(coro, timeout=timeout)
     60 try:
---> 61     result[0] = await coro
     62 except Exception as ex:
     63     result[0] = ex

File ~/miniconda3/lib/python3.11/site-packages/s3fs/core.py:1271, in S3FileSystem._info(self, path, bucket, key, refresh, version_id)
   1269 if key:
   1270     try:
-> 1271         out = await self._call_s3(
   1272             "head_object",
   1273             self.kwargs,
   1274             Bucket=bucket,
   1275             Key=key,
   1276             **version_id_kw(version_id),
   1277             **self.req_kw,
   1278         )
   1279         return {
   1280             "ETag": out.get("ETag", ""),
   1281             "LastModified": out["LastModified"],
   (...)
   1287             "ContentType": out.get("ContentType"),
   1288         }
   1289     except FileNotFoundError:

File ~/miniconda3/lib/python3.11/site-packages/s3fs/core.py:341, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
    340 async def _call_s3(self, method, *akwarglist, **kwargs):
--> 341     await self.set_session()
    342     s3 = await self.get_s3(kwargs.get("Bucket"))
    343     method = getattr(s3, method)

File ~/miniconda3/lib/python3.11/site-packages/s3fs/core.py:502, in S3FileSystem.set_session(self, refresh, kwargs)
    500 conf = AioConfig(**config_kwargs)
    501 if self.session is None:
--> 502     self.session = aiobotocore.session.AioSession(**self.kwargs)
    504 for parameters in (config_kwargs, self.kwargs, init_kwargs, client_kwargs):
    505     for option in ("region_name", "endpoint_url"):

TypeError: AioSession.__init__() got an unexpected keyword argument 'compression'

Urgency Level

No rush.

Hunch

Some incompatibility between s3fs and fsspec?
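
For anyone trying to reproduce this outside the notebook, the failing step boils down to roughly the following sketch (the catalog filename here is a placeholder; substitute the catalog file the notebook actually opens):

import intake

# Placeholder path -- use the catalog file referenced by the notebook.
cat = intake.open_catalog("nwm_hindcast_catalog.yml")

# This is the call that raises
# TypeError: AioSession.__init__() got an unexpected keyword argument 'compression'
ds = cat["nwm-reanalysis"].to_dask()
print(ds)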

rsignell-usgs commented 1 year ago

@ZihengSun , thanks for reporting this -- I can reproduce.

The intake catalog file specifies compression as follows:

sources:
  nwm-reanalysis:
    driver: intake_xarray.xzarr.ZarrSource
    description: 'National Water Model Reanalysis, version 2.1'
    args:
      urlpath: 'reference://'
      storage_options:
        simple_templates: True
        target_options:
          anon: true
          compression: 'zstd'
        target_protocol: s3
        fo: 's3://esip-qhub-public/noaa/nwm/nwm_reanalysis.json.zst'
        remote_options:
          anon: true
        remote_protocol: s3

@lsterzinger or @martindurant, did something change with the specification of the compression argument?
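
For reference, that catalog entry expands to roughly the following direct fsspec/xarray call (a sketch of what the ZarrSource driver does under the hood, not its exact code), which shows where the compression option ends up:

import fsspec
import xarray as xr

# Build a "reference://" mapper from the same storage_options as the catalog entry.
mapper = fsspec.get_mapper(
    "reference://",
    fo="s3://esip-qhub-public/noaa/nwm/nwm_reanalysis.json.zst",
    target_protocol="s3",
    # 'compression' is meant to tell fsspec how to decompress the .zst reference
    # file; in the failing setup it is forwarded to s3fs, which passes unrecognized
    # kwargs on to aiobotocore's AioSession -- hence the TypeError in the traceback.
    target_options={"anon": True, "compression": "zstd"},
    remote_protocol="s3",
    remote_options={"anon": True},
    simple_templates=True,
)
ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})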

martindurant commented 1 year ago

I seem to recall that this was reported previously and fixed on fsspec main - what exact version do you have?
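
For example, one quick way to report the exact installed versions of the libraries that appear in the traceback:

import fsspec
import intake
import s3fs

# Print the versions in the active environment.
print("fsspec:", fsspec.__version__)
print("s3fs:", s3fs.__version__)
print("intake:", intake.__version__)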

lsterzinger commented 1 year ago

Thanks @ZihengSun for reporting this, sorry I didn't respond earlier - I was on vacation last week.

I'm reproducing this issue with intake==0.7 and fsspec==2023.6.0, both of which are the latest release versions. As a workaround, I'm going to try rolling some of these back to see if I can pin versions that get the environment working (it could use an update anyway).

martindurant commented 1 year ago

https://github.com/fsspec/filesystem_spec/pull/1273 was supposed to fix this, and was before the most recent release

martindurant commented 1 year ago

Please try with https://github.com/fsspec/filesystem_spec/pull/1316

lsterzinger commented 1 year ago

That worked for me, @martindurant. When this makes it into a release, I'll pin it in this repo's environment. Thanks for fixing this so quickly!

ZihengSun commented 1 year ago

Awesome! Thanks everyone!