esm-tools / pymorize

A Python based Tool to CMORize NetCDF Data
https://pymorize.readthedocs.io
MIT License

Caching of Pipeline Steps #42

Closed: pgierz closed this 2 weeks ago

pgierz commented 1 month ago

Moin guys, an old friend from university who works in "The Industry" might be giving us some ideas. He was interested in what I'm working on, and always patiently listens to my programming horror stories 😉

His company does satellite analysis of farmlands and sells information to ...uh... people growing potatoes. Or something. I do not really know. In any case, it's lots of data-driven Python analysis on remote sensing data, so @tomaszps may have some hints on how we can avoid shooting ourselves in the foot. This issue may be a bit wordier than normal so he knows what's going on, but it's probably good anyway to get this out of my head and onto paper...


Pipeline Caching

Background

We are building a post-processing tool that can handle NetCDF data, perform some calculations and manipulations, correct metadata, and re-write the resulting files to conform to standards required by the IPCC and various Model Intercomparison Projects (MIPs). On the scientific side, this unlocks the ability to contribute our results to the IPCC and easily compare against other modelling centres; on the HPC side, we are happy to explore workflows that we should be giving the scientists to better handle big data, since the newer projects are creating a lot more data and traditional approaches are reaching the point where they no longer work.

During the user meeting, @chrisdane and @christian-stepanek mentioned that the old Ruby tool had the option to save intermediate results, so that if a processing step failed, you could resubmit your processing job and it would be able to skip ahead to the critical step.

I had initially written some half-baked ideas of how to do this in JSON, but that was thrown out just before we did the early user demo, for a few reasons:

  1. We weren't caching any sort of results from each step to be able to recreate the state at the time of crashing, just a log of which step it had gotten to. This is interesting for logging, but pretty pointless otherwise. Logging can be done better than that anyway.
  2. I'm not smart enough to write unit tests that cleanly mimic parallel asynchronous execution on a SLURM cluster with interruption of a task halfway through.
  3. We elected to use prefect as a workflow backend for this project. prefect apparently has auto-caching capabilities, and since part of this project (at least from the view of the HPC group) is to explore options for workflow orchestration management, I'd rather take advantage of that than build our own.

Requirements

On a Pipeline level, once you trigger the run method (I might turn this into __call__ at some point), you start a prefect Workflow, and each of the Pipeline.steps actions is run; these have been transformed from simple functions into prefect.Task objects. The following would be desirable:

pgierz commented 1 month ago

Points 1, 2, and 3 seem to be handled, but I need a way to programmatically verify this beyond just "looking at it" and saying "ohh ahh it did something"...

Suggestions are very welcome.

tomaszps commented 1 month ago

> Points 1, 2, and 3 seem to be handled, but I need a way to programmatically verify this beyond just "looking at it" and saying "ohh ahh it did something"...

1 - You're basically asking how to test that the caching mechanism in Prefect is working, if I'm understanding correctly? Or that it works when you run this script for transforming NetCDF files? I'm imagining the best way to test this involves either time-dependence or randomness, so that you can check that the cached result is identical, but I can't immediately see how that would work out.
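A very rough sketch of the randomness variant, assuming Prefect's `task_input_hash` cache key helper is available; the task and flow names are made up:

```python
# Sketch only: a task that returns something different on every *real* execution,
# so two identical results imply the second call was served from the cache.
import random

from prefect import flow, task
from prefect.tasks import task_input_hash


@task(cache_key_fn=task_input_hash)
def noisy_step(x):
    return x + random.random()


@flow
def check_cache(x):
    # same inputs, same cache key -> the second call should be a cache hit
    return noisy_step(x), noisy_step(x)


if __name__ == "__main__":
    first, second = check_cache(1)
    assert first == second, "cache was not used"
```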

2 - Either you do something similar to 1 with time-dependence, or you could add a secondary signal (e.g. an HTTP request, logging, or writing a dummy file). You can then check whether the secondary signal fires again on a repeated run.
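The dummy-file variant could look something like this (again only a sketch, pytest-style, names made up; I haven't checked this against Prefect's result-persistence settings, so treat it as the shape of the check rather than a guaranteed-green test):

```python
# Sketch: the log file is the "secondary signal" -- it only grows when the task
# body actually runs, so a cache hit on the second run leaves it at one line.
from pathlib import Path

from prefect import flow, task
from prefect.tasks import task_input_hash

CALL_LOG = Path("step_calls.log")


@task(cache_key_fn=task_input_hash)
def expensive_step(x):
    with CALL_LOG.open("a") as f:
        f.write("ran\n")
    return x * 2


@flow
def tiny_pipeline(x):
    return expensive_step(x)


def test_second_run_uses_cache():
    if CALL_LOG.exists():
        CALL_LOG.unlink()
    tiny_pipeline(21)
    tiny_pipeline(21)  # identical inputs, so the task body should be skipped
    assert CALL_LOG.read_text().count("ran") == 1
```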

3 - Could you clarify how this is different from 1? I think I'm not understanding something about prefect or the workflow.

4 - This doesn't solve the first half of point 4, but: https://docs-3.prefect.io/3.0/develop/task-caching#force-ignore-the-cache
The first half might be resolved by including the step to start from in the tool config, and then having each subtask start with a check of its step number? Again, I haven't spent that much time on this, just trying to throw out helpful ideas.
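To make that a bit more concrete (sketch only; `refresh_cache` is what that docs page describes as far as I can tell, and the config handling is entirely invented):

```python
# Sketch of both halves of point 4 (all names are placeholders).
from prefect import task


# Second half: force-ignore the cache for one step, per the linked docs.
@task(refresh_cache=True)
def always_recompute(data):
    return data


# First half: resume control via the tool config -- each step compares its own
# position against a user-supplied "start_from" before doing real work.
def run_or_skip(step_number, config, step_fn, data):
    if step_number < config.get("start_from", 0):
        return data  # skipped; assume a cached/previous result is already in place
    return step_fn(data)
```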

5 - It might be somewhat painful and brittle to do this - I don't see access to the datastore mentioned in the docs, so you're going to be writing an implementation-dependent lookup on the internal datastore of prefect. Are you thinking of having prefect launch every time somebody runs one of these conversion tasks? Otherwise I'm not sure why you'd want or need access when the framework's not running.

Hopefully this was more helpful than harmful.

pgierz commented 1 month ago
  1. This is actually a multi-faceted point, and I wrote it down imprecisely, sorry. To test that it works at all is simple enough: just look at the contents of the cache directory. I still need to think more about how this cache-key function should be implemented; the built-in one just generates (random-looking) alphanumeric strings, which are probably some checksum of the program code and inputs, but I'm not entirely sure. A more human-friendly name would be helpful there. To check that it works correctly is a bit more tricky: I need to effectively mock a program failure and see if the cache is re-used to recreate the state as it would have been immediately before the crash. The handbook is a bit too minimal for my liking here, unfortunately.

  2. Kind of the same as 1. If I can find a good way to break it on purpose, we should be able to see that it resumes automatically at the point where it should.

  3. It's not different from 1. We just insert a dummy step that spits out the exact same inputs that it received, and use that to create a checkpoint in the caching sequence. Here, it would be very useful to be able to give user-defined human names to these breakpoints. Since every step is cached, this really doesn't do anything useful aside from giving some human-readable context to where in the program flow you currently are.

  4. I like the idea of numbering and giving some sort of resume control. Having a clean user interface for this will be tricky, since many of these tasks run in parallel, so we need to be able to resume them in parallel, too. More thought is required...

  5. Yes, it is painful, and heavy on the disk. A regular 1.5 TB dataset produces checkpoint caches of several GB per step, and from what I could see, the cache is not cleaned up automatically. These are just pickled files of the return values at specific steps, and I have a small CLI part that can dump out the pickled values if you are able to demystify this magical cache key.
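Roughly, the dump part has this shape (a sketch, not the actual pymorize code; the storage path and the plain-pickle assumption are guesses about Prefect's local result storage, which may well use its own serializer instead):

```python
# Rough shape of the cache-dumping CLI; the storage layout is an assumption.
import argparse
import pickle
import pprint
from pathlib import Path


def main():
    parser = argparse.ArgumentParser(description="Dump a cached pipeline result")
    parser.add_argument("cache_key", help="the (currently cryptic) cache key")
    parser.add_argument(
        "--storage",
        type=Path,
        default=Path.home() / ".prefect" / "storage",
        help="where prefect keeps persisted results (assumed default)",
    )
    args = parser.parse_args()

    with open(args.storage / args.cache_key, "rb") as f:
        pprint.pprint(pickle.load(f))


if __name__ == "__main__":
    main()
```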

Long story short, I think the next important step for me is writing a better way to generate the cache keys; that should clear up a lot of the confusion I have about how this works.
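What I have in mind is along these lines (a hand-wavy sketch: the (context, parameters) signature is Prefect's cache_key_fn contract as I understand it, everything else here is made up):

```python
# Sketch of a more human-readable cache key: step name plus a short input tag,
# instead of an opaque checksum.
from prefect import task


def readable_cache_key(context, parameters):
    step_name = context.task.name
    # a short, stable description of the inputs; which fields to use is still TBD
    input_tag = "_".join(str(v) for v in parameters.values())[:40]
    return f"{step_name}__{input_tag}"


@task(cache_key_fn=readable_cache_key)
def units_to_si(data):
    return data
```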

Thanks @tomaszps! :-)