JackKelly / hypergrib

Lazily read petabytes of GRIBs from cloud object storage.
MIT License
8 stars 0 forks source link

Ideas for loading GRIB data as fast as possible #2

Open JackKelly opened 1 month ago

JackKelly commented 1 month ago

Are we confident that it's even possible to go faster than kerchunk when reading a petabyte-scale GRIB dataset on cloud object storage?! (If not, then there's not much point in hypergrib existing!)

In particular, when reading GRIB files, do we think kerchunk would saturate a 200 Gbps network connection on a VM connected in the same cloud region as the GRIB data? (Saturating a 200 Gbps NIC probably requires a few hundred GET requests to be in flight at any moment). My understanding is that Zarr-Python version 2 (without David's joblib patch to Zarr) definitely wouldn't saturate a 200 Gbps NIC. But maybe Kerchunk combined with Zarr-Python version 3, and/or Zarr-Python v2 with David's patch, would saturate a 200 Gbps NIC?

And, in terms of latency, how long would it take kerchunk to figure out which GRIB to read, if kerchunk has to look through a huge manifest (let's say 20 years of GRIBs)?

martindurant commented 1 month ago

One thing I've been worrying about: Are we confident that it's even possible to go faster than kerchunk when reading a petabyte-scale GRIB dataset on cloud object storage?! (If not, then there's not much point in hypergrib existing!)

Having experimental POCs turn into useful production code is the path I am all about. We won’t know until we try.

In particular, when reading GRIB files, do we think kerchunk would saturate a 200 Gbps network connection on a VM connected in the same cloud region as the GRIB data? (Saturating a 200 Gbps NIC probably requires a few hundred GET requests to be in flight at any moment). My understanding is that Zarr-Python version 2 (without David's joblib patch to Zarr) definitely doesn't perform very well. But maybe Kerchunk combined with Zarr-Python version 3, and/or Zarr-Python v2 with David's patch, would saturate a 200 Gbps NIC?

Well, I think the following points are certainly tractable (referencing @mpiannucci’s replies and our earlier conversations)

And, in terms of latency, how long would it take kerchunk to figure out which GRIB to read, if kerchunk has to look through a huge manifest (let's say 20 years of GRIBs)?

This is fast! Parquet files can easily store millions of references, each partition loading fast and not taking up too much memory so that you can keep many in memory. Of course, some tuning could be done and there are alternative storage mechanism like DB or even Redis, but I think very likely parquet is good enough. Currently we store references in the same order as data (C-ordering), and this could be changed or configured if it becomes a problem.

mpiannucci commented 1 month ago

zarr 3 bringing async combined reads across multiple variables; I don’t think there’s any plan for this, but it’s possible to reduce the number of reads, depending on whether latency or throughput is the killer

It would be great to get a benchmark that proves latency (IO) or throughput (CPU) is the limiter and we can optimize from there

seeing if we can persuade gribberish's rust implementation to do true multithreading for decode; probably this is a "yes". eccode cannot, as it’s known to have global state that’s not thread safe.

This is possible, there is no global state in gribberish.

Of course, some tuning could be done and there are alternative storage mechanism like DB or even Redis, but I think very likely parquet is good enough

I have a serverless weather app using dynamo db thats stores my references and its very fast to get what you need on demand.

All in all, I think the biggest killer to perf is that grib is always 1 time chunk with possibly large spatial dimensions. If you want to extract a timeseries it will always be slow, you can speed it up through threading or async but its always going to have to unpack more data than it needs to if the data is complex coded. Though that should be very easy for me to confirm

martindurant commented 1 month ago

I think the biggest killer to perf is that grib is always 1 time chunk with possibly large spatial dimensions.

Since we can request >>1000 items concurrently, latency ought not to be too big a deal in most cases. However, throughput of bytes and memory (storing/decoding those bytes) will be, unless we can split the "large" dimension on at least one axis. This is the basis of my pushing for partial reads of COMPLEX. Data description shows that the parameters are written up front (where is Code Table 5.4??). Are the same parameters used consistently?

The essence of the complex packing method is to subdivide a field of values into NG groups, where the values in each group have similar sizes. In this procedure, it is necessary to retain enough information to recover the group lengths upon decoding. The NG group lengths for any given field can be described by Ln = ref + Kn * len_inc, n = 1,NG, where ref is given by octets 38-41 and len_inc by octet 42. The NG values of K (the scaled group lengths) are stored in the Data Section, each with the number of bits specified by octet 47. Since the last group is a special case which may not be able to be specified by this relationship, the length of the last group is stored in octets 43-46.

So is it a unique vector for each message of a given type, or always the same (e.g., one group per line)?

mpiannucci commented 1 month ago

However, throughput of bytes and memory (storing/decoding those bytes) will be

Agree this is probably the case.

So is it a unique vector for each message of a given type, or always the same (e.g., one group per line)?

I think to test this, we can grab say 5 random GFS or HRRR data files, grab 2 meter temperature for each grib file, and then just dump the data representation section. I may need to add a way to serialize a given section or template to gribberish core but that should be easy enough

JackKelly commented 1 month ago

This is a great discussion!

On the topic of decoding in parallel... I'm guessing that the low-hanging-fruit is that gribberish stays single-threaded. But hypergrib can request, say, 1000 grib messages in parallel, and decompress them in parallel by calling gribberish from multiple threads. (Rather than trying to decompress a single GRIB message across multiple threads). Does that sound sane? (Or maybe that's what you guys were already suggesting?! :slightly_smiling_face:)

And, yes, I fully agree, @mpiannucci: If the user wants a timeseries for a single geographical point then they're going to have a bad time if the data is in GRIB! But, a bunch of ML use-cases do need 2D images per timestep. And my hope is that we can support those better.

(BTW, I should also flag up that I won't get much coding done over the next few weeks... kids on holiday etc.!)

martindurant commented 1 month ago

hypergrib can request, say, 1000 grib messages in parallel, and decompress them in parallel by calling gribberish from multiple threads.

Yes, I expect that gribberish releases the GIL and allows this; but also see @emfdavid 's notes on moving the parallel decoding into the inner loop of zarr.

JackKelly commented 1 month ago

Sounds good!

For my first "throw-away prototype" i was (cheekily) thinking of implementing everything in Rust. And having a Python API which plugs directly into xarray (skipping Zarr). Just so I have full control over the parallelism. But maybe that's a bad idea?! I haven't looked into what's required to plug directly into xarray.

martindurant commented 1 month ago

implementing everything in Rust

I don't have a strong opinion here. In python you'll have more code help, so it depends on your comfort level. I would say that interfacing between asyncio (zarr 3 or in fsspec) and tokio, while possible, is probably tricky (especially in multiple threads).

mpiannucci commented 1 month ago

I already have an app written that does this in rust using AWS lambda so I'd be happy to reuse some of that code to just make a local example if you'd like.

I can also use the service that runs on lambda to compare fetching multiple chunks vs just one because that's kinda how it operates anyways.

JackKelly commented 1 month ago

I'd be happy to reuse some of that code to just make a local example if you'd like

Ooh, that could be great! (But please don't worry if it's any effort. TBH, I'm also quite eager to force myself to learn more about GRIB by implementing some code myself. But, of course, it'd be great to learn from your code, too!)

I can also use the service that runs on lambda to compare fetching multiple chunks vs just one because that's kinda how it operates anyways.

That would be great, too! It'd be interesting to see whether it can saturate the network interface card. (Although maybe it's not easy to find out the NIC speed of a lambda instance?)

JackKelly commented 1 month ago

Hi @emfdavid. This slide is in your excellent ESIG presentation:

image

Please may I ask three quick questions about this slide:

  1. Just to confirm: Is the y-axis in the lower plot giga-bytes per second? (Not giga-bits per second?) (I assume the uppercase "B" means "bytes" but I thought I should check!)
  2. Do you know the speed of the network interface card on the VM that was running your code? How close is 2GB/s to saturating the NIC?
  3. Please remind me: Does the lower plot show the total network bandwidth to a single VM?
JackKelly commented 1 month ago

Also, when you say

The parallel processing overhead is much higher for the short prediction tasks. Observed read rates are typically 50-100 Mb/second.

Is the slowdown due to extracting a timeseries "churro" from the GRIB dataset? (i.e. needing to extract a timeseries for a single geographical point)?

emfdavid commented 1 month ago
  1. Just to confirm: Is the y-axis in the lower plot giga-bytes per second? (Not giga-bits per second?) (I assume the uppercase "B" means "bytes" but I thought I should check!)
Screenshot 2024-08-02 at 9 01 54 PM

The GCP metrics units are a bit opaque via the grafana api but the metric name does specify bytes.

  1. Do you know the speed of the network interface card on the VM that was running your code? How close is 2GB/s to saturating the NIC?

Very difficult to pin down with a kube autopilot cluster. I think we are most of the way there, but we should definitely start with GCE instance, not kube for benchmarking.

  1. Please remind me: Does the lower plot show the total network bandwidth to a single VM?

That is showing total bytes received for a kube pod.

emfdavid commented 1 month ago

Is the slowdown due to extracting a timeseries "churro" from the GRIB dataset? (i.e. needing to extract a timeseries for a single geographical point)?

I am extracting a block - something like 50 x 100 by time steps since I am running the forecast for a group of ~2000 meters in one geographic area. I think the slow down is due to the number of time steps being small, so the overhead associated with the way I implemented the parallelization is pretty high. Joblib's lokey backend is pretty expensive starting the parallel processes - it just doesn't work very well for short time series. It is still a big improvement over what I was getting before, but I was pretty focused on the training use case where I want ~40k time steps rather than ~36 for the forecast.