pangeo-forge / staged-recipes

A place to submit pangeo-forge recipes before they become fully fledged pangeo-forge feedstocks
https://pangeo-forge.readthedocs.io/en/latest/
Apache License 2.0

Example pipeline for GEFSv12 #17

Open chiaral opened 3 years ago

chiaral commented 3 years ago

## Source Dataset

These are “reforecasts” of the new GEFSv12 system. They are retrospective forecasts spanning the period 2000-2019. These reforecasts are not as numerous as the real-time data; they were generated only once per day, from 00 UTC initial conditions, and only 5 members were provided, with the following exception. Once weekly, an 11-member reforecast was generated, and these extend in lead time to +35 days.

import pandas as pd
import s3fs
import xarray as xr

def preprocessing_function_ptc(path_loop, im):
    # Rebuild the object key relative to the bucket and open the grib file over HTTPS.
    prova = ['/' + ifg for ifg in path_loop.split('/')[1:]]
    ds = xr.open_rasterio('https://noaa-gefs-retrospective.s3.amazonaws.com' + ''.join(prova),
                          chunks={'x': '200MB', 'band': -1})
    ds = ds.sel(y=slice(51, 20), x=slice(229, 302))
    # Parse the start time out of the path (e.g. .../2000010100/...) and add it as a dimension.
    start_time = pd.to_datetime(str(path_loop.split('/')[4]), format='%Y%m%d%H')
    ds.coords['time'] = start_time
    ds = ds.expand_dims('time')
    ds.coords['member_id'] = im
    ds = ds.expand_dims('member_id')
    return ds

s3 = s3fs.S3FileSystem(anon=True)  # the snippet assumes anonymous access to the public bucket

path_b = 's3://noaa-gefs-retrospective/GEFSv12/reforecast/'
lglob = s3fs.S3FileSystem.glob(s3, path=path_b)
ds_p_all = []
ds_t_all = []
for ilyear in lglob[0:4]:
    print(ilyear)
    lglob2 = s3fs.S3FileSystem.glob(s3, path='s3://' + ilyear + '/')

    for ilyear2 in lglob2[0:4]:
        print(ilyear2)
        ds_p_all1 = []
        ds_t_all1 = []
        for member in ['c00', 'p01', 'p02', 'p03', 'p04']:
            lglob3 = s3fs.S3FileSystem.glob(s3, path='s3://' + ilyear2 + '/' + member + '/Days:1-10/*')
            for ilg3 in lglob3:
                if 'apcp' in ilg3:
                    ds_p = preprocessing_function_ptc(ilg3, member)
                    ds_p_all1.append(ds_p)
                elif 'tmp_2m' in ilg3:
                    ds_t = preprocessing_function_ptc(ilg3, member)
                    ds_t_all1.append(ds_t)
        # Combine the members for this start date, then collect across start dates.
        temp_p = xr.concat(ds_p_all1, dim='member_id')
        temp_t = xr.concat(ds_t_all1, dim='member_id')
        ds_p_all.append(temp_p)
        ds_t_all.append(temp_t)

precip = xr.concat(ds_p_all, dim='time')
print(precip)
tmp_2m = xr.concat(ds_t_all, dim='time')


- There is no password.

## Transformation / Alignment / Merging

How to combine forecast data in zarr format is not entirely clear to me yet, meaning what type of chunking or merging should be used. Analyses are often carried out both as a function of start time and as a function of lead time, so I don't think we can have one structure that makes everyone happy, but I will figure out which structure makes the most sense.

## Output Dataset

I am interested in transforming them into zarr format. It might be necessary to first download the files, open them with a grib engine, and then push them to the cloud, because as of now I have not been able to use a grib engine directly.
chiaral commented 3 years ago

Brainstorming on data structure.

The data have these dimensions:

If every variable always had the same shape, creating the data structure would only require deciding which coordinates to chunk; the overall shape of the data would be the same (i.e., start time, lead time, lon, lat, levels, members).

In this case, however, we have 4 dimensions (lat, lon, ensemble member, and lead time) which vary. If I kept the largest size for all the data, i.e. 0.25x0.25 degrees, 11 members, and 35 days of lead time, I would have a much larger dataset with a lot of NaNs.

Does anyone have input or ideas on how to go about that?

cc @rabernat @mktippett @rsignell-usgs @abarciauskas-bgse

EDIT April 19: Some of these comments are now obsolete:

1) If I use cfgrib, the start time is read correctly by the engine, so there is no need for a preprocessing function for that bit.

2) I could not use one spatial resolution for both the 0.25 and 0.5 degree data; that would be incorrect. I could instead have one file for the subseasonal part of the data (those ensemble members which extend to 35 days).

chiaral commented 3 years ago

(more notes on the data) I spoke too early! The initial time is loaded as an attribute of the variable and of forecast_time0, which spares me from preprocessing the file name.

Some missing data are documented here.

chiaral commented 3 years ago

I think this is the structure of the data, which is turning out to be more variable than I wished for, and also not documented very well.

[figure: gefsv12_structure1]

Because of the many varying coordinate systems, and the current inability of xarray's to_zarr() method to handle missing data without filling them in and writing them out as NaN, I plan, for now, to have 4 separate datasets for each variable, each with a homogeneous coordinate system.

[figure: gefsv12_structure2]

Datasets A and B will have a daily start time coordinate; C and D will have a weekly start time coordinate. The datasets can be combined after sub-selecting, for example, an area of interest, years of interest, or lead times of interest; only at that point will xarray fill in the missing data with NaN.
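As a rough sketch of that combine step (the store names and the exact contents of A and B here are assumptions, not the real layout):

import xarray as xr

# Hypothetical per-variable stores following the A/B split sketched above.
ds_a = xr.open_zarr('tmp_2m_A.zarr', consolidated=True)   # e.g. daily starts, days 1-10, 0.25 deg
ds_b = xr.open_zarr('tmp_2m_B.zarr', consolidated=True)   # e.g. daily starts, days 10-16, 0.5 deg

# Sub-select the area / years of interest first, then combine along lead time.
sel = dict(latitude=slice(51, 20), longitude=slice(229, 302), time=slice('2000', '2004'))
combined = xr.concat([ds_a.sel(**sel), ds_b.sel(**sel)], dim='step', join='outer')
# Only at this point does xarray fill the non-overlapping coordinates with NaN.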

Note that some variables will at times have other coordinates (e.g. isobaric levels), but again, since I create one dataset per variable (for a total of 59 variables), as long as those levels don't change within portions of the data other than the A, B, C, D portions defined above, I should be OK.

At the end of this I should have 59 (variables) times 4 (dataset portions) = 236 zarr files.

Right now I have (I think) (20 years × 365.25 days × 2 day sets × 5 ensemble members) + (20 years × 52 weeks × 2 day sets × 6 extra ensemble members) = 85,530 grib files.
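Spelling that arithmetic out:

daily_runs   = 20 * 365.25 * 2 * 5   # 20 years x daily starts x 2 "Days:" file sets x 5 members
weekly_extra = 20 * 52 * 2 * 6       # 20 years x weekly starts x 2 "Days:" file sets x 6 extra members
print(daily_runs + weekly_extra)     # 85530.0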

rabernat commented 3 years ago

At the end of this I should have 59 (variables) times 4 (dataset portions) = 236 zarr files

Is there a reason we can't put the different variables into a single Zarr store? Do they have different dimensions?

It sounds like this recipe requires the following features to be implemented in pangeo-forge

Fortunately, it sounds like the number of items in each file is fixed and known from the outset.

chiaral commented 3 years ago

Is there a reason we can't put the different variables into a single Zarr store? Do they have different dimensions?

I have to explore the data more. With forecast data from grib files there is always the possibility that each variable has some small difference, and I don't want to overwrite things (i.e. correct coordinate names or similar); I want to leave them as close to the originals as possible. Possibly I will be able to merge a few variables together into one zarr file. I guess it's not clear to me how they would be served, then. I guess just a bucket with documentation? I do like that right now each variable has a different filename, so you know exactly what you are loading.

rabernat commented 3 years ago

IMO, if all the variables have the exact same dimensions and coordinates, they should be combined into a single zarr group. In addition to being more convenient for the user, it's more compact, since you don't have to duplicate all the coordinates.
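A minimal sketch of that, assuming two per-variable stores (hypothetical names) whose dims and coords really are identical:

import xarray as xr

ds_cape = xr.open_zarr('cape_sfc_A.zarr', consolidated=True)
ds_t2m = xr.open_zarr('tmp_2m_A.zarr', consolidated=True)

# merge shares the coordinate arrays, so they are written only once in the combined store
merged = xr.merge([ds_cape, ds_t2m])
merged.to_zarr('gefsv12_A.zarr', consolidated=True)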

I guess it's not clear to me how they would be served, then. I guess just a bucket with documentation?

What do you mean by "served"? The data will be in object storage. There will be a catalog where users can browse and see exactly what is there, plus an intake layer to help load the data. Just like today on https://catalog.pangeo.io/.

chiaral commented 3 years ago

The files don't have the exact same coordinates. Some have levels, and in some files the forecast time is split into two sets of 40 steps instead of one set of 80 steps (I am trying to figure out why). But as I said, I think we can do some reduction and aggregation.

chiaral commented 3 years ago

I am going to document other issues and discrepancies with the data in this issue: https://github.com/awslabs/open-data-registry/issues/809. There are a few things that are not consistent.

judithberner commented 3 years ago

Hi, I am Judith, working on converting subseasonal-to-seasonal (S2S) data to zarr for upload to the cloud. We have many of the same issues.

1) Chunking: there should be no or little chunking in the start time dimension, because we operate over this dimension. I need to ask/think about forecast lead time.

2) What should be in a data file? I have the same questions. For the CESM Large Ensemble we are moving from outputting dumps of all variables for each output interval to outputting timeseries for each variable, because users are often interested in only one variable. In the past they had to open and read all files and then pull out the variable they wanted. Is that not an issue for zarr data?

rabernat commented 3 years ago

2. Is that not an issue for zarr-data?

Correct, not an issue at all. In fact, there is not really a zarr "file" at all. A zarr store is more like a directory. You can have as many variables as you want in there and the cost for "opening" the store is basically zero.
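A small illustration with the zarr library itself (hypothetical store name): opening the group only reads a few small metadata files, never the chunks.

import zarr

grp = zarr.open_group('gefsv12_A.zarr', mode='r')
print(list(grp.array_keys()))                  # e.g. ['cape', 'tmp_2m', 'latitude', ...]
print(grp['cape'].shape, grp['cape'].chunks)   # still no chunk data has been read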

judithberner commented 3 years ago

So if you have a 'local' jupyterhub, e.g. at NCAR, the subsetting of the data is done in the cloud and only one variable would go into the 'local' memory?

Chiara - I have found this webpage helpful: https://ncar.github.io/cesm-lens-aws/ It's the documentation for the Large Ensemble computed and cloudified at NCAR. I looked into it, and they converted each netcdf file into one zarr file (yearly initial dates, since it's a climate dataset). Indeed, it was pointed out to me that the idea behind "cloud buckets" was the need to upload many small zarr files, but maybe I misunderstood something.

Finally, I am thinking now that the lead-time should be minimally chunked as well.

chiaral commented 3 years ago

RE: chunking

  • chunking - there should be no or little chunking in the start time dimension, because we are operating over this dimension. Need to ask/think about forecast lead time. Finally, I am thinking now that the lead-time should be minimally chunked as well.

OK, I will explore how small I can go in lat and lon (gotta chunk somewhere!), but I will keep the chunks in start time and lead time as large in size (so, as few chunks) as possible.
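For a rough sense of what that implies, here is a back-of-the-envelope sketch (20 years of daily starts, 80 lead steps, float32, and a ~100 MB chunk target are all assumptions):

n_start = 20 * 365          # daily start times over 20 years (roughly)
n_step = 80                 # 3-hourly lead times out to day 10
bytes_per_gridpoint = n_start * n_step * 4                              # float32, start and lead time unchunked
print(bytes_per_gridpoint / 1e6, 'MB per lat/lon point')                # ~2.3 MB
print(100e6 / bytes_per_gridpoint, 'lat/lon points per ~100 MB chunk')  # ~43, e.g. roughly a 6 x 7 tile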

  • What should be in a data file? I have the same questions. For the CESM Large ensemble we are moving from outputting dumps of all variables for each output interval to outputting timeseries for each variable, because users are often interested in one variable only. In the past they had to open and read all files, and then pull out the variable they want. Is that not an issue for zarr-data?

I have created a few zarr datasets, made up of many small zarr files. Once they are consolidated, the time it takes to read them lazily is not very different from one large file, but I think it's best to have as few zarr directories as possible, simply to reduce the need to write loops to open all the zarr files.

So if you have a 'local' jupyterhub, e.g. at NCAR, the subsetting of the data is done in the cloud and only one variable would go into the 'local' memory?

My understanding and experience is that reading many small zarr directories (consolidated; if they are not consolidated it takes forever) versus one large directory takes almost the same amount of time, but with many directories you need to write some loops to collect all the zarr store links, which is just more complicated than opening one zarr directory. You can also pass drop_variables= to the open_zarr command to drop what you don't care about (I wish there were an open_only option... but there is not).
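For example (hypothetical store and variable names):

import xarray as xr

ds = xr.open_zarr(
    'gefsv12_A.zarr',              # hypothetical store
    consolidated=True,             # read the consolidated metadata in one request
    drop_variables=['acpcp'],      # skip variables you don't care about
)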

Chiara - I have found this webpage helpful: https://ncar.github.io/cesm-lens-aws/ It's the documentation for the Large Ensemble computed and cloudified at NCAR. I looked into it, and they converted each netcdf file into one zarr file (yearly initial dates, since it's a climate dataset). Indeed, it was pointed out to me that the idea behind "cloud buckets" was the need to upload many small zarr files, but maybe I misunderstood something.

My biggest concern and headache has been figuring out how to combine the ensemble members that are longer and more numerous, and which occur only once a week, with the rest of the dataset. If you go through xarray you need a gridded dataset, so I would need to add tons of NaNs to fill in the lead times and ensemble members that are not present for the majority of the start times. My understanding is that you can have unequal coordinates within a zarr store, but you have to bypass xarray to use it that way, and I just feel it might not be worth it. Hence I decided to use the structure above in the vignette and split the extra ensemble members and lead times into a separate zarr directory.

thanks so much for your input!

rabernat commented 3 years ago

Does anyone understand .idx sidecar files that come with the GEFS grib data? For example

It looks like this:

1:0:d=2000010100:ACPCP:surface:0-3 hour acc fcst:ENS=low-res ctl
2:325317:d=2000010100:ACPCP:surface:0-6 hour acc fcst:ENS=low-res ctl
3:720943:d=2000010100:ACPCP:surface:6-9 hour acc fcst:ENS=low-res ctl
4:1037396:d=2000010100:ACPCP:surface:6-12 hour acc fcst:ENS=low-res ctl
5:1409304:d=2000010100:ACPCP:surface:12-15 hour acc fcst:ENS=low-res ctl
6:1739118:d=2000010100:ACPCP:surface:12-18 hour acc fcst:ENS=low-res ctl
7:2124405:d=2000010100:ACPCP:surface:18-21 hour acc fcst:ENS=low-res ctl
8:2461831:d=2000010100:ACPCP:surface:18-24 hour acc fcst:ENS=low-res ctl
9:2847727:d=2000010100:ACPCP:surface:24-27 hour acc fcst:ENS=low-res ctl
10:3178852:d=2000010100:ACPCP:surface:24-30 hour acc fcst:ENS=low-res ctl
11:3563279:d=2000010100:ACPCP:surface:30-33 hour acc fcst:ENS=low-res ctl
12:3899970:d=2000010100:ACPCP:surface:30-36 hour acc fcst:ENS=low-res ctl
...

Perhaps @zflamig?

This looks close to the concept of the reference filesystem recently developed by @martindurant.

rabernat commented 3 years ago

I made a minimal working recipe as follows:

from pangeo_forge.recipe import NetCDFtoZarrSequentialRecipe

url1 = 'https://noaa-gefs-retrospective.s3.amazonaws.com/GEFSv12/reforecast/2000/2000010100/c00/Days%3A1-10/acpcp_sfc_2000010100_c00.grib2'

recipe = NetCDFtoZarrSequentialRecipe(
    input_urls=[url1],
    sequence_dim='time',
    cache_inputs=False,
    copy_input_to_local_file=True,
    xarray_open_kwargs={'backend_kwargs': {'filter_by_keys': {'totalNumber': 20}}}
)

with recipe.open_input((0,)) as ds:
    print(ds)
<xarray.Dataset>
Dimensions:     (latitude: 721, longitude: 1440, step: 2)
Coordinates:
    number      int64 0
    time        datetime64[ns] 2000-01-01
  * step        (step) timedelta64[ns] 03:00:00 06:00:00
    surface     int64 0
  * latitude    (latitude) float64 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude   (longitude) float64 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
    valid_time  (step) datetime64[ns] 2000-01-01T03:00:00 2000-01-01T06:00:00
Data variables:
    acpcp       (step, latitude, longitude) float32 ...
Attributes:
    GRIB_edition:            2
    GRIB_centre:             kwbc
    GRIB_centreDescription:  US National Weather Service - NCEP 
    GRIB_subCentre:          2
    Conventions:             CF-1.7
    institution:             US National Weather Service - NCEP 
    history:                 2021-04-05T22:51:59 GRIB to CDM+CF via cfgrib-0....

This shows that Pangeo Forge can open these files. But someone else will need to figure out how to put them together.

mktippett commented 3 years ago

Does anyone understand .idx sidecar files that come with the GEFS grib data?

Wesley Ebisuzaki. Index files (wgrib2 inventories) allow random access into grib files and have a GrADS origin story: https://nomads.ncep.noaa.gov/txt_descriptions/fast_downloading_grib.shtml My impression is that they save time by not having to decompress the whole grib file.
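A sketch of how those entries could be used for byte-range reads (field layout inferred from the listing above; the .idx filename convention, i.e. the grib URL plus a .idx suffix, is an assumption):

import fsspec

grib_url = ('https://noaa-gefs-retrospective.s3.amazonaws.com/GEFSv12/reforecast/'
            '2000/2000010100/c00/Days%3A1-10/acpcp_sfc_2000010100_c00.grib2')
idx_url = grib_url + '.idx'

# Each line is: message number : byte offset : date : variable : level : forecast range : member
with fsspec.open(idx_url, 'rt') as f:
    offsets = [int(line.split(':')[1]) for line in f if line.strip()]

# Read just the bytes of the third message (the 6-9 hour accumulation) instead of the whole file.
with fsspec.open(grib_url, 'rb') as f:
    f.seek(offsets[2])
    message = f.read(offsets[3] - offsets[2])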

chiaral commented 3 years ago

Thanks @rabernat for the starting example. I am working on another example that does not use apcp or any of the variables stored with a different underlying time width (as explained in this issue), because cfgrib has issues with those; that is why you need to add xarray_open_kwargs={'backend_kwargs': {'filter_by_keys': {'totalNumber': 20}}}.

Right now, for the variable cape_sfc_2000010100_c00.grib2, for example, that line is not needed and the file loads correctly.

I believe that cfgrib does not like the change in resolution in the grib files, or doesn't know what to do with that. pynio instead recognizes them as two variables:

<xarray.Dataset>
Dimensions:                  (forecast_time0: 40, forecast_time1: 40, lat_0: 721, lon_0: 1440)
Coordinates:
  * forecast_time1           (forecast_time1) timedelta64[ns] 0 days 03:00:00...
  * lat_0                    (lat_0) float32 90.0 89.75 89.5 ... -89.75 -90.0
  * lon_0                    (lon_0) float32 0.0 0.25 0.5 ... 359.5 359.75
  * forecast_time0           (forecast_time0) timedelta64[ns] 0 days 06:00:00...
Data variables:
    ACPCP_P11_L1_GLL0_acc6h  (forecast_time0, lat_0, lon_0) float32 dask.array<chunksize=(1, 721, 1440), meta=np.ndarray>
    ACPCP_P11_L1_GLL0_acc3h  (forecast_time1, lat_0, lon_0) float32 dask.array<chunksize=(40, 721, 1440), meta=np.ndarray>

Ideally we would want to use the same engine for everything, because pynio and cfgrib outputs can look very different. I will try to open an issue on cfgrib, but otherwise we have two options:

1) Use pynio for those variables with different averaging/accumulating intervals (which are valid for days 1-10) and then post-process the xarray.Dataset to make it look like those generated by cfgrib.

2) Pre-process the data (a big preprocess) to homogenize the data structure to be 3-hourly, similar to all the instantaneous variables.
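A rough sketch of what option 1 could look like (the pynio variable names come from the output above; the target cfgrib-like names are assumptions and would need checking against a cfgrib-opened file):

import xarray as xr

def pynio_to_cfgrib_like(ds):
    # Rename pynio's grid coordinates to the cfgrib-style names.
    ds = ds.rename({'lat_0': 'latitude', 'lon_0': 'longitude'})
    # Put the 3h- and 6h-accumulation pieces onto a single 'step' coordinate.
    acc3 = ds['ACPCP_P11_L1_GLL0_acc3h'].rename({'forecast_time1': 'step'})
    acc6 = ds['ACPCP_P11_L1_GLL0_acc6h'].rename({'forecast_time0': 'step'})
    acpcp = xr.concat([acc3, acc6], dim='step').sortby('step').rename('acpcp')
    return acpcp.to_dataset()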

chiaral commented 3 years ago

Updated working example using another variable that doesn't have incompatibilities with cfgrib

Load and create the structure for the list of inputs

from pangeo_forge.recipe import NetCDFtoZarrSequentialRecipe

input_url_pattern = (
    "https://noaa-gefs-retrospective.s3.amazonaws.com/GEFSv12/reforecast"
    "/{yyyy}/{yyyymmdd}00/{ensmem}/Days%3A1-10/cape_sfc_{yyyymmdd}00_{ensmem}.grib2"
)

Create the list of inputs, input_urlsl

import pandas as pd

dates = pd.date_range("2000-01-01", "2000-01-05", freq="D")

ensmem_list = ['c00']#, 'p00', 'p01', 'p02', 'p03']
input_urlsl = []
for em in ensmem_list:
    for day in dates:
        input_urlsl.append(input_url_pattern.format(ensmem = em,
                                                   yyyy=day.strftime("%Y"), 
                                                   yyyymm=day.strftime("%Y%m"), 
                                                   yyyymmdd=day.strftime("%Y%m%d")
                                                    ))

print(input_urlsl)

For now I look at one ensemble member at a time, c00.

Create preprocessing function and recipe:

def preprocess_input(ds, fname):
    return ds.expand_dims('time')

recipe = NetCDFtoZarrSequentialRecipe(
    input_urls=input_urlsl,
    sequence_dim='time',
    cache_inputs=False,
    copy_input_to_local_file=True,
    process_input = preprocess_input
#     xarray_open_kwargs={'backend_kwargs': {'filter_by_keys': {'totalNumber': 20}}} # this is not needed now
)

Prepare target directory

import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge.storage import FSSpecTarget, CacheFSSpecTarget

fs_local = LocalFileSystem()

# cache_dir = tempfile.TemporaryDirectory()
# cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)

# recipe.input_cache = cache_target
recipe.target = target
recipe

Add some currently not-well-documented magic by Ryan that helps with debugging:

def set_up_pangeo_forge_logging():
    import logging
    import sys
    formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("pangeo_forge.recipe")
    logger.setLevel(logging.DEBUG)
    sh = logging.StreamHandler(stream=sys.stdout)
    sh.setFormatter(formatter)
    logger.addHandler(sh)
set_up_pangeo_forge_logging()

Run recipe

recipe.prepare_target()
for chunk in recipe.iter_chunks():
    recipe.store_chunk(chunk)
recipe.finalize_target()

Load the file from target:

import xarray as xr
ds = xr.open_zarr(target_dir.name)
ds

<xarray.Dataset>
Dimensions:     (latitude: 721, longitude: 1440, step: 80, time: 5)
Coordinates:
  * latitude    (latitude) float64 90.0 89.75 89.5 89.25 ... -89.5 -89.75 -90.0
  * longitude   (longitude) float64 0.0 0.25 0.5 0.75 ... 359.2 359.5 359.8
    number      int64 ...
  * step        (step) timedelta64[ns] 0 days 03:00:00 ... 10 days 00:00:00
    surface     int64 ...
  * time        (time) datetime64[ns] 2000-01-01 2000-01-02 ... 2000-01-05
    valid_time  (step) datetime64[ns] dask.array<chunksize=(80,), meta=np.ndarray>
Data variables:
    cape        (time, step, latitude, longitude) float32 dask.array<chunksize=(1, 80, 721, 1440), meta=np.ndarray>
Attributes:
    Conventions:             CF-1.7
    GRIB_centre:             kwbc
    GRIB_centreDescription:  US National Weather Service - NCEP 
    GRIB_edition:            2
    GRIB_subCentre:          2
    history:                 2021-04-06T15:54:34 GRIB to CDM+CF via cfgrib-0....
    institution:             US National Weather Service - NCEP 

aaronspring commented 3 years ago

Nice work @chiaral. I used GEFS output for my climpred example: https://climpred.readthedocs.io/en/latest/examples/NWP/NWP_GEFS_6h_forecasts.html

I load data from thredds with intake-xarray. But only a few starting dates. Looking forward to testing your GEFS zarr with climpred in the pangeo cloud.

I use cfgrib and use backend_kwargs to select just one variable, but ideally we want to have multiple (all) vars in one huge zarr.

chiaral commented 3 years ago

Thanks for pointing me to this example. Indeed, the idea for this case is to use the data on the cloud and not on the thredds server. Out of curiosity, do you know if the data on thredds have the same funny alternating accumulating period I reported above? I suppose so. How do you handle that?

The structure I am thinking of right now is almost one huge zarr file, but not quite, because, as you might know, the number of ensemble members and the spatial and temporal resolution vary in the dataset after 10 days and at every weekly start date, so it would be wasteful to have one huge zarr (you would end up with a lot of missing values filled with NaN). But the idea is to have 4 blocks of data, as illustrated above.

A note on the missing values: my understanding is that zarr could easily handle this type of "sparse" matrix (i.e. missing ensemble members or varying resolution), but xarray needs a gridded dataset, hence the idea to split the data into A, B, C, and D. I would love to hear feedback if you have any about that idea! Thanks!

aaronspring commented 3 years ago

Out of curiosity, do you know if the data on thredds have the same funny alternating accumulating period I reported above? I suppose so. How do you handle that?

Didn’t check. I just wanted to get an initial working example and therefore only downloaded a few starting dates. I found a thredds overview here: https://www.ncdc.noaa.gov/data-access/model-data/model-datasets/global-ensemble-forecast-system-gefs

ensemble, spatial, and temporal resolution vary in the dataset after 10 days and every weekly start date

I’m surprised too. Is there any explanation given by the GEFS issuing agency for that? I only found a storage reason in your linked docs. I work on decadal predictions, so I am not an expert...

I don’t see any problem with changes in temporal resolution in step. The change in spatial resolution surprises me most. Very nice figure above; I didn’t appreciate it much when I initially read this issue. I’d go with 4 zarrs A-D as you propose. Maybe it could be A interpolated to B and concatenated to B instead of just B, i.e. the longer forecasts starting at the first step, with A regridded spatially and temporally like B. That would make the datasets much more analysis-ready. Likewise for D.

No experience with sparse. ABCD sounds reasonable.

aaronspring commented 3 years ago

Regarding chunking, my experience is that chunking step: 1 (many chunks, if needed) and time: -1 (no chunking) works well if skill is computed over many forecast_reference_times (here time). If only a single forecast time is chosen, then chunking time: 1 and step: -1 is best. So chunking depends on the use case. We’d have to play around with the zarrs on the cloud to give a best-case answer.
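In xarray terms (hypothetical store name; just the two patterns described above):

import xarray as xr

ds = xr.open_zarr('gefsv12_A.zarr', consolidated=True)

# Skill over many start dates: keep `time` whole, chunk finely along `step`.
ds_over_starts = ds.chunk({'time': -1, 'step': 1})

# A single start date: keep `step` whole, chunk finely along `time`.
ds_single_start = ds.chunk({'time': 1, 'step': -1})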

martindurant commented 3 years ago

I don't quite gather the structure of the data you are writing, but I will comment that zarr has no trouble with wholly missing chunks in any dataset, and xarray will be fine if you build a dataset where the coordinates are defined but chunks are missing for some of the variable and coordinate combinations. Attempts to access those regions will return whole chunks of NaN (or another defined fill value) without, of course, having read anything from storage.
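A small, GEFS-independent illustration of that behaviour with zarr directly:

import numpy as np
import zarr

z = zarr.open('sparse_demo.zarr', mode='w', shape=(1000, 1000),
              chunks=(100, 100), dtype='f4', fill_value=np.nan)
z[0:100, 0:100] = 1.0                         # only this one chunk is written to storage

print(np.isnan(z[500:600, 500:600]).all())    # True: the missing chunk comes back as NaN without any read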