intake / intake-esm

An intake plugin for parsing an Earth System Model (ESM) catalog and loading assets into xarray datasets.
https://intake-esm.readthedocs.io
Apache License 2.0

Issue loading data from AWS S3 with recent intake-esm #606

Closed · aradhakrishnanGFDL closed this issue 1 year ago

aradhakrishnanGFDL commented 1 year ago

I successfully used the Pangeo Docker image pangeo/pangeo-notebook:2021.01.07 to run an old notebook that uses intake-esm to access CMIP6 netCDF data in AWS S3 (open datasets, anon=True).

With the latest images, the notebook no longer works as expected, so I ran standalone tests with intake-esm. I hope the following helps trace whether the issue is with the package or with my usage.

With intake-esm version 2020.12.18 (and s3fs 2023.4.20), this notebook works as expected.

With intake-esm 2023.4.20 (and probably starting with some 2022 version), the notebook breaks at this call: dset_dict = cat.to_dataset_dict(cdf_kwargs={'chunks': {'time': 1}}, storage_options={'anon': True})

This snippet from the notebook may help reproduce the error:

import xarray as xr
import intake
import intake_esm

intake_esm.__version__  # loading fails with '2023.4.20' (since some 2022 version); works with '2020.12.18'

col_url = "https://cmip6-nc.s3.us-east-2.amazonaws.com/esgf-world.json"

col = intake.open_esm_datastore(col_url)

# Search filters
expname_filter = ['historical']
table_id_filter = 'Amon'
model_filter = 'GFDL-ESM4'
variable_id_filter = "tas"
ens_filter = "r1i1p1f1"  # defined but not used in the search below
version_filter = "v20190726"

cat = col.search(
    experiment_id=expname_filter,
    table_id=table_id_filter,
    source_id=model_filter,
    variable_id=variable_id_filter,
    version=version_filter,
)

dset_dict = cat.to_dataset_dict(
    cdf_kwargs={'chunks': {'time': 1}},
    storage_options={'anon': True},
)
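For completeness, a quick sanity check that the search matched the expected asset before the failing call (cat.df is the catalog's underlying pandas DataFrame):

print(len(cat.df))    # number of matched assets; should be >= 1
print(cat.df.head())  # inspect the catalog columns and asset paths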

ERRORS (traceback abridged; the full trace chains through intake_esm -> dask -> fsspec -> s3fs -> aiobotocore -> aiohttp)

--> The keys in the returned dictionary of datasets are constructed as follows: 'project.institution_id.source_id.experiment_id.table_id'

/tmp/ipykernel_342/1303523811.py:1: DeprecationWarning: cdf_kwargs and zarr_kwargs are deprecated and will be removed in a future version. Please use xarray_open_kwargs instead.
  dset_dict = cat.to_dataset_dict(cdf_kwargs={'chunks': {'time': 1}},storage_options={'anon':True})

The load then stalls at 0.00% [0/1] and fails with the following chain of exceptions:

TimeoutError (asyncio, while opening the TCP connection)

ServerTimeoutError: Connection timeout to host http://169.254.169.254/latest/api/token
  (raised from aiobotocore's AioInstanceMetadataFetcher._fetch_metadata_token, reached via
   intake_esm/source.py: fsspec.open(urlpath, **storage_options) -> s3fs S3FileSystem.set_session
   -> aiobotocore AioSession.get_credentials)

ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.169.254/latest/api/token"

ESMDataSourceError: Failed to load dataset with key='CMIP6.NOAA-GFDL.GFDL-ESM4.historical.Amon'
    You can use `cat['CMIP6.NOAA-GFDL.GFDL-ESM4.historical.Amon'].df` to inspect the assets/files for this key.
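The timeout target 169.254.169.254 is the EC2 instance-metadata (IMDS) endpoint, so the trace shows s3fs falling back to an AWS credential lookup instead of anonymous access; in other words, anon=True is apparently not reaching the filesystem. As a sanity check that the bucket itself is publicly readable, here is a minimal sketch that bypasses intake-esm and opens one asset with s3fs directly (it assumes the catalog's asset-path column is named 'path'; inspect cat.df.columns if not):

import fsspec

# Take one asset URL from the search results above.
url = cat.df["path"].iloc[0]

# Anonymous S3 filesystem: no credential lookup, no IMDS call.
fs = fsspec.filesystem("s3", anon=True)
print(fs.info(url))  # should return object metadata without any AWS credentials

If this succeeds, the bucket is publicly readable and the failure above lies in how storage_options is (not) being forwarded to s3fs.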
mgrover1 commented 1 year ago

@aradhakrishnanGFDL - I had a chance to look at this, and there was a bug in how we were passing storage_options to non-zarr formats (e.g. netCDF4).

With the fix over in #609, you should be able to read the datasets using the following snippet:

dset_dict = cat.to_dataset_dict(
    xarray_open_kwargs={'engine': 'h5netcdf', 'chunks': {}},
    storage_options={'anon': True},
)

We will cut a release in the next few days with the fix once it is merged!
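Once you have the fix, a minimal sketch of pulling a dataset out of the returned dictionary (the key format is the one printed above, 'project.institution_id.source_id.experiment_id.table_id'):

# Keys follow 'project.institution_id.source_id.experiment_id.table_id'.
ds = dset_dict['CMIP6.NOAA-GFDL.GFDL-ESM4.historical.Amon']
print(ds['tas'])  # dask-backed and lazy until you call .compute() or .load()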

mgrover1 commented 1 year ago

Thank you for raising this issue! 🎉