pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0

Grib / Kerchunk recipe? #267

Open rabernat opened 2 years ago

rabernat commented 2 years ago

The new ECMWF forecast dataset in Azure is a perfect chance for us to develop a recipe type that is close to what we already support but not quite functional yet: indexing and combining a bunch of grib files with Kerchunk.

The current HDFReferenceRecipe class almost does what we need; it just needs to be generalized to permit grib inputs. The kerchunk HRRR example shows how to open grib with kerchunk.

Here is some code to kick things off. cc @cisaacstern, @lsterzinger, @martindurant, @TomAugspurger

from kerchunk.grib2 import scan_grib
from fsspec.implementations.reference import ReferenceFileSystem
import zarr
import xarray as xr

ROOT = 'https://ai4edataeuwest.blob.core.windows.net/ecmwf/'
path = '20220122/18z/0p4-beta/scda/20220122180000-36h-scda-fc.grib2'
url = ROOT + path

# build kerchunk references for the remote grib2 file
storage_options = {}
common = ['time', 'latitude', 'longitude']
%time scan = scan_grib(url, common, storage_options)

# open the references as a zarr group via a reference filesystem
rfs = ReferenceFileSystem(scan)
zg = zarr.open_group(rfs.get_mapper(""))
zg.info

# time loading one full variable
%time _ = zg['q'][:]

# the same references opened through xarray
ds = xr.open_zarr(rfs.get_mapper(""), consolidated=False)
print(ds)
<xarray.Dataset>
Dimensions:    (latitude: 451, longitude: 900)
Coordinates:
  * latitude   (latitude) float64 90.0 89.6 89.2 88.8 ... -89.2 -89.6 -90.0
  * longitude  (longitude) float64 -180.0 -179.6 -179.2 ... 178.8 179.2 179.6
Data variables: (12/18)
    d          (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    gh         (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    msl        (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    q          (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    r          (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    skt        (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    ...         ...
    u          (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    u10        (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    unknown    (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    v          (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    v10        (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
    vo         (latitude, longitude) float32 dask.array<chunksize=(451, 900), meta=np.ndarray>
Attributes:
    Conventions:             CF-1.7
    GRIB_centre:             ecmf
    GRIB_centreDescription:  European Centre for Medium-Range Weather Forecasts
    GRIB_edition:            2
    GRIB_subCentre:          0
    history:                 2022-01-26T04:08 GRIB to CDM+CF via cfgrib-0.9.9...
    institution:             European Centre for Medium-Range Weather Forecasts
TomAugspurger commented 2 years ago

Just an FYI, that storage container currently has a lifecycle policy to remove files older than 30-ish days. We're still trying to figure out how best to handle these operational forecast datasets. If anyone has thoughts on that, I'd be happy to hear them.

martindurant commented 2 years ago

I would just like to mention that GRIB2 support is not as good as hdf: it still needs cfgrib installed and works by copying pieces of the target file to local temporary files. It should be possible to eliminate that and get direct access to the binary chunks, and so better performance, but I have not had the time to try myself and was hoping someone would emerge with grib expertise who can help ( @DPeterK :) ).

rabernat commented 2 years ago

Perhaps we want to define a recipe that will create a kerchunk-virtual-zarr that always points to the "latest" data, whatever that means. Kind of similar to the ideas explored in this met office blog post. By running this recipe daily, we will ensure that we are never pointing at data that doesn't exist.
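A rough sketch of what such a daily "latest" build might look like, assuming kerchunk's MultiZarrToZarr combiner and the ECMWF URL layout shown above; exact kerchunk signatures have changed between versions, and the urls_for_day helper is hypothetical:

import datetime
import json

from kerchunk.grib2 import scan_grib
from kerchunk.combine import MultiZarrToZarr

ROOT = 'https://ai4edataeuwest.blob.core.windows.net/ecmwf/'
N_DAYS = 7  # stay well inside the ~30-day lifecycle policy

def urls_for_day(day):
    # hypothetical helper: enumerate the cycle/step files published for one day
    stamp = day.strftime('%Y%m%d')
    return [f'{ROOT}{stamp}/18z/0p4-beta/scda/{stamp}180000-36h-scda-fc.grib2']

today = datetime.date.today()
urls = [u for d in range(N_DAYS) for u in urls_for_day(today - datetime.timedelta(days=d))]

# one reference set per file, then combine along time and overwrite a single
# 'latest' JSON that consumers always point at
references = [scan_grib(url, ['time', 'latitude', 'longitude'], {}) for url in urls]
mzz = MultiZarrToZarr(references, concat_dims=['time'], identical_dims=['latitude', 'longitude'])
with open('ecmwf-latest.json', 'w') as f:
    json.dump(mzz.translate(), f)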

rabernat commented 2 years ago

@martindurant - the code above worked quite well out of the box! It took about 20s to index the file, and I didn't even run it from inside Azure. Whether or not local copies are happening may not be so important here.

martindurant commented 2 years ago

Good to know!

TomAugspurger commented 2 years ago

@martindurant this ECMWF dataset includes "index files". Has anyone checked if those have sufficient information to translate into the index filesystem format needed for a reference filesystem?

https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.index is one example, for the file at https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2

rabernat commented 2 years ago

Here is what the index file looks like

{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "29", "param": "2t", "_offset": 0, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "27", "param": "10u", "_offset": 609069, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "14", "param": "10v", "_offset": 1218138, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "2", "param": "2t", "_offset": 1827207, "_length": 609069}
{"domain": "g", "date": "20220126", "time": "0000", "expver": "0001", "class": "od", "type": "pf", "stream": "enfo", "step": "0", "levtype": "sfc", "number": "43", "param": "10u", "_offset": 2436276, "_length": 609069}
...

4182 lines of that.

It definitely looks like the information is there. Sounds like we need a kerchunk scan_grib_index function!
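A hedged sketch of what a scan_grib_index-style helper might start from: one kerchunk reference (url, offset, length) per line of the .index file. The key naming below is invented for illustration, and the zarr metadata (.zarray, .zattrs, dtype, shape) would still have to come from somewhere else:

import json
import requests

idx_url = 'https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.index'
grib_url = idx_url.replace('.index', '.grib2')

entries = [json.loads(line) for line in requests.get(idx_url).text.splitlines() if line]

refs = {}
for i, e in enumerate(entries):
    # placeholder key scheme; real kerchunk keys look like '<variable>/<chunk-index>'
    refs[f"{e['param']}/{i}"] = [grib_url, e['_offset'], e['_length']]

print(len(refs), 'byte ranges extracted from the index')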

martindurant commented 2 years ago

If those "offsets" are what I think they are (will see), then this might indeed be the case that we don't need cfgrib. Otherwise, the JSON looks just a little cryptic.

darothen commented 2 years ago

This is fantastic... I tried this out with two of NOAA's operational datasets archived on GCS and it worked without any modifications, but was much slower (for calibration, the MWE here took ~40 seconds for scan_grib to run; unclear to me whether this is a personal internet bandwidth issue or a GCS issue):

  - HRRR - https://console.cloud.google.com/storage/browser/high-resolution-rapid-refresh/ - took ~5 minutes for scan_grib to run
  - GFS - https://console.cloud.google.com/storage/browser/global-forecast-system/ - took ~7 minutes for scan_grib to run, but there was some odd bug in decoding the longitude coordinate and one of the values wound up as a NaN.

@TomAugspurger I was just about to call out the index files; these should be available for most NWP datasets distributed via GRIB (they are for NOAA). There's a good bit of information on leveraging systems that support random-access reads to subset the byte ranges corresponding to data fields in the GRIB, using the offsets present in the index - see https://nomads.ncep.noaa.gov/txt_descriptions/fast_downloading_grib.shtml

lsterzinger commented 2 years ago

From the docs @TomAugspurger linked: "In addition, the keys _offset and _length represent the byte offset and length of the corresponding field. This allows the download of a single field using the HTTP Byte-Range request" so this definitely seems workable
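For example, a single field can be pulled with an fsspec byte-range read, using the _offset/_length pair from the first index line quoted above (a minimal sketch):

import fsspec

grib_url = 'https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2'
offset, length = 0, 609069  # first message, per the .index file

fs = fsspec.filesystem('https')
msg_bytes = fs.cat_file(grib_url, start=offset, end=offset + length)
assert msg_bytes[:4] == b'GRIB'  # every GRIB message starts with this magic number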

TomAugspurger commented 2 years ago

Thanks all. I'll spend a bit of time this morning seeing if I can wire something up between the index files and fsspec reference filesystem.

rabernat commented 2 years ago

then this might indeed be the case that we don't need cfgrib

Even with the index file, it seems like there must be some metadata in the grib file that is not covered by the index file.

martindurant commented 2 years ago

As far as I can tell, these "fields" will still need decoding. Maybe those other details in each JSON line tells you the dtype and compression.

darothen commented 2 years ago

Tag @blaylockbk - his package Herbie has some nascent capability to do partial decoding of HRRR grib files, could be a starting point here.

Somehow the grib_ls tool that ships with eccodes generates a much more detailed catalog of GRIB file contents, but I'm not sure how it works. That's another avenue to potentially explore.

martindurant commented 2 years ago

If only, instead of providing little tools with arcane CLI options, a good description of how they worked were given. These are all Perl?

darothen commented 2 years ago

Not sure, I'm digging for the source code to read through right now.

darothen commented 2 years ago

I spent the last hour digging through the source code for the grib_ls tool... it's a pretty dense C program which directly scans through the GRIB file(s) passed to it. Not immediately sure how useful it would be here, but for posterity, and in case anyone else wants to dig through it as an educational reference, there are three main source files that comprise it.

Presumably there's enough information in the GRIB files themselves to generate the JSON-style index that @rabernat listed. grib_ls doesn't use the static index file distributed with the GRIB files themselves.

TomAugspurger commented 2 years ago

cfgrib has the concept of a "fieldset", which IIUC is a list of messages that can be passed to xarray.open_dataset: https://github.com/ecmwf/cfgrib/blob/master/ADVANCED_USAGE.rst

This lets us avoid writing data to disk:

    import json

    import cfgrib
    import requests
    import xarray as xr
    from eccodes import codes_new_from_message

    idx_url = "https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.index"
    grb_url = "https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2"

    # fetch the index file and take the first entry
    r = requests.get(idx_url)
    idxs = [json.loads(x) for x in r.text.split("\n") if x]
    idx = idxs[0]

    # byte range of the corresponding GRIB message
    start_bytes = idx["_offset"]
    end_bytes = start_bytes + idx["_length"] - 1
    headers = dict(Range=f"bytes={start_bytes}-{end_bytes}")

    # partial data I/O required for the message
    r_data = requests.get(grb_url, headers=headers)
    r_data.raise_for_status()
    data = r_data.content
    h = codes_new_from_message(data)
    message = cfgrib.messages.Message(h)
    ds = xr.open_dataset([message], engine="cfgrib")
    print(ds)

That said, we do still need to do the range request to read the message to get the structure of the Dataset. So these index files might help with speeding up scan_grib from kerchunk, but I think we'll still need to extract additional information beyond what's available through the URL and index file.

rabernat commented 2 years ago

So perhaps we want to add an optional index_file argument to scan_grib, which would provide a fast path to constructing the references.

TomAugspurger commented 2 years ago

Agreed. And https://github.com/fsspec/kerchunk/blob/ca4acff173e5eac60e51f50c99011b7afde5ee24/kerchunk/grib2.py#L162 could perhaps be updated to avoid the tempfile (which I think is needed when reading right now).
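For the tempfile point, a minimal sketch of decoding a single GRIB message from in-memory bytes with eccodes directly (assuming msg_bytes holds exactly one message, e.g. fetched via a byte-range request) might look like:

import eccodes

def decode_message(msg_bytes):
    # parse the message from memory; no temporary file needed
    h = eccodes.codes_new_from_message(msg_bytes)
    try:
        ni = eccodes.codes_get(h, 'Ni')  # points along a parallel (longitude)
        nj = eccodes.codes_get(h, 'Nj')  # points along a meridian (latitude)
        values = eccodes.codes_get_values(h)  # flat float64 array
        return values.reshape(nj, ni)
    finally:
        eccodes.codes_release(h)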

TomAugspurger commented 2 years ago

Here's a little proof of concept for accessing GRIB2 data over the network, using the newly released cfgrib 0.9.10.0. This is just at the Zarr array level. But plugging it into xarray shouldn't be much more work.

import json

import cfgrib
import eccodes
import numcodecs
import numpy as np
import requests
import xarray as xr
import zarr

grib_url = "https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2"
sub_indices = [{'domain': 'g',
  'date': '20220126',
  'time': '0000',
  'expver': '0001',
  'class': 'od',
  'type': 'cf',
  'stream': 'enfo',
  'step': '0',
  'levtype': 'sfc',
  'param': 'st',
  '_offset': 916472742,
  '_length': 609069}]

class COGRIB2Codec(numcodecs.abc.Codec):
    codec_id = "co-grib2"
    def __init__(self, grib_url):
        self.grib_url = grib_url

    def encode(self, buf):
        raise NotImplementedError

    def decode(self, buf, out=None):
        index = json.loads(buf)
        result = data_from_index(self.grib_url, index)
        return result

def data_from_index(grib_url: str, idx) -> np.ndarray:
    # TODO: think about batching HTTP requests.
    # Unclear how that fits in a regular __getitem__ API.
    # Might be doable with a getitems.
    # See if we can cut down on the number of HTTP requests.
    start_bytes = idx["_offset"]
    end_bytes = start_bytes + idx["_length"] - 1
    headers = dict(Range=f"bytes={start_bytes}-{end_bytes}")

    # TODO: sans-io
    r = requests.get(grib_url, headers=headers)
    r.raise_for_status()
    data = r.content
    h = eccodes.codes_new_from_message(data)
    messages = [cfgrib.messages.Message(h)]

    ds = xr.open_dataset(messages, engine="cfgrib")
    # Get just the data... This is a bit wasteful. See if cfgrib
    # can fetch just the data section of the message.
    return ds[idx["param"]].data

numcodecs.register_codec(COGRIB2Codec)

filters = [COGRIB2Codec(grib_url)]

x = zarr.empty((451, 900), chunks=(451, 900), dtype=np.dtype("float32"), compressor=None, filters=filters)
# we do this write outside of Zarr
x.store["0.0"] = json.dumps(sub_indices[0]).encode()
x[:]

Which outputs

array([[246.57616, 246.57616, 246.57616, ..., 246.57616, 246.57616,
        246.57616],
       [246.20116, 246.20116, 246.20116, ..., 246.2324 , 246.20116,
        246.20116],
       [245.76366, 245.76366, 245.76366, ..., 245.7949 , 245.7949 ,
        245.76366],
       ...,
       [231.63866, 231.63866, 231.63866, ..., 231.5449 , 231.57616,
        231.6074 ],
       [232.32616, 232.3574 , 232.3574 , ..., 232.2949 , 232.32616,
        232.32616],
       [232.26366, 232.26366, 232.26366, ..., 232.26366, 232.26366,
        232.26366]], dtype=float32)

The Zarr array contains just the metadata (dtype, shape). The "data" is the relevant line from the index file as a bytestring.

Then we have a custom filter that makes the byte-range HTTP request for the data when requested. This is probably misusing Zarr a bit, but it seems to work.

I'll clean this up a bit and then see about where it belongs.

rabernat commented 2 years ago

Tom, great example of Zarr hacking!

I agree you are abusing Zarr filters though. 😬

Here is a related approach I tried for a different custom binary file format: https://github.com/rabernat/mds2zarr/blob/main/mds2zarr/mds2zarr.py. I created a mutable mapping that "looked like" a kerchunk static json but is actually generating the references on the fly. Then I could use it as an input to ReferenceFileSystem (see usage in test). That is nice because you can avoid hard-coding the actual reading part (calling requests) and leverage the rest of fsspec for that part. So this works equally well with files, s3, http, etc.
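A bare-bones illustration of that pattern, with everything here (class name, key scheme) hypothetical: a Mapping that generates kerchunk-style reference entries on the fly, which ReferenceFileSystem can consume like a static JSON:

from collections.abc import Mapping

import fsspec

class LazyGribRefs(Mapping):
    """Dict-like reference store that produces (url, offset, length) lazily."""

    def __init__(self, grib_url, index_entries):
        self.grib_url = grib_url
        self.index_entries = index_entries  # parsed lines of the .index file

    def __getitem__(self, key):
        if not key.startswith('data/'):
            raise KeyError(key)  # .zarray/.zattrs etc. would need real handling
        i = int(key.split('/')[-1])
        e = self.index_entries[i]
        return [self.grib_url, e['_offset'], e['_length']]

    def __iter__(self):
        return (f'data/{i}' for i in range(len(self.index_entries)))

    def __len__(self):
        return len(self.index_entries)

# refs = LazyGribRefs(grib_url, entries)
# fs = fsspec.filesystem('reference', fo=refs, remote_protocol='https')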

martindurant commented 2 years ago

a mutable mapping that "looked like" a kerchunk static json but is actually generating the references on the fly

This is definitely part of the long-term kerchunk vision - it would work particularly well when chunks map to some file naming scheme.

rabernat commented 2 years ago

This is definitely part of the long-term kerchunk vision - it would work particularly well when chunks map to some file naming scheme.

To clarify, do you imagine including it as part of the kerchunk spec? Because it already works today (as in my example); you just have to document how to write such a class.

martindurant commented 2 years ago

Reading through your code, @TomAugspurger . Do I understand that the main thing here is to use the index definition as an alternative to scanning the file? The "message", I'm pretty sure, is the same unit of grib that my code was loading, but with a roundtrip to a local file. Handling the bytes in memory like this could be done for the existing grib2 module too, I assume.

Yes, I'm sure that an approach like this could be made to work with concurrent loading, perhaps separating the references part (storage) and the decoding (filter).

do you imagine including it as part of the kerchunk spec

It was in my mind to make this explicit, through documentation and/or examples. The mentions in the code and text somewhere of alternative storage formats for the references hint at it.

martindurant commented 2 years ago

Of course, there is no simple way to encode something like your custom dict into an Intake description (fo would normally be a path string), so that's something extra to think about.

TomAugspurger commented 2 years ago

Thanks for the thoughts on putting this logic in a Mapping vs. a filter. I initially had it as a mapping, but moved away from that since I wasn't sure about

  1. The appropriateness of generating a Zarr store that can only be opened with a specific mapping.
  2. How to compose this "special" mapping with other mappings, if that's even desirable

But filters aren't quite the right place for this either.

Do I understand that the main thing here is to use the index definition as an alternative to scanning the file?

That might be possible, but will take a bit more work. GRIB2 files apparently (can) contain many data variables, which maybe should be grouped somehow. This test file has ~4,000 variables, which cfgrib groups into ~14 xarray datasets. Right now I need to scan the full GRIB2 file to figure out which rows in the index file are included in which dataset. I don't yet know if the rows in the index file have enough information to make the same grouping as cfgrib, or if that metadata is only available after reading.

I do suspect that it's the same byte range as your code. I'd prefer to use the provided ranges from the index file rather than re-discovering it (if the index file is available, which it probably isn't always).
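To see the grouping cfgrib applies (the ~14 datasets mentioned above), cfgrib.open_datasets returns one xarray Dataset per consistent set of messages; a quick sketch, assuming a local copy of the grib file:

import cfgrib

datasets = cfgrib.open_datasets('20220126000000-0h-enfo-ef.grib2')
print(len(datasets), 'datasets')
for ds in datasets:
    print(sorted(ds.data_vars), dict(ds.sizes))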

martindurant commented 2 years ago

GRIB2 files apparently (can) contain many data variables, which maybe should be grouped somehow

In usage I have seen from @rsignell-usgs , it seems normal to provide a filter when opening, so you only get messages that can be formed into a consistent dataset. My file scanner does implement this, and it looks like the index contains the information needed.

I'd prefer to use the provided ranges from the index file rather than re-discovering it

Totally makes sense, yes.
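For reference, recent kerchunk versions expose that filtering through a filter= keyword on scan_grib (a dict of GRIB keys to match, e.g. 2 m above-ground fields); the exact signature may differ from the version discussed in this thread:

from kerchunk.grib2 import scan_grib

url = 'https://ai4edataeuwest.blob.core.windows.net/ecmwf/20220126/00z/0p4-beta/enfo/20220126000000-0h-enfo-ef.grib2'
refs = scan_grib(url, filter={'typeOfLevel': 'heightAboveGround', 'level': 2})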

martindurant commented 2 years ago

initially had it as a mapping, but moved away from that

I think @rabernat's example shows nicely that you can separate it out into a reference-getter (dict-like) class and a filter/decoder, and still use ReferenceFileSystem. Subtleties to iron out, of course.

TomAugspurger commented 2 years ago

I've updated cogrib with the recommendations from yesterday:

  1. cogrib.make_references now generates kerchunk index files, so it's an alternative to Kerchunk's scan_grib: https://github.com/fsspec/kerchunk/blob/f42b0c23aa0229cee6e5a394a860f0ad1149ace6/kerchunk/grib2.py#L85. The output is read with fsspec's reference filesystem, rather than yet another custom mapper.
  2. cogrib implements a Zarr filter to turn the raw bytes in memory from the GRIB2 file into an ndarray (https://github.com/TomAugspurger/cogrib/blob/a56d209d9162b276a31b61c4ae448ac9cd129cc5/cogrib.py#L187-L204). This is an alternative to the filter in https://github.com/fsspec/kerchunk/blob/main/kerchunk/grib2.py#L162, which writes to disk first (minor benefit).

Some notes on performance:

  1. Reading the whole file from disk with cfgrib to determine the datasets is slow (6-7 minutes for this test file)
  2. Generating the references is really fast (less than 500 ms total for all 14 datasets in this case)
  3. Reading the zarr metadata with xarray is really fast (less than 200 ms total for all 14 datasets)
  4. Read performance looks OK. It's about 200 ms to read each chunk, which in this case is a 1.55 MiB 2-d array. Datasets will have some number of these chunks equal to the number of data variables * number of ensemble members * number of levels, anywhere from 1 to 3600 chunks. When there's more than one chunk, we average about 32 MiB/s (or 8 MiB/s per core; that should scale roughly linearly with the number of cores, up until we saturate the bandwidth of each node).

So lots of overlap with what's in kerchunk today, but at least I understand GRIB2 a bit better now. I'll look into how we could reconcile the two approaches. Updating kerchunk's numcodecs filter should be easy. Harmonizing scan_grib might be a bit harder since they have different logic for how datasets are extracted/combined from a GRIB2 file (cogrib just uses whatever cfgrib does). But first I want to try this out on some other ECMWF files and then some NOAA files to make sure it generalizes.