fsspec / GSoC-kechunk-2022

IOOS Success Story with LiveOcean forecast collection #6

Open rsignell-usgs opened 2 years ago

rsignell-usgs commented 2 years ago

This GSoC project was chosen by the IOOS organization, so I just wanted to report that @peterm790's help with understanding a problem with the fill_value led to a Kerchunk PR, which was then successfully applied to an IOOS model collection -- the LiveOcean forecast collection from the NANOOS Regional Association of IOOS.

We took 24 sample hourly LiveOcean NetCDF files from ROMS, put them on the Open Storage Network, and kerchunked them into a single virtual Zarr dataset. We were able to modify the metadata, adding the standard_name='time' attribute to the ocean_time variable.
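
For readers who want to reproduce this, here is a rough sketch of that kerchunking step. The bucket name and file pattern are hypothetical, and the kerchunk arguments (e.g. inline_threshold) are illustrative rather than the ones actually used:

import json

import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

# List the 24 hourly NetCDF files (hypothetical OSN bucket and layout)
so = dict(anon=True)
fs = fsspec.filesystem("s3", **so)
nc_files = fs.glob("s3://my-osn-bucket/LiveOcean/*.nc")

# Build one reference set per NetCDF file
singles = []
for f in nc_files:
    with fs.open(f) as infile:
        singles.append(SingleHdf5ToZarr(infile, f"s3://{f}", inline_threshold=300).translate())

# Combine along ocean_time into a single virtual Zarr dataset
mzz = MultiZarrToZarr(
    singles,
    remote_protocol="s3",
    remote_options=so,
    concat_dims=["ocean_time"],
)
refs = mzz.translate()

with open("LiveOcean_reference.json", "w") as out:
    json.dump(refs, out)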

Here is a notebook demonstrating access and simple visualization:

Snapshot:

[screenshot 2022-06-22_15-13-30]

rsignell-usgs commented 2 years ago

@peterm790 and I had a good discussion about LiveOcean yesterday and he set up a kbatch cronjob script to kerchunk the daily forecast data at 1500 UTC, about 1 hour after it arrives on Azure Blob Storage.

It worked yesterday, fingers crossed it works today!

He's also working on a "LiveOcean Explorer" notebook that consumes the data, starting with this Panel-based COAWST Explorer notebook. In that notebook we standardize the dataset on the fly (e.g. adding standard_name for time and fixing fill values), but it would be better to fix those in the JSON of the virtual dataset!
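
As an aside, a hedged sketch of what fixing those attributes directly in the reference JSON could look like. The kerchunk reference file stores each variable's Zarr metadata as inline JSON strings under "refs", so they can be patched in place; the local filename and the example fill value below are assumptions, not the real ones:

import json

# Hypothetical local copy of the reference file; the real one lives on S3.
with open("LiveOcean_reference.json") as f:
    refs = json.load(f)

# Attributes are stored as an inline JSON string per variable.
attrs = json.loads(refs["refs"]["ocean_time/.zattrs"])
attrs["standard_name"] = "time"
refs["refs"]["ocean_time/.zattrs"] = json.dumps(attrs)

# The fill value lives in each variable's .zarray metadata, e.g. for temp:
zarray = json.loads(refs["refs"]["temp/.zarray"])
zarray["fill_value"] = 1.0e37  # illustrative value, not necessarily the ROMS _FillValue
refs["refs"]["temp/.zarray"] = json.dumps(zarray)

with open("LiveOcean_reference.json", "w") as f:
    json.dump(refs, f)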

peterm790 commented 2 years ago

Just to update this: I have set up a new container image at https://github.com/users/peterm790/packages/container/package/pangeo which replicates the pangeo conda environment on qhub, so we no longer rely on the OGC demo image. When this has finished updating I will set up a more minimal image for running the LiveOcean kerchunk update script.

Also, the LiveOcean source data on Azure seems to have changed slightly and been extended to a 3-day forecast, so I will need to change the update script to reflect this.

rsignell-usgs commented 2 years ago

I tried running my LiveOcean Explorer notebook today (which is accessing a kerchunk dataset that is updated daily) and got:

File /home/conda/users/e6aa7ecb5d1fdfdc96b013e60f37b5b6476dfafe607fc361e6c796f9050d1e70-20220801-103922-189931-116-pangeo/lib/python3.9/site-packages/numcodecs/zlib.py:40, in Zlib.decode(self, buf, out)
     37     out = ensure_contiguous_ndarray(out)
     39 # do decompression
---> 40 dec = _zlib.decompress(buf)
     42 # handle destination - Python standard library zlib module does not
     43 # support direct decompression into buffer, so we have to copy into
     44 # out if given
     45 return ndarray_copy(dec, out)

error: Error -3 while decompressing data: incorrect header check

@martindurant do you know where to point the finger?

martindurant commented 2 years ago

Network issue?!?

rsignell-usgs commented 2 years ago

Network issue when creating the JSON (earlier today) or accessing the JSON/NetCDF files (in which case I should just try again later?)

martindurant commented 2 years ago

I imagine the latter, unless you gzip-compressed your JSON.

martindurant commented 2 years ago

The file was created on the 1st but modified on the 3rd - does it need to be rescanned?

{'metadata': None,
 'creation_time': datetime.datetime(2022, 8, 1, 13, 47, 41, tzinfo=datetime.timezone.utc),
 'deleted': None,
 'deleted_time': None,
 'last_modified': datetime.datetime(2022, 8, 3, 13, 39, 46, tzinfo=datetime.timezone.utc),
 'content_settings': {'content_type': 'application/octet-stream', 'content_encoding': None, 'content_language': None, 'content_md5': None, 'content_disposition': None, 'cache_control': None},
 'remaining_retention_days': None,
 'archive_status': None,
 'last_accessed_on': None,
 'etag': '0x8DA7555A1264549',
 'tags': None,
 'tag_count': None,
 'name': 'cas6-v0-u0kb/f2022-08-03/ocean-his-0022.nc',
 'size': 644660145,
 'type': 'file'}

rsignell-usgs commented 2 years ago

Hmm, this works:

ds.temp[383,-1,0,150]

but this doesn't, giving the incorrect header check error:

ds.temp[384,-1,0,150]

Would looking at the ReferenceFileSystem object provide insight?

Is there a good way to get the ReferenceFileSystem object from the intake catalog?

The intake catalog looks like:

cat['LiveOcean-Archive']

LiveOcean-Archive:
  args:
    consolidated: false
    storage_options:
      fo: s3://esip-qhub-public/LiveOcean/LiveOcean_reference.json
      remote_options:
        account_name: pm2
        skip_instance_cache: true
      remote_protocol: abfs
      target_options:
        anon: true
        skip_instance_cache: true
    urlpath: reference://
  description: LiveOcean Forecast Archive
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/USGS-python/hytest-catalogs/main
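
For anyone following along without intake, the same catalog arguments can be passed straight to xarray; this is a sketch, assuming the public reference JSON above is still available:

import xarray as xr

ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": {
            "fo": "s3://esip-qhub-public/LiveOcean/LiveOcean_reference.json",
            "remote_protocol": "abfs",
            "remote_options": {"account_name": "pm2", "skip_instance_cache": True},
            "target_options": {"anon": True, "skip_instance_cache": True},
        },
    },
)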

rsignell-usgs commented 2 years ago

@martindurant oops I didn't see your message above. Will take a look.

martindurant commented 2 years ago

The way to get the filesystem:

import fsspec

fs = fsspec.filesystem(
    "reference",
    fo="s3://esip-qhub-public/LiveOcean/LiveOcean_reference.json",
    remote_options={"account_name": "pm2", "skip_instance_cache": True},
    remote_protocol="abfs",
    target_options={"anon": True, "skip_instance_cache": True},
)

You can look at this object's .references dictionary, and interact with its internal filesystem fs.fss[None]. Nothing obvious appeared to me when I tried this. The filesize of the requested key is as big as the reference says it should be.
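
For example, a quick sketch of that inspection; the chunk key below is hypothetical (real keys follow the "variable/i.j.k.l" Zarr chunk naming):

key = "temp/384.0.0.0"  # hypothetical key corresponding to the failing time index

url, offset, size = fs.references[key]  # where the chunk bytes are supposed to come from
print(url, offset, size)

# compare against the current size of the underlying NetCDF blob on Azure
print(fs.fss[None].info(url)["size"])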

peterm790 commented 2 years ago

@rsignell-usgs yes, @martindurant is correct: this is because some of the LiveOcean NetCDFs have been updated after the references were generated. I haven't set this up to auto-update, in part because of struggles with kbatch, but also because I thought it would be better for it to run on Azure rather than AWS.

I can quickly rerun it to update the references, but it will break again when the files update tomorrow.

rsignell-usgs commented 2 years ago

@peterm790 , what are these "kbatch struggles" of which you speak?

rsignell-usgs commented 2 years ago

@parkermac, are the LiveOcean NetCDF files being overwritten periodically with newer versions?

peterm790 commented 2 years ago

@peterm790 , what are these "kbatch struggles" of which you speak?

I suspect it might be a permissions problem, but when I run a kbatch job it just hangs as 'running'. I have tried issuing a new API token to solve this, but that didn't seem to help, nor is the token ever shown as having been used.

martindurant commented 2 years ago

It is conceivable that kerchunk should include a checksum/uuid with each referenced file to make sure you know when content changes. I'll put it on the list, but I don't see it as very high priority.

rsignell-usgs commented 2 years ago

@peterm790 , you may have already done this, but how about adding a step to the workflow that checks the modification time of each processed NetCDF file, compares it to the modification time of the corresponding single-file JSON using fs.info(), and adds any newer NetCDF files to the list to be (re)processed?

Something like:

# json_list[i] is the single-file JSON generated from nc_processed_list[i];
# nc_process_list collects the NetCDF files that need to be (re)kerchunked
for i in range(len(json_list)):
    a = fs_read.info(json_list[i])['LastModified']  # when the JSON was written
    b = fs_read.info(nc_processed_list[i])['LastModified']  # when the NetCDF last changed
    if b > a:
        nc_process_list.append(nc_processed_list[i])

Seems we should probably do this on every regularly-updating workflow we have!

parkermac commented 2 years ago

Rich and Peter,

The NetCDF ROMS history files in Azure blob storage are overwritten every morning around 7 AM. This is only true for the files from day 1 and day 2 of the forecast, so for today's forecast the 2022.08.09 and 2022.08.10 files would be overwritten, and the 2022.08.11 files would be created new. Everything older than today's date is untouched. If there is a better way to do this let me know.

Cheers,

Parker

martindurant commented 2 years ago

That sounds like exactly the vital information that we needed, thanks @parkermac
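
Concretely, and assuming the daily update runs after the morning overwrite, that schedule means the references for the first two days of the current forecast always need re-scanning while the third day is new; a small illustrative sketch:

from datetime import date, timedelta

# Sketch of the overwrite pattern Parker describes: the history files for the
# first two days of today's forecast are rewritten each morning, day 3 is new.
today = date.today()
rescan_days = [today, today + timedelta(days=1)]  # overwritten around 7 AM, need re-kerchunking
new_day = today + timedelta(days=2)               # created fresh today
print("re-scan:", [d.isoformat() for d in rescan_days], "new:", new_day.isoformat())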

peterm790 commented 2 years ago

I have written a new script which relies on the file etag from fs.info. Etags are intended for exactly this purpose: checking that a file has not been modified since the last visit.

I am saving the etags as a pickle file after each update script run and using these to find newly added or updated files:

  import pickle
  # note: fs (references bucket) and fs_data (source data) are fsspec filesystems
  # defined earlier in the full workflow linked below

  # get the etags of all nc_files
  new_etags = {}
  for f in nc_files:
      new_etags[f] = fs_data.info(f)['etag']

  # get the previously saved dictionary containing the etags at the time of the last update
  with fs.open('s3://esip-qhub-public/LiveOcean/etags.pickle', 'rb') as p:
      existing_etags = pickle.load(p)

  # collect files that are new or have been updated
  updated_files = []
  for file in nc_files:
      if file in existing_etags:  # the file existed at the last run
          if new_etags[file] != existing_etags[file]:  # its etag changed, so it was modified
              updated_files.append(file)
      else:
          updated_files.append(file)  # the file is newly added

  # overwrite the old etags with the new ones
  with fs.open('s3://esip-qhub-public/LiveOcean/etags.pickle', 'wb') as p:
      pickle.dump(new_etags, p)

and the full workflow:

https://gist.github.com/peterm790/f5ad49e72fc6cd0644af9598b43eb180

I will have another go at scheduling this as a cronjob now.