zarr-developers / VirtualiZarr

Create virtual Zarr stores from archival data files using xarray syntax
https://virtualizarr.readthedocs.io/en/stable/api.html
Apache License 2.0

Real world use case: Virtualizarring CMIP6 data #93

Open jbusecke opened 6 months ago

jbusecke commented 6 months ago

[!NOTE] All of this is currently based on a dev branch which represents a merge of main into #67

Motivated to come up with a proof of concept by tomorrow for the ESGF conference I am at right now, I am trying to test VirtualiZarr on real-world CMIP6 data on S3 (a complex example for #61).

I am running the following:

from virtualizarr import open_virtual_dataset
import xarray as xr
files = [
    's3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_185001-186012.nc',
    's3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_187101-188012.nc',
]
vds_list = []
for f in files:
    vds = open_virtual_dataset(f, filetype='netCDF4', indexes={})
    vds_list.append(vds)
combined_vds = xr.combine_nested(vds_list, concat_dim=['time'], coords='minimal', compat='override')
combined_vds.virtualize.to_kerchunk('combined.json', format='json')

This works until here, which is really phenomenal. Thanks for the great work here.


But when I try to read from the reference file

import fsspec

fs = fsspec.filesystem("reference", fo="combined.json")
mapper = fs.get_mapper("")

combined_ds = xr.open_dataset(mapper, engine="kerchunk")

I get this error:

---------------------------------------------------------------------------
ClientError                               Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.11/site-packages/aiobotocore/client.py:408, in AioBaseClient._make_api_call(self, operation_name, api_params)
    407     error_class = self.exceptions.from_code(error_code)
--> 408     raise error_class(parsed_response, operation_name)

ClientError: An error occurred (InvalidAccessKeyId) when calling the GetObject operation: The AWS Access Key Id you provided does not exist in our records.

The above exception was the direct cause of the following exception:

PermissionError                           Traceback (most recent call last)
[... s3fs retry/error-translation frames elided ...]

PermissionError: The AWS Access Key Id you provided does not exist in our records.

The above exception was the direct cause of the following exception:

ReferenceNotReachable                     Traceback (most recent call last)
Cell In[17], line 6
----> 6 combined_ds = xr.open_dataset(mapper, engine="kerchunk")

[... xarray / kerchunk xarray_backend / fsspec mapping frames elided ...]

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/implementations/reference.py:713, in ReferenceFileSystem.__init__(...)
    711     # get single protocol from references
--> 713     for ref in self.references.values():

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/implementations/reference.py:917, in ReferenceFileSystem.cat(self, path, recursive, on_error, **kwargs)
    916     if on_error == "raise":
--> 917         raise new_ex

ReferenceNotReachable: Reference "i/0" failed to fetch target ['s3://esgf-world/CMIP6/CMIP/CCCma/CanESM5/historical/r10i1p1f1/Omon/uo/gn/v20190429/uo_Omon_CanESM5_historical_r10i1p1f1_gn_185001-186012.nc', 47078, 1440]

To me this indicates that the required storage_options={'anon': True} is somehow not being passed through properly.

Adding

fs = fsspec.filesystem("reference", fo="combined.json", remote_options={'anon': True})

gets around that error, but the open never completes. After waiting for 10 minutes I interrupted it and got this trace:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
[... xarray / kerchunk xarray_backend / fsspec frames elided (same path as above) ...]

File /srv/conda/envs/notebook/lib/python3.11/site-packages/fsspec/implementations/reference.py:892, in ReferenceFileSystem.cat(self, path, recursive, on_error, **kwargs)
    884     # merge and fetch consolidated ranges
--> 892     bytes_out = fs.cat_ranges(new_paths, new_starts, new_ends)

File /srv/conda/envs/notebook/lib/python3.11/threading.py:331, in Condition.wait(self, timeout)
    330     if timeout > 0:
--> 331         gotit = waiter.acquire(True, timeout)

KeyboardInterrupt:

I might be misinterpreting this, but it looks exactly like the trace from the 'pangeo-forge-rechunking-stall' issue (can't find the original issue right now).

I am definitely too tired to dig deeper tonight, but I am left wondering a few things.

Super happy to keep working on this!

jbusecke commented 6 months ago

Oh, I got it! (this is from @mgrover1's notebook)

ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": {
            "fo": "combined.json",
            "remote_protocol": "s3",
            "remote_options": {"anon": True},
        },
    },
)
ds

works brilliantly!


Honestly, I have no clue what is happening here, but it's also not that important in the long term, I guess hehe.

TomNicholas commented 6 months ago

To me this indicates that somehow the required storage_options={'anon':True} is not properly passed.

That might well be the case.

I actually forgot we hadn't merged #67 yet - it would be great to have that tested and merged.

Once this is written as a zarr, will the need to pass storage options go away?

Once it's written using the chunk manifest specification, and zarr-python implements the corresponding ZEP, it will be read from S3 however zarr-python implements that, which I think will be via the Rust object-store crate. I don't know what options have to be passed to that.

Is there a way to not use fsspec to use the reference files at the moment?

You need fsspec to understand the reference files if they are written out following the kerchunk format.

TomNicholas commented 6 months ago

@jbusecke nice! If that works but engine='kerchunk' doesn't work then presumably there is a bug with kerchunk's xarray backend...

I would double-check that you can load this data and that the values are as you expect (watch out for subtleties with encoding)...

jbusecke commented 6 months ago

Ok I tried this for loading:

from dask.diagnostics import ProgressBar
import xarray as xr

ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    chunks={},
    backend_kwargs={
        "consolidated": False,
        "storage_options": {
            "fo": "combined.json",
            "remote_protocol": "s3",
            "remote_options": {"anon": True},
        },
    },
)
with ProgressBar():
    da_plot = ds.uo.mean(['time', 'lev']).load()
da_plot.plot()


That seems totally fine to me on the loading side.

Do you have recommendations for how to check the encoding in a comprehensive manner?

TomNicholas commented 6 months ago

That seems totally fine to me on the loading side.

Great!

Do you have recommendations how to check the encoding in a comprehensive manner?

Well, the source of truth here would be to open the files directly with xarray, without kerchunk in the loop, i.e. open_mfdataset on the raw netCDF files.

norlandrhagen commented 6 months ago

Very cool to see a real world example @jbusecke!