fsspec / GSoC-kechunk-2022

ERA5 example: loads files with different numbers of chunks! #5

Closed rsignell-usgs closed 2 years ago

rsignell-usgs commented 2 years ago

https://nbviewer.org/gist/136480d389e948b0756ee2f7c5ee9564

This would be another good one for the kerchunk examples.

rsignell-usgs commented 2 years ago

@peterm790 , this is one where there would be huge value in extending this workflow, creating a json that represents the entire dataset.

If you did that, it would not only be a great contribution to the community but a great blog post! :)

When there are thousands of files, it's certainly a good idea to use a dask cluster to process the files in parallel, and when there are more than 10,000 or so, it's good to use dask bag to partition the jobs into batches.
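
Roughly, the batching idea looks like this (a sketch only; the file list, partition size, and keyword arguments are placeholders, not the exact settings from any of the workflows discussed here):

  import dask.bag as db
  import fsspec
  from kerchunk.hdf import SingleHdf5ToZarr

  def make_refs(url):
      # scan one NetCDF file and return its kerchunk reference dict
      with fsspec.open(url, "rb", anon=True) as f:
          return SingleHdf5ToZarr(f, url, inline_threshold=300).translate()

  urls = ["s3://era5-pds/1979/01/data/air_temperature_at_2_metres.nc"]  # hypothetical file list
  bag = db.from_sequence(urls, npartitions=max(1, len(urls) // 100))    # batch into partitions
  all_refs = bag.map(make_refs).compute()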

A good example of using Dask Bag to create the kerchunk JSONs is this notebook that @lsterzinger wrote to process the entire US National Water Model (NWM) reanalysis (200,000+ NetCDF files!):

https://nbviewer.org/gist/8cb0b61b6805ca5f00db2236ef82d3d8

In that notebook we also show how to store the individual JSON files on S3. If we have them on S3, we can easily add more as new data arrives (such as with ERA5) and then rerun only the MultiZarrToZarr step to update the entire collection.
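
The update pattern is roughly as follows (again a sketch; the bucket and prefix names are placeholders, and the combine options shown are just one plausible configuration):

  import ujson
  import fsspec
  from kerchunk.combine import MultiZarrToZarr

  fs = fsspec.filesystem("s3")

  # inside each processing task: write the per-file reference dict to S3
  # with fs.open(f"s3://my-bucket/era5-jsons/{name}.json", "w") as f:
  #     f.write(ujson.dumps(refs))

  # later, as new files arrive, re-combine everything currently on S3
  json_list = ["s3://" + p for p in fs.glob("s3://my-bucket/era5-jsons/*.json")]
  mzz = MultiZarrToZarr(
      json_list,
      remote_protocol="s3",
      remote_options={"anon": True},
      concat_dims=["time"],
  )
  combined = mzz.translate()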

Does this make sense?

peterm790 commented 2 years ago

Hi Rich, yes, that makes sense. I will have a go at replicating this on a small subset to check that the old notebook still runs fine, then move on to creating a combined JSON for the whole dataset.

peterm790 commented 2 years ago

Yes, it still works fine - https://nbviewer.org/gist/peterm790/1878cbe3ea3b63daa84f01435b03297a

The data has a deprecation warning, but it is still up to date (it was updated yesterday).

martindurant commented 2 years ago

The data has a deprecation warning

I was wondering what you meant by this; is it text on the page describing the data?

peterm790 commented 2 years ago

Yes, on the data info page: https://registry.opendata.aws/ecmwf-era5/

martindurant commented 2 years ago

OK. Well, I'm not too worried about it, but it would be annoying if they stopped updating.

peterm790 commented 2 years ago

I have set up a script to process the full ERA5 dataset and all looks set to run. However, there is a small issue in the way the coordinates are mapped to the virtual dataset: any 0 coordinate is changed to NaN. Here is a notebook showing this: https://gist.github.com/peterm790/d2ee7b50127c89bfc7df51a9bba2f771

And this is the notebook I used to generate the kerchunk reference files (I have only processed 1979 for now): https://gist.github.com/peterm790/8484d83b6e8ffc2073c06012d1dde9c3

I seem to remember coming across an issue similar to this when reading through the gitter chat history but I can't seem to find it anymore.

This is using kerchunk version '0.0.6'

martindurant commented 2 years ago

This is a zarr thing: we need to tell it that there is no special "missing value" in the dataset, as 0 is assumed for integer types. Depending on whether the NaNs show up in the individual JSONs or only the final, you can add a preprocess or postprocess function to MultiZarrToZarr to edit the .zarray file of the array with the problem.
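
For instance, something along these lines could serve as the postprocess function (assuming the NaNs only appear in the combined references and that lat and lon are the affected arrays; a sketch of the idea, not tested against this dataset):

  import ujson

  def postprocess(refs):
      # remove the assumed fill value from the coordinate arrays' .zarray metadata
      for var in ["lat", "lon"]:
          meta = ujson.loads(refs[f"{var}/.zarray"])
          meta["fill_value"] = None   # i.e. no special "missing value"
          refs[f"{var}/.zarray"] = ujson.dumps(meta)
      return refs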

peterm790 commented 2 years ago

Hi, I am still struggling a little bit with the interpretation of the fill values.

My understanding is that the default zarr fill_value, which is stored in the ".zarray" key, is 0, while the _FillValue in ".zattrs" comes from the original (in this case netCDF) file. When this is opened by xarray, I am struggling to work out whether both fill_value and _FillValue are considered, or whether _FillValue, if present in the attributes, supersedes zarr's fill_value when masking and scaling. Essentially, which one is best to change in this case?
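
For reference, one way to look at both values side by side (assuming refs is the reference dict returned by translate()):

  import ujson

  zarray = ujson.loads(refs["lat/.zarray"])
  zattrs = ujson.loads(refs["lat/.zattrs"])
  print("zarr fill_value:", zarray.get("fill_value"))
  print("CF _FillValue:", zattrs.get("_FillValue"))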

I have of course tried to test this manually, but I am having some trouble with how to manipulate the kerchunk references using postprocess. I have had some success editing the fill_value in the final dictionary by slicing the JSON string directly:

  def postprocess(out):
      # overwrite the characters at the fill_value position in the .zarray JSON
      # string (hard-coded offsets 143:146) with -9999
      out['lat/.zarray'] = out['lat/.zarray'][:143]+'-9999' + out['lat/.zarray'][146:]
      out['lon/.zarray'] = out['lon/.zarray'][:143]+'-9999' + out['lon/.zarray'][146:]
      return out

This seems to work fine but feels like a bit of a hack. Instead:

  def postprocess(out):
      # open the reference dict as a zarr store and set the fill values directly
      out_ = zarr.open(out)
      out_.lon.fill_value = -9999
      out_.lat.fill_value = -9999
      return out

Seems like a better solution; however, this returns a TypeError: reject_bytes is on ..... and is bytes. I have done my best to try to interpret how to solve this, such as passing out to ujson.dumps, converting to str, or passing it back to the final part of the consolidate routine of MultiZarrToZarr.

The previous examples I have seen using the post-processing technique have been with the older spec, manipulating xarray datasets. Am I heading in the right direction here, or am I missing something obvious? :see_no_evil:

peterm790 commented 2 years ago

I think my understanding of exactly what the out that postprocess operates on represents is still a bit lacking, but just as an update, the following works:

  import base64

  def consolidate(out_):
      # re-encode any bytes values as str or base64 so the references
      # stay JSON-serializable
      for k, v in out_.items():
          if isinstance(v, bytes):
              try:
                  # easiest way to test if data is ascii
                  out_[k] = v.decode('ascii')
              except UnicodeDecodeError:
                  out_[k] = (b"base64:" + base64.b64encode(v)).decode()
          else:
              out_[k] = v
      return out_

  import zarr

  def modify_fill_value(out):
      # open the reference dict as a zarr store and rewrite the coordinate fill values
      out_ = zarr.open(out)
      out_.lon.fill_value = -999
      out_.lat.fill_value = -999
      return out

  def postprocess(out):
      out = modify_fill_value(out)
      out = consolidate(out)
      return out

in context of the full workflow: https://gist.github.com/peterm790/4ec83c976541ec40223efad6e3871561
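
For context, the hook is wired into the combine step roughly like this (the file list and combine options here are placeholders rather than the exact settings from the gist):

  from kerchunk.combine import MultiZarrToZarr

  mzz = MultiZarrToZarr(
      json_list,                  # per-file reference JSONs from SingleHdf5ToZarr
      remote_protocol="s3",
      remote_options={"anon": True},
      concat_dims=["time"],
      postprocess=postprocess,    # applied to the combined reference dict
  )
  combined = mzz.translate()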

peterm790 commented 2 years ago

Just to complete the fill value discussion above, the following works to update the fill value in both .zattrs and .zarray:

  import base64

  def consolidate(out_):
      for k, v in out_.items():
          if isinstance(v, bytes):
              try:
                  # easiest way to test if data is ascii
                  out_[k] = v.decode('ascii')
              except UnicodeDecodeError:
                  out_[k] = (b"base64:" + base64.b64encode(v)).decode()
          else:
              out_[k] = v
      return out_

  import zarr

  def modify_fill_value(out):
      # rewrite both zarr's fill_value and the CF _FillValue attribute for the coordinates
      out_ = zarr.open(out)
      out_.lon.fill_value = 9.9692099683868690e+36
      out_.lat.fill_value = 9.9692099683868690e+36
      out_.lat.attrs['_FillValue'] = 9.9692099683868690e+36
      out_.lon.attrs['_FillValue'] = 9.9692099683868690e+36
      return out

  def postprocess(out):
      out = modify_fill_value(out)
      out = consolidate(out)
      return out

The fill value used follows the default _FillValue recommended by the netCDF CF conventions, which also seems to be what xarray uses when writing to zarr with no fill value specified.
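
For reference, these defaults can be looked up from the netCDF4 package (assuming float64 coordinates):

  import netCDF4

  # default _FillValue for 64-bit floats per the netCDF conventions
  print(netCDF4.default_fillvals["f8"])   # 9.969209968386869e+36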

Would it be worth building these default fill values into kerchunk.hdf.SingleHdf5ToZarr, as I am sure the default 0 fill value will be a recurring problem?

And also, should postprocess perhaps be called at the start of consolidate in kerchunk.combine.MultiZarrToZarr, to avoid needing to pass the postprocess output to consolidate again as in the example above?

I can open either of these as issues in the kerchunk repo if they are worth looking into further.

martindurant commented 2 years ago

Would it be worth building these default fill values into kerchunk.hdf.SingleHdf5ToZarr as I am sure the default 0 fill value will be a recurring problem?

Yes, we certainly need a way to deal with this. I don't immediately see the need or justification for the value 9.9692099683868690e+36, but there could be a set of options, and certainly a test like this one, to make sure that zero values never accidentally become NaNs.
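
Something along these lines might do (a rough sketch; the reference file path, engine options, and variable names are assumptions):

  import numpy as np
  import xarray as xr

  def test_zero_coordinates_survive():
      # open a combined reference file; path and storage options are placeholders
      ds = xr.open_dataset(
          "reference://",
          engine="zarr",
          backend_kwargs={
              "consolidated": False,
              "storage_options": {
                  "fo": "combined.json",
                  "remote_protocol": "s3",
                  "remote_options": {"anon": True},
              },
          },
      )
      # coordinates containing 0.0 must not have been turned into NaN
      assert not np.isnan(ds.lon.values).any()
      assert not np.isnan(ds.lat.values).any()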

And also should postprocess perhaps be called at the start of consolidate in kerchunk.combine.MultiZarrToZarr to avoid needing to pass the postprocess output to consolidate again in the above example?

That very likely makes sense, yes. There's also an argument for making consolidate a function rather than a method, so that it can be called explicitly by the user, and tested separately.

peterm790 commented 2 years ago

Just to update this: I have done some trial and error, and the step of creating the interim JSONs, explained in https://github.com/fsspec/kerchunk/issues/182, is still performing slower than combining directly from the individual JSONs, which now runs in 18 minutes, significantly faster than the 3 hours it took before https://github.com/fsspec/kerchunk/pull/183 was merged. I have not yet tried increasing the inline_threshold on the interim combine, which I think will speed up the interim step.
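
For reference, inline_threshold would be passed to the interim combine roughly like this (the file list, dimension name, and threshold value are placeholders):

  from kerchunk.combine import MultiZarrToZarr

  # combine one year's worth of per-file JSONs into an interim reference,
  # inlining any chunk smaller than ~500 bytes directly into the JSON
  interim = MultiZarrToZarr(
      year_jsons,
      remote_protocol="s3",
      remote_options={"anon": True},
      concat_dims=["time"],
      inline_threshold=500,
  ).translate()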

Here is the current workflow for a single variable

https://gist.github.com/peterm790/66fa117d521182d26c29fc12e51361e5

Moving forwards, my intention is to extend the workflow to create kerchunk sidecar files for the other available variables and combine these into one complete dataset, before setting up a separate workflow to update this dataset when new monthly data is added.

peterm790 commented 2 years ago

Hi @rsignell-usgs @martindurant,

I have created two tutorials using this ERA5 dataset. The first is a simple demonstration of opening a sidecar file and of how to create the sidecar for a single variable: https://nbviewer.org/github/peterm790/ERA5_Kerchunk_tutorial/blob/master/ERA5_tutorial.ipynb

The second is intended to introduce as much of the functionality of MultiZarrToZarr as possible and can perhaps be used to update the docs: https://nbviewer.org/github/peterm790/ERA5_Kerchunk_tutorial/blob/master/ERA5_tutorial_extended.ipynb

I created a repository to store the notebooks as well as the ERA5 sidecar file (~14 MB): https://github.com/peterm790/ERA5_Kerchunk_tutorial/

rsignell-usgs commented 2 years ago

I'm closing this issue since we can now raise issues about ERA5 on your ERA5 tutorial repo.