pangeo-forge / cmip6-pipeline

Pipeline for cloud-based CMIP6 data ingestion pipeline
Apache License 2.0
1 stars 5 forks source link

Defining the inputs to a single zarr store output #1

Open rabernat opened 4 years ago

rabernat commented 4 years ago

On today's call, we identified a certain "breakpoint" in Naomi's workflow; after she is done cleaning and checking the user's request, she has a list of zarr stores that need to be created / uploaded and theoretically should be able to just "let it go" and have this happen automatically.

Notwithstanding all the problems that can occur in this step, let's try to define the atomic structure of a single one of these jobs.

What are the inputs that completely define a single CMIP6 zarr store output?

Imagine we were writing a function

def create_single_cmip6_zarr_store(*args, **kwargs):

What are the arguments to this function?

If we can define this, we can write the function, based on Naomi's existing notebooks. Once we write the function, we can cloudify it and investigate different execution frameworks like Apache Beam / Google Cloud Dataflow to help us automate this part of the workflow.

naomi-henderson commented 4 years ago

Here is a link to the notebook I used in today's call, nb1-DataRequests.ipynb. The repository also includes the python modules used for searching ESGF, downloading the netcdf files, etc.

@rabernat Before we can identify the arguments to your function, we need to know what this function needs to accomplish. Does "checking the user's request" include performing the ESGF search and comparing to what already exists in the cloud and therefore having a list of needed zarr stores AND the associated URLs of the netcdf files plus extra information returned by the ESGF search API (files sizes, tracking_ids, etc)?

If so, then the function only needs to:

and so my first crack at the minimal arguments should include:

zstore:     unique name of zarr store, including all identifying keywords, e.g.,
"activity_id/institution_id/source_id/experiment_id/member_id/table_id/variable_id/grid_label"

df_netcdf: dataframe of URLs, sizes and tracking_ids of netcdf files

codes:      pre-processing directives, if any are known

This what you had in mind?

rabernat commented 4 years ago

This is very useful. I hope you don't mind if we use this issue to iterate a bit on the design here.

I would say the key part of your response, and the answer to my central question, is the following. A single zarr cloud store is uniquely identified by the following tuple:

activity_id, institution_id, source_id, experiment_id, member_id, table_id, variable_id, grid_label

Throughout our pipeline, this tuple can be used as an identifier of an atomic unit of work. This should be is the essential input to basically all our preprocessing functions. The other things you mentioned, i.e. dataframe of URLs, sizes, tracking_ids of netcdf files, presence / absence of data in the cloud, etc etc. are ancillary, not fundamental. From here on I will use the shorthand dataset ID to refer to this tuple of 8 values.

One could imagine many standalone functions that take this dataset ID as an input. For example:

def is_already_in_cloud(dataset_id):
    # check GCS bucket to see if dataset is present

def query_esgf(dataset_id):
    # return list of URLs of EGFS endpoints for this dataset

def produce_and_upload_zarr_dataset(dataset_id):
    urls = query_esgf(dataset_id)
    # download urls
    # concat to one xarray dataset
    # write to zarr
    # upload to GCS

So to respond to your questions more directly

Does "checking the user's request" include performing the ESGF search and comparing to what already exists in the cloud and therefore having a list of needed zarr stores AND the associated URLs of the netcdf files plus extra information returned by the ESGF search API (files sizes, tracking_ids, etc)?

