fsspec / filesystem_spec

A specification that python filesystems should adhere to.
BSD 3-Clause "New" or "Revised" License

reference file cache #910

Open martindurant opened 2 years ago

martindurant commented 2 years ago

cc @rjzamora

Following the success of caching pieces of files for parquet reading, I wonder if we can think of a use case combining this with the kerchunk method. I am thinking that each piece of a file might be fetched from different sources - proactively (as with parquet) or on demand.
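For context, the parquet piece-caching referred to here is the sort of thing fsspec.parquet does - a rough sketch, with a hypothetical bucket path and column names:

from fsspec.parquet import open_parquet_file

# Only the byte ranges needed for the footer and the requested columns
# are fetched, proactively, before the file object is handed back.
with open_parquet_file(
    "gs://example-bucket/data.parquet",
    columns=["a", "b"],
    storage_options={"anon": True},
) as f:
    header = f.read(4)  # reads are served from the pre-fetched ranges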

I am thinking that this idea might allow us to construct arrow-like datasets from any data format (including parquet), where the data itself is some compressed buffer that translates directly to an arrow (or numpy) buffer on load. Exactly how to do this is another matter, but I thought people might have some insight on whether this kind of thing would even be useful, if it were done.

emfdavid commented 1 year ago

Hi Martin - not sure if this is what you are asking about above, but I think it is related.

I have been trying to use caching with kerchunk reference filesystems for HRRR GRIB2 data. I can see the reference filesystem is putting my CachingFileSystem in the fss attribute, but it does not appear to use the cache - no files appear in my cache directory after loading a variable.

Here is what I have tried with debug logging on, but I am not sure where to look next after reading the ReferenceFileSystem and CachingFileSystem implementations. I just don't know enough about how xarray uses the KV API to read this data. Why doesn't the cat method call produce a cached output?

import fsspec
import xarray as xr

rpath = 'gs://gcp-public-data-weather.camus.store/high-resolution-rapid-refresh/version_2/raw_zarr/conus/hrrr.20210603/hrrr.t00z.wrfsfcf02.zarr'
# Block-caching filesystem, intended to cache reads from the remote GRIB2 files
fsc = fsspec.filesystem("blockcache", target_protocol='gcs', target_options={'anon': True}, cache_storage='/home/builder/cache')
t_opts = {}
fs = fsspec.filesystem(
    protocol="reference",
    fo=rpath,  # The target zarr json blob
    target_protocol="gcs",
    target_options=t_opts,
    #remote_protocol='gcs',
    #remote_options=r_opts,
    fs=dict(gcs=fsc)  # pass the caching filesystem in for the "gcs" protocol
)
fs.fss

2022-11-23T16:38:38.464Z MainProcess MainThread INFO:fsspec.reference:Read reference from URL gs://gcp-public-data-weather.camus.store/high-resolution-rapid-refresh/version_2/raw_zarr/conus/hrrr.20210603/hrrr.t00z.wrfsfcf02.zarr
{'gcs': <fsspec.implementations.cached.CachingFileSystem at 0x7f9a80345180>}
kv_store = fs.get_mapper("")
ds = xr.open_dataset(kv_store, engine="zarr", backend_kwargs=dict(consolidated=False), chunks={'valid_time':1}, drop_variables=["heightAboveGround"])
ds

2022-11-23T16:38:42.420Z MainProcess MainThread DEBUG:fsspec.reference:cat: .zgroup
2022-11-23T16:38:42.422Z MainProcess MainThread DEBUG:fsspec.reference:Reference: .zgroup, type bytes
2022-11-23T16:38:42.423Z MainProcess MainThread DEBUG:fsspec.reference:cat: .zarray
2022-11-23T16:38:42.424Z MainProcess MainThread DEBUG:fsspec.reference:cat: .zgroup
2022-11-23T16:38:42.425Z MainProcess MainThread DEBUG:fsspec.reference:Reference: .zgroup, type bytes
2022-11-23T16:38:42.427Z MainProcess MainThread DEBUG:fsspec.reference:cat: .zattrs/.zarray
2022-11-23T16:38:42.428Z MainProcess MainThread DEBUG:fsspec.reference:cat: .zgroup/.zarray
2022-11-23T16:38:42.429Z MainProcess MainThread DEBUG:fsspec.reference:cat: 10u/.zarray
2022-11-23T16:38:42.429Z MainProcess MainThread DEBUG:fsspec.reference:Reference: 10u/.zarray, type bytes
2022-11-23T16:38:42.430Z MainProcess MainThread DEBUG:fsspec.reference:cat: 10u/.zarray
...
2022-11-23T16:38:43.276Z MainProcess MainThread DEBUG:fsspec.reference:Reference: valid_time/.zarray, type bytes
2022-11-23T16:38:43.277Z MainProcess MainThread DEBUG:fsspec.reference:cat: valid_time/.zarray
2022-11-23T16:38:43.277Z MainProcess MainThread DEBUG:fsspec.reference:Reference: valid_time/.zarray, type bytes
2022-11-23T16:38:43.279Z MainProcess MainThread DEBUG:fsspec.reference:cat: valid_time/0
2022-11-23T16:38:43.279Z MainProcess MainThread DEBUG:fsspec.reference:Reference: valid_time/0, type bytes
xarray.Dataset
Dimensions:  valid_time: 1, x: 1059, y: 1799, step: 1, surface: 1, time: 1
Coordinates:
    step        (step)        timedelta64[ns] 02:00:00
    surface     (surface)     int64 0
    time        (time)        datetime64[ns] 2021-06-03
    valid_time  (valid_time)  datetime64[ns] 2021-06-03T02:00:00
Data variables: (48)
Attributes:
    centre:             kwbc
    centreDescription:  US National Weather Service - NCEP
    edition:            2
    subCentre:          0
%time ds.dswrf.compute()

2022-11-23T16:39:00.338Z MainProcess ThreadPoolExecutor-0_0 DEBUG:fsspec.reference:cat: dswrf/.zarray
2022-11-23T16:39:00.341Z MainProcess ThreadPoolExecutor-0_0 DEBUG:fsspec.reference:Reference: dswrf/.zarray, type bytes
2022-11-23T16:39:00.342Z MainProcess ThreadPoolExecutor-0_0 DEBUG:fsspec.reference:cat: dswrf/.zarray
2022-11-23T16:39:00.343Z MainProcess ThreadPoolExecutor-0_0 DEBUG:fsspec.reference:Reference: dswrf/.zarray, type bytes
2022-11-23T16:39:00.346Z MainProcess fsspecIO DEBUG:fsspec.reference:cat: dswrf/0.0.0
2022-11-23T16:39:00.346Z MainProcess fsspecIO DEBUG:fsspec.reference:Reference: dswrf/0.0.0, offset 89193093, size 840472
CPU times: user 54.2 ms, sys: 51 ms, total: 105 ms
Wall time: 178 ms
xarray.DataArray 'dswrf' (valid_time: 1, x: 1059, y: 1799)
array([[[  5. ,   5.4,   6.1, ...,   0. ,   0. ,   0. ],
        [  5.8,  24.8,  25. , ...,   0. ,   0. ,   0. ],
        [  6.7,  28.4,  29.3, ...,   0. ,   0. ,   0. ],
        ...,
        [373.2, 379.6, 385.7, ...,   0. ,   0. ,   0. ],
        [344.1, 378.9, 373.4, ...,   0. ,   0. ,   0. ],
        [379.9, 361.9, 376.2, ...,   0. ,   0. ,   0. ]]])
Coordinates:
    valid_time  (valid_time)  datetime64[ns] 2021-06-03T02:00:00
Attributes: (18)

When I cat a GRIB2 file directly using the caching filesystem object, it does use the cache.

fo = fsc.open("gs://high-resolution-rapid-refresh/hrrr.20221118/conus/hrrr.t00z.wrfsfcf18.grib2")
fo.read(1000)

2022-11-23T17:49:45.267Z MainProcess MainThread DEBUG:fsspec:Creating local sparse file for high-resolution-rapid-refresh/hrrr.20221118/conus/hrrr.t00z.wrfsfcf18.grib2
b'GRIB\x00\x00\x00\x02\x00\x00\x00\x00\x00\x07\xeb\x11\x00\x00\x00\x15\x01\x00\x07\x00\x00\x02\x01\x01\x07\xe6\x0b\x12\x00\x00\x00\x00\x01\x00\x00\x00Q\x03\x00\x00\x1d\x11\xf5\x00\x00\x00\x1e\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x07\x00\x00\x04#\x01B\x8a\xcb\x0e$\x9c\xd8\x08\x02Kv\xa0\x0f\xa5n\xa0\x00-\xc6\xc0\x00-...'

dd = fsc.cat("gs://high-resolution-rapid-refresh/hrrr.20221118/conus/hrrr.t00z.wrfsfcf18.grib2")

2022-11-23T17:50:21.439Z MainProcess MainThread DEBUG:fsspec:Opening partially cached copy of high-resolution-rapid-refresh/hrrr.20221118/conus/hrrr.t00z.wrfsfcf18.grib2
2022-11-23T17:50:29.994Z MainProcess MainThread DEBUG:fsspec:going to save
2022-11-23T17:50:29.998Z MainProcess MainThread DEBUG:fsspec:saved
martindurant commented 1 year ago

I don't seem to have access to the file at rpath.

I am not entirely sure what's going on here, but it looks like some kind of mismatch between the intended target of the caching filesystems (file-like access) and the optimisations in gcsfs/async that do direct reads from the target. I would be happy to find out.

I think I would recommend layering your caching the other way around: simplecache on top of ReferenceFileSystem on top of gcsfs, so that each target chunk becomes a locally cached file, rather than relying on the sparse-file magic of blockcache. The only downside is duplicating the metadata fragments already downloaded with the reference set, but those are small anyway.
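Something like this (an untested sketch; the reference file path and cache directory are placeholders):

fs = fsspec.filesystem(
    "reference",
    fo="refs.json",  # the kerchunk reference set
    remote_protocol="gcs",
    remote_options={"anon": True},
)
fsc = fsspec.filesystem("simplecache", fs=fs, cache_storage="/tmp/ref_cache")
# each chunk read through fsc lands in /tmp/ref_cache as its own local file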

emfdavid commented 1 year ago

For my purposes, it would be ideal to cache just the byte ranges (or sparse blocks) for the specific variables that I use in the remote GRIB2 HRRR files, and not the zarr aggregation, which is updated as new files arrive. Checking metadata to force a refresh should prevent the cached aggregation from going stale, but I would like to avoid checking metadata on the GRIB2 ranges, since they should never change once written to gcs://high-resolution-rapid-refresh

Based on your suggestion I tried this:

rpath = 'gs://gcp-public-data-weather.camus.store/high-resolution-rapid-refresh/version_2/raw_zarr/conus/hrrr.20210603/hrrr.t00z.wrfsfcf02.zarr'
r_opts = {'anon': True}
t_opts = {}
fs = fsspec.filesystem(
    protocol="reference",
    fo=rpath,  # The target zarr json blob
    target_protocol="gcs",
    target_options=t_opts,
    remote_protocol='gcs',
    remote_options=r_opts,
)
# cache layered on top of the reference filesystem, as suggested
fsc = fsspec.filesystem("simplecache", cache_storage='/home/builder/cache', fs=fs)
kv_store = fsc.get_mapper("")
ds = xr.open_dataset(kv_store, engine="zarr", backend_kwargs=dict(consolidated=False), chunks={'valid_time':1}, drop_variables=["heightAboveGround"])
ds

I have tried blockcache, simplecache and filecache this way...

blockcache fails with:

2022-11-23T19:23:04.047Z MainProcess MainThread DEBUG:fsspec:Opening partially cached copy of .zgroup
2022-11-23T19:23:04.050Z MainProcess MainThread DEBUG:fsspec:going to save
2022-11-23T19:23:04.052Z MainProcess MainThread DEBUG:fsspec:saved
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Input In [18], in <cell line: 2>()
      1 kv_store = fsc.get_mapper("")
----> 2 ds = xr.open_dataset(kv_store, engine="zarr", backend_kwargs=dict(consolidated=False), chunks={'valid_time':1}, drop_variables=["heightAboveGround"])
      3 ds
...
File ~/.cache/bazel/_bazel_builder/99bd544777c52d5a0d8a66ea3b05626a/execroot/ritta/bazel-out/k8-dbg/bin/forecasting/notebook.runfiles/common_deps_fsspec/site-packages/fsspec/caching.py:101, in MMapCache._fetch(self, start, end)
     99     send = min(sstart + self.blocksize, self.size)
    100     logger.debug(f"MMap get block #{i} ({sstart}-{send}")
--> 101     self.cache[sstart:send] = self.fetcher(sstart, send)
    102     self.blocks.add(i)
    104 return self.cache[start:end]

File ~/.cache/bazel/_bazel_builder/99bd544777c52d5a0d8a66ea3b05626a/execroot/ritta/bazel-out/k8-dbg/bin/forecasting/notebook.runfiles/common_deps_fsspec/site-packages/fsspec/spec.py:1556, in AbstractBufferedFile._fetch_range(self, start, end)
   1554 def _fetch_range(self, start, end):
   1555     """Get the specified set of bytes from remote"""
-> 1556     raise NotImplementedError

simplecache and filecache both fail with the same issue:

---------------------------------------------------------------------------
JSONDecodeError                           Traceback (most recent call last)
Input In [20], in <cell line: 2>()
      1 kv_store = fsc.get_mapper("")
----> 2 ds = xr.open_dataset(kv_store, engine="zarr", backend_kwargs=dict(consolidated=False), chunks={'valid_time':1}, drop_variables=["heightAboveGround"])
      3 ds

File ~/.cache/bazel/_bazel_builder/99bd544777c52d5a0d8a66ea3b05626a/execroot/ritta/bazel-out/k8-dbg/bin/forecasting/notebook.runfiles/common_deps_xarray/site-packages/xarray/backends/api.py:495, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, backend_kwargs, *args, **kwargs)
    483 decoders = _resolve_decoders_kwargs(
    484     decode_cf,
    485     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)
    491     decode_coords=decode_coords,
    492 )
    494 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 495 backend_ds = backend.open_dataset(
    496     filename_or_obj,
    497     drop_variables=drop_variables,
    498     **decoders,
    499     **kwargs,
    500 )
    501 ds = _dataset_from_backend_dataset(
    502     backend_ds,
    503     filename_or_obj,
   (...)
    510     **kwargs,
    511 )
    512 return ds
...
File ~/.cache/bazel/_bazel_builder/99bd544777c52d5a0d8a66ea3b05626a/external/python3_10_x86_64-unknown-linux-gnu/lib/python3.10/json/decoder.py:337, in JSONDecoder.decode(self, s, _w)
    332 def decode(self, s, _w=WHITESPACE.match):
    333     """Return the Python representation of ``s`` (a ``str`` instance
    334     containing a JSON document).
    335 
    336     """
--> 337     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
    338     end = _w(s, end).end()
    339     if end != len(s):

File ~/.cache/bazel/_bazel_builder/99bd544777c52d5a0d8a66ea3b05626a/external/python3_10_x86_64-unknown-linux-gnu/lib/python3.10/json/decoder.py:355, in JSONDecoder.raw_decode(self, s, idx)
    353     obj, end = self.scan_once(s, idx)
    354 except StopIteration as err:
--> 355     raise JSONDecodeError("Expecting value", s, err.value) from None
    356 return obj, end

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Unfortunately I cannot make that GCS bucket public at this time, but it is a scan_grib / MultiZarrToZarr output created by combining scans of the HRRR WRFSFCF output.

from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr

input_path = "gcs://high-resolution-rapid-refresh/hrrr.20221118/conus/hrrr.t00z.wrfsfcf18.grib2"
SCAN_SURFACE_INSTANT_GRIB = dict(
    filter=dict(typeOfLevel="surface", stepType="instant"),
    storage_options=dict(token=None),
)

SCAN_HEIGHT_ABOVE_GROUND_INSTANT_GRIB = dict(
    filter=dict(typeOfLevel="heightAboveGround", stepType="instant"),
    storage_options=dict(token=None),
)

# The scan method produces a list of entries
zarr_meta_surface = scan_grib(input_path, **SCAN_SURFACE_INSTANT_GRIB)
zarr_meta_height_above_ground = scan_grib(input_path, **SCAN_HEIGHT_ABOVE_GROUND_INSTANT_GRIB)

combined_zarr_meta = MultiZarrToZarr(
    zarr_meta_surface + zarr_meta_height_above_ground,
    remote_protocol="gcs",
    remote_options=dict(token=None),
    concat_dims=["valid_time"],
    identical_dims=["latitude", "longitude", "step"],
).translate()

I hope to open source the tools for building the running aggregations of HRRR output we are creating using Kerchunk and share them with NOAA and IOOS folks that might be interested in using them.

martindurant commented 1 year ago

https://github.com/fsspec/filesystem_spec/pull/1123 appears to solve the case of the cacher at the top level - please test it for your use case. I won't have a chance to look into the inner blockcache case today.

emfdavid commented 1 year ago

Thank you for your support. I tried it... I now have

fsspec.__version__

'2021.06.0+523.gc685f7b'

But I still get the same errors. Is there something I can do to get more debug information that might help?
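For reference, the debug output above comes from turning on fsspec's loggers, along the lines of:

from fsspec.utils import setup_logging

setup_logging(logger_name="fsspec")            # DEBUG output from the core library
setup_logging(logger_name="fsspec.reference")  # and from the reference filesystem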

martindurant commented 1 year ago

I updated the git tags in my fork, which may help. I did:

import fsspec
import xarray as xr

r_opts = {"token": "anon"}
fs = fsspec.filesystem(
    protocol="reference",
    fo="hrrr.gribs.json",  # local version I made
    remote_protocol='gcs',
    remote_options=r_opts,
)
fsc = fsspec.filesystem("simplecache", fs=fs, cache_storage="./cache")
kv_store = fsc.get_mapper("")
ds = xr.open_dataset(kv_store, engine="zarr", backend_kwargs=dict(consolidated=False), chunks={'valid_time':1}, drop_variables=["heightAboveGround"])
ds.v[:].values

emfdavid commented 1 year ago

I spent an hour chasing errors only to find out that I had bad data in my cache directory. Looks like this works now - thank you @martindurant

emfdavid commented 1 year ago

Thank you again for your quick work on Wednesday last week @martindurant! Very excited to have this improvement, which cuts our ML job time in half.

Is there an architectural issue with using Block Cache with the remote FS? I would be happy to contribute on that if you think it is workable.

martindurant commented 1 year ago

Is there an architectural issue with using Block Cache with the remote FS

I am not certain. The primary use of that cache mechanism is file-like access (i.e., open, seek, read), but here we are doing a cat operation on potentially multiple parts of a remote file at once. It wouldn't surprise me if it needed a little finesse to get it to work.
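To illustrate the difference (a sketch; the path and byte range are borrowed from the logs above):

# file-like access, which blockcache is designed around
with fsc.open("gs://high-resolution-rapid-refresh/hrrr.20221118/conus/hrrr.t00z.wrfsfcf18.grib2") as f:
    f.seek(89193093)
    chunk = f.read(840472)

# versus the kind of one-shot range fetch that referenceFS issues internally
chunk = fs.cat_file(
    "gs://high-resolution-rapid-refresh/hrrr.20221118/conus/hrrr.t00z.wrfsfcf18.grib2",
    start=89193093,
    end=89193093 + 840472,
)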

emfdavid commented 1 year ago

Hmmm - okay. Well, I will see if I can find the right balance of finesse and brute force to make it work. In the meantime, can we merge #1123? I would like to be able to reference a stable commit on the main branch so I can use it in production.

emfdavid commented 1 year ago

Found my cache is getting stale on disk as I reopen the reference filesystem with new parts added - new HRRR grib files are added each hour. To fix the issue for now, I am going to include the checksum of the reference file(s) in the cache path, so that the cache gets invalidated every time new grib files are added to the reference filesystem created by kerchunk. I would like to be able to reuse the cache - the previous references should not change - but maybe the way parts (reference filesystem files) are cached is not stable? Is this something I can roll up my sleeves to try and fix, or am I headed for a dead end?

martindurant commented 1 year ago

Perhaps the following options of the caching filesystems would help?

        check_files: bool
            Whether to explicitly see if the UID of the remote file matches
            the stored one before using. Warning: some file systems such as
            HTTP cannot reliably give a unique hash of the contents of some
            path, so be sure to set this option to False.
        expiry_time: int
            The time in seconds after which a local copy is considered useless.
            Set to falsy to prevent expiry. The default is equivalent to one
            week.

The former is only available for "filecache" as opposed to "simplecache", since it needs extra complexity to store the details of the files.
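For example (untested; reusing the cache path from above):

fsc = fsspec.filesystem(
    "filecache",
    fs=fs,  # the reference filesystem
    cache_storage="/home/builder/cache",
    check_files=False,  # the GRIB2 byte ranges never change once written
    expiry_time=False,  # falsy disables expiry entirely
)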