Closed observingClouds closed 1 year ago
Repeating the same code after clearing the SLK_CACHE leads to different results for the first retrieval:
slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr/air","$max_depth":1}},{"resources.name":{"$regex":"0.0.0|0.0.1"}}]}'
slk search '{"$and":[{"path":{"$gte":"/arch/mh0010/m300408/showcase/dataset.zarr/air","$max_depth":1}},{"resources.name":{"$regex":"0.0.0|0.0.1|0.1.0"}}]}'
My assumption is that the queue is too quickly processed so that not all requests made it into the queue in time. Adding some wait time before starting the retrieval would help to fill the queue sufficiently.
I did some digging and found that what you're trying to achieve is impossible (at list with the current implementation of fsspec and zarr):
When executing
import xarray as xr
ds = xr.open_mfdataset(
"slk:///arch/mh0010/m300408/showcase/dataset.zarr",
engine="zarr",
)
and printing the stack trace in the _retrieve_items
method I get the following:
slkspec/core.py:158 for frame_info in inspect.stack():
slkspec/core.py:188 self._retrieve_items(items)
slkspec/core.py:233 self._cache_files()
slkspec/core.py:397 return f_obj.read()
fsspec/spec.py:811 return self.cat_file(paths[0], **kwargs)
fsspec/mapping.py:137 result = self.fs.cat(k)
zarr/storage.py:724 return self._mutable_mapping[key]
zarr/core.py:2009 cdata = self.chunk_store[ckey]
zarr/core.py:1284 self._chunk_getitem(
zarr/core.py:991 return self._get_selection(indexer=indexer, out=out, fields=fields)
zarr/core.py:947 return self._get_basic_selection_nd(
zarr/core.py:820 result = self.get_basic_selection(pure_selection, fields=fields)
xarray/backends/zarr.py:73 return array[key.tuple]
xarray/core/indexing.py:524 return np.asarray(array[self.key], dtype=None)
xarray/coding/times.py:268 num_dates = np.asarray(num_dates)
xarray/coding/variables.py:72 return self.func(self.array)
xarray/core/indexing.py:524 return np.asarray(array[self.key], dtype=None)
xarray/core/indexes.py:177 index = pd.Index(np.asarray(array), **kwargs)
xarray/core/indexing.py:1420 self.array = safe_cast_to_index(array)
xarray/core/variable.py:2844 self._data = PandasIndexingAdapter(self._data)
xarray/core/variable.py:624 return IndexVariable(
xarray/core/variable.py:168 obj = obj.to_index_variable()
xarray/core/merge.py:370 variable = as_variable(variable, name=name)
xarray/core/merge.py:762 collected = collect_variables_and_indexes(aligned, indexes=indexes)
xarray/core/merge.py:582 return merge_core(
xarray/core/dataset.py:604 variables, coord_names, dims, indexes, _ = merge_data_and_coords(
xarray/backends/store.py:41 ds = Dataset(vars, attrs=attrs)
xarray/backends/zarr.py:879 ds = store_entrypoint.open_dataset(
xarray/backends/api.py:537 backend_ds = backend.open_dataset(
xarray/backends/api.py:993 datasets = [open_(p, **open_kwargs) for p in paths]
xarray/backends/api.py:993 datasets = [open_(p, **open_kwargs) for p in paths]
What that means is that zarr is reading the data serially, that is first the dataset.zarr/.zmetadata
and then all the files it needs (dataset.zarr/time/0
, dataset.zarr/lat/0
, dataset.zarr/lon/0
). There's is very little we can do in order to prevent this.
So I'd be inclined to say we close this issue unless you know what sort of return value slkspec/core.py:397 return f_obj.read()
we could assign in order for this to work.
Well, you might be right with the .zmetadata
, but
preffs
protocol in addition, we can reduce the .zmetadata
issue a bit by putting the metadata and coordinates into one container (e.g. tar). In this case we only need one request to retrieve all information to be able to provide the user with a lazy dataset.Well, you might be right with the
.zmetadata
, but* how can you explain the behaviour mentioned [further above](https://github.com/observingClouds/slkspec/issues/10#issuecomment-1342360228)?
Ok, here is what I think is going on.
.zmetadata
file and reads its content to see what variables are there and where the actual data can be found as well as how it is organized. zarr/storage.py:2861 meta = json_loads(self.store[metadata_key])
zarr/convenience.py:1300 meta_store = ConsolidatedStoreClass(store, metadata_key=metadata_key)
xarray/backends/zarr.py:865 store = ZarrStore.open_group(`
once this is done, zarr reads the data file by file. See the stack trace above. AFAIK this can happen serial or in parallel - it is beyond my comprehension how zarr decides when data can be read in parallel and when not. What I found out was that the data storage object in the example above doesn't support reading in parallel (KVStorage
object). If the data is read serially the delay will never have an effect, because each request will wait until previous ones have been processed.
If the data is read in parallel, and I assume this is what you observed, the queue gets filled sort of randomly. File names might get added to the queue before previous threads continue downloading after the sleep. I think this is what you observed with the dask distributed clusters. So in short your "solution" is creating race conditions. I don't really think we want to do that.
* when we use the `preffs` protocol in addition, we can reduce the `.zmetadata` issue a bit by putting the metadata and coordinates into one container (e.g. tar). In this case we only need one request to retrieve all information to be able to provide the user with a lazy dataset.
I have too little knowledge to tell anything about your file format.
* This issue emphasises once more the importance that small files should be kept in the cache (or saved somewhere else).
Absolutely, @neumannd we should maybe add this our requirement list?
That said, I think I might have an idea of how we could circumvent the problem altogether. Let me create a PR to see what you think.
After consulting the zarr docs I think the only appropriate solution to this problem is not saving the data in a DirectoryStorage
object.
This example would work well with the tape archive:
import xarray as xr
import zarr
with zarr.ZipStore('example.zip', mode='w') as zarr:
dset = xr.tutorial.open_dataset("air_temperature")
dset.to_zarr(store)
This avoids saving lots of small files on the tape, and I think should be used to save zarr format on the hsml.
The example I gave here is just a minimal working example that shows that the request is not correctly combined without the delay. I can give you an example with preffs
, but that is just more complicated. The zarr.ZipStore
creates one big file, which might look nice at first, but a simulation can be easily several TB in size and a user might only look at a portion of it and does not want to retrieve one huge file from tape, that potentially doesn't even fit on /scratch
. We could split the zarr file by variable for example to reduce the size, but then we end up again with the same issue that we want to retrieve several files at once. The metadata would by the way also be included in the ZipStore such that no lazy loading is possible.
You, @antarcticrainforest, mentioned concerns about a race condition, but if I see this correctly, a delay should not change anything about this. All processes are delayed by the same amount. The delay ensures that all processes could add their requests into the queue and whichever is first (after the delay), will process all requests in the queue and bundle them into one slk request. The other processes realise that the queue is empty and will return. I don't see where this is problematic in particular because it works!
Ok, if you think a delay will work then go for it.
In the following example, the first file request is not combined with other requests, although all files being in the same directory.