By the time we are done here, the workflow might look a bit different than it does now. Your question assumes that same basic organization as the code in your notebook. But my goal in this discussion is to boil it down to a set of separable steps. So in my mind, the steps look something like this:

  1. Checking a user's request is the process of converting the response of the form to a list of valid dataset IDs. This will involve querying ESGF of course, in order to determine whether the data requested actually exist. The output of this is a list of valid dataset IDs that exist in ESGF. Everything from this point on can operate on individual dataset IDs. The dataset IDs go into a queue and can be processed one by one.
  2. For each dataset ID, verify whether the cloud data exists and is up to date. If the cloud data doesn't exist at all, this is easy. If it does exist, we still may need to check whether updates have been published or errata have been reported, etc. The output of this step is a new queue of dataset IDs that need to actually be processed.
  3. For each dataset ID, download the source data from ESGF, convert it to Zarr, and upload. This is the "real work" of the pipeline. But it is totally separable from the above steps. (It's fine if we re-query ESGF to determine what endpoints to download.) Once we have determined we want to [re]process a dataset, we simply call this function. It also may be possible to split this step up into sub-steps.

The advantage of doing it this way is that we can start to separate out the data processing from responding to the user request. From our conversations today, I got the impression that responding to individual user requests is not scalable. We want to move towards automating the bulk transfer of the entire ESGF archive. For example, we could replace step 1 above with a cron job that automatically crawls the esgf servers and submits entire classes of dataset IDs for processing.

Am I going in the right direction?

rabernat commented 4 years ago

For reference, the specific tools I think we should be using to implement this in the cloud are Pub/Sub and Dataflow. This tutorial explains both elements: https://cloud.google.com/pubsub/docs/pubsub-dataflow#python_1

Basically, the idea is to have have a pub/sub queue which we populate with dataset IDs, and a dataflow (Apache Beam pipeline) which consumes these messages.

dgergel commented 4 years ago

This is great, thanks @rabernat and @naomi-henderson.

I took a close look at the notebooks and modules in your repo @naomi-henderson, thanks for sharing these.

What I'm gathering from this conversation is that these are the basic steps forward:

  1. decide on an overall design for how we want to restructure this workflow (we are probably most of the way there in this discussion, minus the automation part) 2a. completely separate out data processing from user requests 2b. split out each step outlined above into list of substeps with functions for each (much of this already exists and just needs to be restructured)
  2. test workflow in cloud
  3. automate pipeline

For the overall design, I like what @rabernat outlined above. And in terms of the actual work, since I am less familiar with the user request system and the ESGF querying, I think it'll be most useful if I work mostly on step 3 above.

rabernat commented 4 years ago

I'll let @naomi-henderson weigh in on the details of the actual workflow.

In terms of the software engineering of this pipeline, I recommend the following organization

The code for both of those things can live in this repo.

@dgergel - if you're looking for a concrete next step, you could try taking @naomi-henderson's notebook and refactoring some of the parts into these more standalone functions.

Note that these functions may require some configuration (which esgf nodes to prefer, where to store the temporary data, cloud credentials, etc.) These may differ between our test environments and our production environment. Ideally the same functions could be run locally or in the cloud, given the proper configuration. We should think about and carefully enumerate all of the different configuration parameters and then use configuration best practices in our library.

naomi-henderson commented 4 years ago

@rabernat , I think this is definitely headed in a better direction. Your division of the workflow into the three functions looks good, but of course I am still stuck in the weeds. Sorry about that. Hopefully you can see beyond these issues, so I will describe a few more here.

The biggest headache (by far) is that the 30 Data Nodes are very flakey - some datasets exist on multiple nodes. The more common monthly datasets are mirrored on the Federated ESGF-COG nodes as well as on their home Data node. The higher time and space resolution datasets are mostly only available on the Data nodes. This mirroring of datasets creates some bleed over between the functions. The ESGF search in query_esgf (on one or all of the four main search nodes, LLNL, DKRZ, IPSL and CEDA) may indicate that a dataset is theoretically available on some combination of nodes. We usually know which are our preferred nodes according to bandwidth and reliability, but this is not static in time. A preferred node today may go offline over the weekend, or get hacked and be offline for an extended period. 'Availability at ESGF' is not a fixed condition and the true availability is not known until we have tried to download the data in produce_and_upload_zarr_dataset.

The easiest way to deal with this is to just skip past the unsuccessful downloads. Some of the downloads hang forever, some fail very quickly and some work for some netcdf files and then fail (server is overwhelmed by requests?). So we would like to know which not to try again for awhile (so one dataset download is related to the success/failure of prior downloads). In addition, we also have unsuccessful concatenations and unsuccessful 'to_zarr' conversion (generally netcdf4 issues). So the key part of this is keeping a good record of which tasks were unsuccessful and flagging them appropriately for clean-up. So the workflow really needs a sophisticated rinse and repeat step.

Fortunately the concatenate and to_zarr failures are much simpler than the download failures - and we already have a number of common issues and fixes identified. I have yet to figure out how to deal with the constantly changing availability of the 30 Data nodes - just have a constantly changing list (skip_sites = [...] ) .

dgergel commented 4 years ago

@rabernat I think that structure sounds good. I don't have experience working with Apache Beam or Google Cloud Dataflow unfortunately.

@naomi-henderson thank you for going into the weeds with all of the ESGF node issues. This is definitely going to be important to address in the workflow. My immediate idea is essentially what you described, cycling through nodes and preferentially trying nodes that have been most successful recently, and then trying the unsuccessful nodes again after some period of time has elapsed (e.g. one week, etc). But seems like we'll need to iterate on this quite a bit.

I'll start working on refactoring Naomi's notebooks into groups of functions. As I do that I'll keep track of config parameters (thanks for linking to that best practices doc - super useful). For now I will hold off on much of the ESGF node trickiness and focus on the data processing. I expect to get started on this by the end of the week.

agstephens commented 4 years ago

Hi @naomi-henderson, In the UK, we (CEDA) run an ESGF Data Node and we are looking to convert a significant chunk of CMIP6 (~200Tb) to Zarr. Given that you have the same pipeline, might this be an opportunity to collaborate on the codebase? Do you have it available in GitHub somewhere? Thanks, Ag

naomi-henderson commented 4 years ago

Hi @agstephens,

We should probably bring @rabernat and @dgergel to this discussion. Since you are running one of the main ESGF Data Nodes, this would be a fantastic opportunity for collaboration - we could help with converting to zarr and you could help with our interaction with the ESGF data nodes, which is currently through the search API.

As you probably realize, @dgergel is currently working to make my original code more useful to the rest of the community, see issue 4. I currently use jupyter notebooks to accomplish my workflow, but this collection of notebooks was started when we really did not know of the many issues involved and they have now become quite convoluted, see, for example basic collection notebook

Any thoughts about how the CEDA folks could get involved with your refactoring @dgergel ?

rabernat commented 4 years ago

Yes this is great news. We would love to work with CEDA on this.

The challenge here is to take a set of scripts that were written for on specific environment / workflow and generalize them into a more broadly useful tool. Perhaps it's time for another meeting for us all to sync up? The challenge will be that we have collaborators in the UK and the US west coast--a 9 hour time difference. Could folks suggest a meeting time that might work?

agstephens commented 4 years ago

Hi @rabernat: 8.30am for you is 4.30pm for us. Sounds like a good time to me. Could chat next week if you are free to do so. Please suggest dates. :-)

