Open martindurant opened 4 months ago
Just in case you didnt notice, gribberish python does have kerchunk support https://github.com/mpiannucci/gribberish/tree/main/python/gribberish/kerchunk
Do you think there's a way, from the code here, to find the start/size of only the payload of each message, so that at read time we can grab fewer bytes and do less work making the output array? What information would need to be available at load time?
Its a bit nuanced because there are so many different data templates that data can come in. If we think through this a bit, each grib message is composed of sections. Each section starts with the section length and the section number. So we do indeed have access to the start byte of the data section.
The tricky part comes when we want to parse the data. To generically read the data array from a given grib message, you need access to the DataRepresentationSection, the BitMap Section, and the Data Section. In any given grib message this is sections 5 through 7. There are some data packages that do not use the bitmap section, but in those cases that section is like 8 octets long so it has like no impact really.
So the next question is, do we need the Data Representation Section? And my answer is i'm not totally sure. The data representation section encodes the Data Representation Template which describes how to transform the data from bytes to array. I am not totally clear if these values are shared across variables, timesteps, etc and we would have to check.
So at a minimum, the answer to this question is this:
The codec would need to be updated to support these
Furthermore, do you think it might be possible to chop such a message buffer into equal sections (along the slowest-varying/outer array dimension), such that each of those sections could be read independently?
I am not sure I understand this question totally. If you mean can we split a data stream into multiple chunks, the answer is that depends on the Data Representation template, if its JPEG (GFS Wave) its not possible, or Complex (HRRR, GFS, GEFS) I don't think its possible either, which would limit its utility because those are some of the largest message streams. Furthermore, you would still (probabaly) need the Data Representation Section (unless i'm proven otherwise through investigation) so any benefits to sub chunking would be negligible.
Im interested in benefits this could bring so happy to continue this discussion
gribberish python does have kerchunk support
I'm not sure I've used that :|. Is there any comparison of speed or capabilities? Or is the main difference more simply that referenced produced here will also use gribberish to decode rather than eccodes?
Here is a pre-release blog post by @rsignell demonstrating splitting of native chunks for the sake of better random access and smaller byte ranges: https://medium.com/@rsignell_93546/9008ba6d0d67 . That case is easy, because the netCDF3 data is uncompressed, but looking at simple unpack, it seems like one could do the same here?
do we need the Data Representation Section
As far as I understand the Simple case: I don't think so, these are effectively parameters of the Codec and while they might vary from one message to the next, for each particular message type (level, etc) of a series of files, they will be the same. Whether or not there is a bitmap will also be predetermined. I would reckon, that for encodings 0, 1, 2, 4 you already have the necessary code to take only 1/N equal parts of a buffer. That covers a large fraction of use cases.
Question @emfdavid : do you happen to know the size of the biggest message you encounter, and an estimate of what fraction of that is useful bytes in one of your typical workflows?
The largest length in my BQ table for the HRRR Surface files is 3565237 bytes. With a grid size of 1059*1799 that is only 1.87 bytes per value and the lengths are not consistent each message seems to have a different length... but I have not done an in depth analysis. Here is a csv export you can puzzle over - I subset by step equals one hour here...
SELECT * FROM `camus-infra.nodd_index_metadata.nodd-hrrr-conus-sfcf-v2-0_BL` where step=3600000
gribberish python does have kerchunk support
I'm not sure I've used that :|. Is there any comparison of speed or capabilities? Or is the main difference more simply that referenced produced here will also use gribberish to decode rather than eccodes?
On the xarray front, currently it is much faster to read in the dataset, and when kerchunking it has in my opinion a better API for choosing which messages you actually care about.
As for read speed, here is a simple benchmark for Complex, PNG, and JPEG compressed data, reading from python between gribberish and eccodes:
gribberish is faster in many cases, and at worst more stable than eccodes.
That case is easy, because the netCDF3 data is uncompressed, but looking at simple unpack, it seems like one could do the same here?
Correct it could, the issue is that none of the most interesting data uses simple packing so its not totally useful.
As far as I understand the Simple case: I don't think so, these are effectively parameters of the Codec and while they might vary from one message to the next, for each particular message type (level, etc) of a series of files, they will be the same. Whether or not there is a bitmap will also be predetermined. I would reckon, that for encodings 0, 1, 2, 4 you already have the necessary code to take only 1/N equal parts of a buffer. That covers a large fraction of use cases.
I dont think this is true. Each message is compressed using Complex Packing and Spatial Differencing for HRRR, GFS, GEFS data, which are some of the only datasets that are big enough that it would make sense to split a given message into multiple chunks. GFS, GEFS Wave data are JPEG2000 packed which can also not be subchunked AFAIK.
With a grid size of 1059*1799 that is only 1.87 bytes per value and the lengths are not consistent each message seems to have a different length... but I have not done an in depth analysis.
Yes correct, each variable and possibly each timestep is compressed individually so you cannot assume packing length for non-simple packed messages
GFS, GEFS Wave data are JPEG2000 packed which can also not be subchunked AFAIK.
agreed, I don't think j2k can be split. At least I don't know how, but given than zlib/deflate can be, maybe it's possible with substantial effort.
An idea suggested by @gatesn is that, in theory, we could enable random access into any compressed file by storing the internal state of the decompressor at regular intervals (say, every 4 kB). This would require someone to read through the entire dataset to create a file storing the decompressor's internal state every 4 kB. And it would require code modifications deep inside the guts of the codec! So it probably would be a lot of work. But might be kind of fun...
Actually, looking at the docs for openjpeg (the reference jpeg2000 implementation)... it looks like jp2k uses concepts which might be similar to "chunks". jp2k uses "tiles", "precincts", and "code-block size" (default 64x64). And openjpeg can output an index file "that summarizes the codestream. This index is useful to create a navigation process."
So maybe jp2k is kind of chunked already?! (But I'm way out of my comfort zone here!)
It might be! Dealing with the openjpeg raw bindings makes this more annoying to solve though haha
I think the Complex and Spatial Differencing data representation is far more ubiquitous and more impactful to figure out. It may be possible if the differencing only happens within single rows? Ill have to relearn the compression, i implemented it over a year ago now.
Will subchunking the spatial domain really help much for grib2 workflows?
You can split the spatial domain, but you are still stuck with 1 time value in each chunk, right?
And if the entire-spatial-domain chunk is only 25MB or something, splitting it further wouldn't be that much of a benefit, would it?
You can split the spatial domain, but you are still stuck with 1 time value in each chunk, right? And if the entire-spatial-domain chunk is only 25MB or something, splitting it further wouldn't be that much of a benefit, would it?
Correct, my guess is that its probably not worth it because it will pretty much always be 1 time step per chunk, and even the largest grib chunks arent that big in the first place, plus the grib metadata isnt significant IMO
But i dont do ML at the scale of Jack and David
Can we do a comparison of a bunch of gribs in a series, to see if the data representation section has the same values for a particular message in each file?
@emfdavid : the throughput you got with your parallel tricks in zarr, were they using gribberish as the decoder?
@martindurant Gribberish was not complete when I started moving code into production in February. I have not had time to go back and try it for comparison. I think we can just switch the codec in the refspec. I can give it a shot for an apples to apples comparison.
if the entire-spatial-domain chunk is only 25MB or something, splitting it further wouldn't be that much of a benefit, would it?
It might help when you only want a single pixel in the spatial domain, and many timesteps? e.g. you want a multi-year time series for a single geographical point?
@JackKelly Maybe, but I would like to be convinced! :)
We could test this idea if we already had kerchunked grib data from two different models with different spatial dimension sizes. For example, we could try reading 1000 time steps from HRRR (15MB chunks) and GFS 1/4 degree resolution (4MB chunks) with the same cluster and see how much faster GFS is.
Does someone have kerchunked datasets for HRRR and GFS?
Hehe :) Yeah, I totally agree that it'd be good to get empirical evidence.
To give a tiny bit of context: I'm interested in training large machine learning models directly from huge GRIB datasets on cloud object storage. In the ideal case, I'd like to sustain several gigabytes per second to each GPU. And we'd often want to read just a few spatial locations, across many years. And I'd like to do a bunch of numerical processing on-the-fly (like normalising each value). So reducing the time to open a single GRIB message from, say, 5 ms to 2.5 ms really matters to me :slightly_smiling_face:
Here is a Gist with two years for HRRR and GFS (6 and 67-72 hour horizon) with two variables dswrf and t2m. Looks like a couple forecast runs were missing for GFS? You will probably need to handle exceptions in the Codec where some of the NODD grib files are incomplete... Camus uses the GCS... but you should be able to rename the files to S3 if need be. Open it with xarray DataTree as in the screen shot.
Okay! Armed with @emfdavid's refs for both HRRR (14.54MB chunks) and GFS (7.92MB chunks), I tried reading 5000 chunks from each using a Coiled-generated Dask cluster of 50 workers (100 total threads).
The results:
So even though the HRRR chunks are 83% bigger, they took only 20% longer to read.
Here's the gist of the notebook I used.
I created the coiled cluster environment with:
coiled env create --name esip-pangeo-arm --workspace esip-lab --conda pangeo_env.yml --architecture aarch64
where pangeo_env.yml
is:
channels:
- conda-forge
dependencies:
- python=3.11
- adlfs
- cf_xarray
- cfunits
- coiled
- curl
- dask>=2024.7.0
- python-eccodes
- eccodes
- fastparquet
- fsspec>=2024.2.0
- gdal
- gcsfs
- h5netcdf
- h5py
- intake>=2.0
- intake-stac
- intake-xarray
- intake-parquet
- ipywidgets
- ipykernel
- kerchunk
- metpy
- netcdf4
- numba
- numcodecs
- numpy<2
- pandas
- planetary-computer
- pyepsg
- pystac
- pystac-client
- python-snappy
- rechunker
- s3fs
- ujson
- vim
- zstandard
- xarray-datatree
- zarr
@rsignell Mic Drop 😆
(let;s be sure to include those in intake catalogs, probably on public-data)
Thanks for doing the test! That was speedy!
So even though the HRRR chunks are 83% bigger, they took only 20% longer to read.
That's surprising. Do we have any clues as to why that is? What's dominating the runtime, if it's not IO?
the HRRR chunks are 83% bigger
This is the in memory size, no? So the compression ratios might be different.
the HRRR chunks are 83% bigger
This is the in memory size, no? So the compression ratios might be different.
Right I believe the compression ratios are different for every message, no matter the model
Otherwise I don't think spatial difference works.
As I understand complex packing:
Since the bit-widths of each group are data dependent, the data size will vary for each group even though they have the same number of values. Each instance of a message at each time-point therefore will have a different data size, but probably not by much! However, each message type can have very different bits-per-value.
Based on logs from my latests training runs I get the following for a single container running in a Kube cluster on 16CPU pod using joblib for parallelism.
HRRR
[Parallel(n_jobs=32)]: Done 5068 tasks | elapsed: 1.9min
....
[Parallel(n_jobs=32)]: Done 31307 out of 31307 | elapsed: 12.3min finished
GFS
[Parallel(n_jobs=32)]: Done 5068 tasks | elapsed: 1.6min
...
[Parallel(n_jobs=32)]: Done 26867 out of 26867 | elapsed: 8.7min finished
Each task is one slice of one variable from a grib file.
I was feeling pretty depressed about ^ but if you run the numbers in CPU seconds, I am not doing too badly.
Rich had 13.9s on 100 cpu -> 1390 cpuseconds I have 1.9m on 16 cpu -> 1824 cpuseconds
I am very impressed with the parallel efficiency in coiled/dask! I should be paying less overhead using joblib in cpu-seconds. I wonder where the waste is going? There is a significant startup cost which I think we have skipped in both of these experiments. I am pulling a 4x4 slice from each grib for use with scipy interp.
the HRRR chunks are 83% bigger
This is the in memory size, no? So the compression ratios might be different.
Right I believe the compression ratios are different for every message, no matter the model
Otherwise I don't think spatial difference works.
I would expect the compression ratio would be similar for these two models though -- they are both large scale atmospheric models and we are looking at the same variable.
I guess it wouldn't be that hard to check -- we just need to look at the file sizes in object storage!
Perhaps the grouping size or even difference order is an input parameter (I don't know). I could imagine choosing different group sizes might greatly affect the compression ratio.
From a conversation with @emfdavid and @JackKelly
As you I think know, kerchunk currently works with grib files by finding the start/size bytes of each message, and at load time, passing this blob of bytes to eccodes to make a whole in memory grib message; the output of the process is the numerical array it represents. That means that all of the various attributes and coordinate definitions are interpreted each time, even though this information is already known from the initial scan and stored in the reference set.
Do you think there's a way, from the code here, to find the start/size of only the payload of each message, so that at read time we can grab fewer bytes and do less work making the output array? What information would need to be available at load time?
Furthermore, do you think it might be possible to chop such a message buffer into equal sections (along the slowest-varying/outer array dimension), such that each of those sections could be read independently?