openclimatefix / dagster-dags

Dags for running jobs on Leonardo

NWP dagster Planetary #51

Closed: peterdudfield closed this 6 months ago

peterdudfield commented 8 months ago

Following on from here, the aim is to get NWP data using dagster on Planetary Computer.

Some docs on how to use Planetary Computer are here: https://planetarycomputer.microsoft.com/docs/overview/about

Useful commands:

    kbatch job submit --image=ghcr.io/openclimatefix/nwp-consumer --args='["check"]' --name=nwp-consumer --output=name

This returns a job name. Then, to get the logs:

    kbatch pod list --job-name=nwp-consumer-9rfzs --output=name
    kbatch pod logs nwp-consumer-9rfzs-pbdzt --stream

And to run a consume job writing to huggingface:

    kbatch job submit --image=ghcr.io/openclimatefix/nwp-consumer --args='["consume", "--source", "icon", "--name", "nwp-consumer", "--sink", "huggingface"]' --envs="{\"HUGGINGFACE_TOKEN\": \"xxx\", \"HUGGINGFACE_REPO_ID\": \"openclimatefix/dwd-icon-eu\", \"ICON_PARAMETER_GROUP\": \"full\"}" --name=nwp-consumer --output=name
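
The submit-then-tail-logs flow above can also be scripted. A minimal sketch using Python's subprocess module (not code from this repo; it just wraps the kbatch CLI commands shown above and assumes kbatch is installed and authenticated):

    # Minimal sketch: submit the "check" job and stream the logs of its first pod.
    import json
    import subprocess


    def run_check_job() -> None:
        # Submit the job; --output=name makes kbatch print only the generated job name.
        job_name = subprocess.run(
            [
                "kbatch", "job", "submit",
                "--image=ghcr.io/openclimatefix/nwp-consumer",
                "--args=" + json.dumps(["check"]),
                "--name=nwp-consumer",
                "--output=name",
            ],
            check=True, capture_output=True, text=True,
        ).stdout.strip()

        # List the pods created for the job, then stream the logs of the first one.
        pods = subprocess.run(
            ["kbatch", "pod", "list", f"--job-name={job_name}", "--output=name"],
            check=True, capture_output=True, text=True,
        ).stdout.split()
        subprocess.run(["kbatch", "pod", "logs", pods[0], "--stream"], check=True)


    if __name__ == "__main__":
        run_check_job()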

peterdudfield commented 8 months ago

FYI: @devsjc @jacobbieker

devsjc commented 8 months ago

Began work on this on a new branch, feature/planetary-computer-icon. See the linked branch.

devsjc commented 8 months ago

Requires some modifications to the huggingface implementation in the consumer. See https://github.com/openclimatefix/nwp-consumer/issues/91.

devsjc commented 8 months ago

Completed all the required modifications to the consumer. Deploying to run in a test huggingface dataset for now to ensure we don't hit any commit/file limits before turning off the old cron job.

peterdudfield commented 8 months ago

Well done @devsjc. Will it then just need updating where it saves to?

devsjc commented 8 months ago

Yes, once I've seen it working for a little bit!

devsjc commented 8 months ago

So we did in fact hit the commit limit saving raw files. Fix tracked by the following issue: https://github.com/openclimatefix/nwp-consumer/issues/106

devsjc commented 8 months ago

Once again deployed against a test dataset until we're happy with it.

peterdudfield commented 8 months ago

How long are you thinking of leaving it running before switching? 1 hour / 1 day / 1 week?

devsjc commented 8 months ago

ICON has runs every 6 hours. I think once I've seen two of those go through (one manual backfill, one scheduled backfill) and I've got Jacob to check he's happy with the Zarr file in huggingface, I'll be happy to move it over. So I can set off the first backfill once #67 is closed, and a scheduled one will go off at 12:00. Assuming it takes a few hours to run, I might just be able to be happy with it before the end of the day!

devsjc commented 8 months ago

Backfill tests:

NOTE: requires an active ssh tunnel to the dagster frontend.

devsjc commented 8 months ago

The above failed due to ReadTimeout errors listing pods. See #69 for tracking.

devsjc commented 8 months ago

Backfill tests:

devsjc commented 8 months ago

These backfill tests failed: kubernetes killed the containers because they used too much memory. There are two possibilities here:

  1. The storing of the raw files used up all the available memory (they are larger than the resultant zarr)
  2. The merging of the final zarr used up all the available memory

If it is the first case, this could be solvable by distributing the workload across multiple pods using Planetary Computer's dask cluster features. Each pod would have to handle fewer individual files and so hold less in memory. This would also speed the process up, as it would parallelise the compute. If it is the second case, this would not help, as the final merge would still use up all the available memory of the single pod it runs on, as before.

As such, we are going to hope it is the first issue and implement distribution via a dask gateway in the nwp-consumer. See https://github.com/openclimatefix/nwp-consumer/issues/110
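
For context, the distribution idea amounts to spinning up an ephemeral cluster through dask-gateway and farming the per-file work out to it. A minimal sketch under stated assumptions (this is not the consumer's implementation, process_file is a hypothetical per-file unit of work, and the gateway address/auth are taken from the environment):

    # Rough sketch of distributing per-file work via dask-gateway; not the actual
    # nwp-consumer change (that is tracked in nwp-consumer#110).
    from dask_gateway import Gateway


    def process_file(url: str) -> str:
        """Hypothetical unit of work: download and convert a single raw file."""
        ...


    def run_distributed(file_urls: list[str]) -> list[str]:
        gateway = Gateway()              # gateway address/auth come from the environment
        cluster = gateway.new_cluster()  # one ephemeral cluster per job run
        cluster.scale(4)                 # several small workers instead of one big pod
        client = cluster.get_client()
        try:
            # Each worker handles only a subset of files, so holds less in memory.
            futures = client.map(process_file, file_urls)
            return client.gather(futures)
        finally:
            cluster.shutdown()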

jacobbieker commented 8 months ago

If https://github.com/openclimatefix/nwp-consumer/issues/110 doesn't fix it, we could probably switch to uploading the zarrs as separate ones to HF? As long as they are separated in a reasonable fashion (ideally all surface variables, then all pressure levels, or something like that), it would be slightly annoying to use but workable, in case 2. is the issue.
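
For what it's worth, that split could look roughly like the following sketch (the pressure-level dimension name and the local paths are assumptions; the repo id is the one from the consume command earlier in the thread):

    # Rough sketch of uploading surface and pressure-level variables as separate
    # zarrs, per the suggestion above. Dimension name and paths are assumptions.
    import xarray as xr
    from huggingface_hub import HfApi


    def split_and_upload(ds: xr.Dataset, repo_id: str = "openclimatefix/dwd-icon-eu") -> None:
        # Group variables by whether they carry a pressure-level dimension.
        surface_vars = [v for v in ds.data_vars if "isobaricInhPa" not in ds[v].dims]
        pressure_vars = [v for v in ds.data_vars if "isobaricInhPa" in ds[v].dims]

        groups = {"surface": ds[surface_vars], "pressure_levels": ds[pressure_vars]}
        api = HfApi()
        for name, subset in groups.items():
            path = f"/tmp/{name}.zarr"
            subset.to_zarr(path, mode="w")
            # Upload each group's zarr store to its own path in the dataset repo.
            api.upload_folder(
                folder_path=path,
                repo_id=repo_id,
                repo_type="dataset",
                path_in_repo=f"data/{name}.zarr",
            )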

devsjc commented 8 months ago

After investigating how dask clusters work as part of a jupyterhub/kubernetes deployment, it turns out we would need to create the dask cluster every time we run the job. This means either:

  1. Importing the dask_gateway controller into the consumer and creating a cluster if a configuration option is set
  2. Modifying the dagster code so that, instead of kbatch running the nwp-consumer as an image, it runs a code snippet that instantiates the cluster through the dask_gateway library and then calls the consumer as a library.

I'm not a fan of 1) as this feels like adding functionality to the consumer that shouldn't be in there. At most, the consumer should expose an optional configuration option for the address of a dask distributed client (a sketch of what that could look like is below), but that then requires the cluster to be spun up some other way beforehand, e.g. another Op in the kbatch graph that spins up a cluster and somehow returns its address. I'm also not a fan of 2) as it significantly reduces portability and would require reworking the dagster code, which would take a lot of time (and this has already taken too long!).
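
For clarity, the optional-address variant described in point 1 would amount to something like this sketch (the environment variable name is hypothetical, and this is not existing consumer code):

    # Sketch of the "optional dask client" idea: the consumer connects to an
    # existing cluster only if an address is supplied, and never creates one.
    # DASK_SCHEDULER_ADDRESS is a hypothetical variable name.
    import os

    from dask.distributed import Client


    def maybe_get_client() -> Client | None:
        address = os.environ.get("DASK_SCHEDULER_ADDRESS")
        if address is None:
            return None  # no cluster configured: run single-process as today
        # The cluster must have been spun up some other way beforehand, e.g. by
        # an earlier Op in the kbatch graph, which is the awkward part noted above.
        return Client(address)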

As such, I think that @jacobbieker's suggestion above is the best bet. This would only require a (not very big) change in the consumer and the dagster code could remain pretty much untouched. I think it's the fastest and most likely route to success.

Having said that, I've thought of one more thing to try beforehand - I've put in a small change in dagster just to request more resources from the cluster to see if that helps, whilst I implement the above.

devsjc commented 7 months ago

Fixed the memory error; the next issue, however, is:

cannot reindex or align along dimension 'step' because of conflicting dimension sizes: {1, 92} (note: an index is found along that dimension with size=92)

This occurs at this point - odd, because that very function should be padding with zeros to make the shapes match. Discussing with Jacob.
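
For reference, the padding that function is meant to do amounts to reindexing every dataset onto the full set of steps before merging. A rough sketch (not the consumer's actual code; the 92 hourly steps only mirror the size in the error message, not ICON's real step set):

    # Sketch of aligning datasets along "step" before merging, to avoid the
    # "conflicting dimension sizes: {1, 92}" error above.
    import pandas as pd
    import xarray as xr


    def pad_and_merge(datasets: list[xr.Dataset]) -> xr.Dataset:
        # Full set of forecast steps each dataset should cover; 92 hourly steps
        # here purely to match the size reported in the error.
        full_steps = pd.timedelta_range(start="0h", periods=92, freq="1h")
        padded = [
            ds.reindex({"step": full_steps}, fill_value=0.0)  # pad missing steps with zeros
            for ds in datasets
        ]
        # Every dataset now has a "step" dimension of length 92, so xr.merge
        # no longer sees conflicting sizes along that dimension.
        return xr.merge(padded)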

devsjc commented 7 months ago

Now dealing with the icon client of the consumer failing to find downloaded files, for instance:

{ "error": "[Errno 2] No such file or directory: '/tmp/nwpc/icon_global_icosahedral_single-level_2024021900_006_ALB_RAD.grib2'", "filename": "/venv/lib/python3.12/site-packages/nwp_consumer/internal/inputs/icon/client.py", "line": "", "lineno": 331 }

This seemed to be caused by an error in the file linkage (e.g. when moving files from /tmp to the raw store). Removing the explicit raw data directory setting so it falls back to the default, to see if that resolves things: https://github.com/openclimatefix/dagster-dags/commit/bcfcc7c87bbe900b0f16a0600cc5504029af3687
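
Purely as an illustration of the failure mode (not the consumer's code, and the raw-store path is hypothetical), the move step only works if the source file is still present under /tmp and the destination directory exists:

    # Illustrative only: defensively move a downloaded GRIB file from the
    # temporary download directory into a raw store. The raw_dir default is
    # hypothetical; the source path matches the one in the error above.
    import shutil
    from pathlib import Path


    def move_to_raw(filename: str, raw_dir: Path = Path("/tmp/nwpc/raw")) -> Path:
        src = Path("/tmp/nwpc") / filename
        if not src.exists():
            # The situation in the log above: the file is not where the
            # link/move step expects it to be.
            raise FileNotFoundError(src)
        raw_dir.mkdir(parents=True, exist_ok=True)
        dst = raw_dir / filename
        shutil.move(src, dst)  # copies across filesystems, renames within one
        return dst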

devsjc commented 7 months ago

The above has led to the implementation of the following changes in the consumer and dagster:

devsjc commented 7 months ago

Splitting out the process to handle single-level and multi-level files separately has revealed a further problem in the design of the consumer. In short, dask wasn't parallelising tasks as effectively as it could be, which may have been leading to calls to xr.merge that were difficult to perform. See https://github.com/openclimatefix/nwp-consumer/issues/135 for details of the change.

Effectively, this changes the xr.merge process from attempting to merge all ICON files together in one call to an initial merge of only the files within each dask partition, followed by a merge of the merged partitions. This (hopefully) should mean the datasets look more similar at all points in the merge process.
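
In rough terms, the two-stage merge described there looks like this sketch (the partition size is illustrative; the real change lives in nwp-consumer#135):

    # Sketch of the two-stage merge: merge the files within each partition first,
    # then merge the already-merged partitions. Partition size is illustrative.
    import xarray as xr


    def two_stage_merge(datasets: list[xr.Dataset], partition_size: int = 16) -> xr.Dataset:
        partitions = [
            datasets[i : i + partition_size]
            for i in range(0, len(datasets), partition_size)
        ]
        partial = [xr.merge(group) for group in partitions]  # stage 1: per-partition merges
        return xr.merge(partial)                             # stage 2: merge the partials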

devsjc commented 6 months ago

Closing this, as Planetary Computer is discontinuing the ability to run batch jobs.