rabernat commented 4 years ago

@dgergel -- what's your availability like at 8:30 am PT next week?

dgergel commented 4 years ago

This is great news! Excited to work with CEDA on this @agstephens. I think that the way that @naomi-henderson laid out a potential collaboration would be great, but we can talk more about this when we meet.

@naomi-henderson and @rabernat I can do Wedn - Fri next week at 8:30am PT.

rabernat commented 4 years ago

Ok, I propose Friday, Aug 28, at 8:30 PT, 11:30 ET, 16:30 UK Time. We can use https://whereby.com/pangeo.

agstephens commented 4 years ago

Hi @naomi-henderson, @rabernat, @dgergel, It's great that you'd like to meet. Having reviewed my calendar I can't do Tuesday or Friday next week. I can do Wed or Thurs if that works for you. Thanks again

dgergel commented 4 years ago

Wedn or Thursday next week work for me. How is that for you all @rabernat and @naomi-henderson?

naomi-henderson commented 4 years ago

Wed and Thurs both fine for me

rabernat commented 4 years ago

Unfortunately I can't make Wednesday or Thursday. Could this meeting happen without me? I think it can. Otherwise, perhaps push back to next week...

dgergel commented 4 years ago

@rabernat I'll be on vacation next week, so I'd prefer this week if possible. I think we can manage without you and fill you in via notes from the meeting. @agstephens and @naomi-henderson can we plan for Thursday?

rabernat commented 4 years ago

Thanks so much @dgergel.

I recommend nominating someone to chair the meeting (make an agenda and keep the call on track), and having someone else take notes in a shared doc that all attendees can access. This has been our recipe for success for video meetings.

dgergel commented 4 years ago

@rabernat thanks for the rec, that sounds great. I am happy to chair or take notes. @naomi-henderson what do you prefer?

I am out tomorrow and offline (will be hiking all day without service), so I am going to go ahead and set up the meeting. We use RingCentral at Rhodium and I haven't set up a whereby meeting myself yet. @naomi-henderson if you have and want to use that we can switch, till then I'll use RingCentral. @agstephens I am sending it to this email: Ag.Stephens@ncas.ac.uk, if you have another preferred one let me know.

naomi-henderson commented 4 years ago

Great, I’d prefer to take notes if you will chair. RingCentral is fine. Thanks

agstephens commented 4 years ago

Hi folks, thank you for being so accommodating in setting up a meeting. @dgergel: my main e-mail is ag.stephens@stfc.ac.uk but I do pick up the other one as well. I look forward to speaking to you all on Thursday. I hope we can find a way to collaborate that others can also get involved with :-)

dgergel commented 4 years ago

@rabernat here are notes from our meeting yesterday.

This is the outline of the CMIP6-in-the-cloud API refactor that I went over in the meeting:

from cmip6inthecloud.esgf import get_node, get_dataset_urls, download_files, concat_files, update_versions
from cmip6inthecloud.catalog import get_available_zarr_stores, check_for_needed_zarr_stores, update_zarr_list
from cmip6inthecloud.exceptions import check_time_dim, fix_time_dim, check_for_data_flags, handle_exceptions
from cmip6inthecloud.arco import convert_to_zarr, upload_to_gcs

# tuple that serves as a unique identifier for each zarr cloud store (zcs) 
zcs_id = (activity_id, institution_id, source_id, experiment_id, member_id, table_id, variable_id, grid_label)

# figure out which node to use 
node = get_node()

# get full list of dataset urls 
urls = get_dataset_urls(zcs_id)

# check if data exists in GCS
data_gcs = get_available_zarr_stores(urls)

# if data doesn't exist, need to access it 
data_needed = check_for_needed_zarr_stores(data_gcs)

# download and concatenate files 
files = download_files(data_needed)

# check time dimension
time_check = check_time_dim(ds)

if time_check:
  ds = concat_files()
else: 
  # if data is overlapping, remove it. Otherwise raise error 
  ds = fix_time_dim()

# check to see if dataset passes exception tests
ds_exceptions = check_for_data_flags

if not ds_exceptions:
  pass 
else: 
  ds_fixed = handle_exceptions(ds_exceptions)
  if ds_fixed:
    # dataset is fixed
    pass 
  else:
    # human intervention needed, breakpoint
    raise SystemExit("cmip6inthecloud workflow halted, requires manual cleanup")

# convert to zarr stores
new_zarr_store = convert_to_zarr(ds)

# upload to GCS
zarr_store_final = upload_to_gcs(new_zarr_store)

# update list of zarr stores in catalog
update_zarr_list(zarr_store_final)
rabernat commented 4 years ago

Thanks for leading the meeting Diana! Awesome to be able to review the notes and get a sense of what was discussed at the meeting.

My one question is: what are our next steps on this collaboration with the CEDA folks? Is there a specific point of interaction, like a shared code base? Or is this more about just exchanging experience and best practices? (Btw, we should probably branch that discussion of into a new issue, since we have drifted considerably the from the original topic.)

agstephens commented 4 years ago

Hi @rabernat and @dgergel , sorry for the silence - I've had a busy fortnight.

We had a productive meeting and I can certainly see that there are common issues that affect us, and most scientists who are trying to work with CMIP6. Our particular use case, in terms of CMIP6-to-object-store, is a subset of your overall workflow. We have the following requirement:

  1. identify a set of ESGF datasets (on local disk - since we are an ESGF Data Node)
  2. read each dataset, apply fixes if necessary, and write to Zarr files
  3. the Zarr files are in our own private cloud (which is also on-site)
  4. expose those Zarr files as a collection to selected users
  5. evaluate performance in terms of both write and read patterns

The actual code required to convert to Zarr is minimal so the main overlap we have is the exceptions component, which is really useful.

What I like the @naomi-henderson's structure is that the code is separated from the data. This prevents the code sprawling into an infinite set of if model == HadGEM and experiment == rcp6.0 clauses.

Thinking about the next steps. I think @dgergel's plan to create a new community repository is a great idea. We would be interested in getting involved in, or at the very least giving input in to, the parts of the workflow that are common to both our requirements.

Maybe we could speak again in a couple of weeks - or just continue the conversation on github and/or discourse.

agstephens commented 3 years ago

Hi @naomi-henderson @rabernat @dgergel, we have made some progress with getting our CMIP6 pipeline running in a batch environment with our own (Caringo) object store. How are things going your side of the pond? Would you like to have a follow-up meeting to see if there are possibilities for collaboration? We could maybe also invite a representative from the ESMValTool project.

rabernat commented 3 years ago

Hi @agstephens, thanks for your message! It's great to hear you've made some progress. I think the time is right for another meeting. I've also been in contact with Balaji of GFDL, and it seems like some ESGF folks may be interested in joining the discussion as well. And of course the ESMValTool folks are welcome!

I'll send out another poll to find a time we can all meet.

agstephens commented 3 years ago

Thanks @rabernat , that's great. I look forward to hearing from you.