carbonplan / cmip6-downscaling

Climate downscaling using CMIP6 data
https://docs.carbonplan.org/cmip6-downscaling
MIT License

Dataset caching strategy #36

Open · jhamman opened this issue 2 years ago

jhamman commented 2 years ago

The workflows we're developing as part of this project need a strategy for caching datasets when using Prefect together with xarray and zarr. We've been discussing two options:

Background

In many places in our workflows, we have data dependencies that will be used by one or more subsequent tasks (or workflows). We want to find a way to avoid repeating the costly step of regenerating data. For the purpose of this issue, here's a simple example_task that includes some expensive steps and returns an xarray.Dataset:

from prefect import task


@task
def example_task():
    ds = expensive_regridding_op(...)  # placeholder for the costly step
    return ds

Option 1: Use Prefect's caching

We may be able to wire up Prefect's Result and Serializer objects to allow for a pattern that looks something like:

@task(result=AzureResult(location="cmip6-cache"), serializer=ZarrSerializer())
def example_task():
    ds = expensive_regridding_op(...)
    return ds

After reading up on this option, I'm not actually sure it is possible. Prefect's serialization API assumes that the Serializer.serialize and Serializer.deserialize methods convert data to and from a single bytes object, and that the Result class does the actual writing. That is not how Zarr works: we don't want to convert the xarray.Dataset into one string of bytes, we want to write a Zarr dataset as a collection of chunked byte objects pushed directly to object storage. In many ways, ds.to_zarr(...) is itself a serializer, but it tightly couples the location and serialization components.

xref: https://github.com/PrefectHQ/prefect/issues/1473
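For concreteness, here's a minimal sketch of what a ZarrSerializer would have to look like under Prefect's Serializer interface (prefect.engine.serializers.Serializer), which only gives us an "object in, bytes out" hook. The class name and the pickle-based round trip are illustrative, not a real implementation, but they show why the single-bytes contract defeats the purpose of Zarr's chunked layout:

import pickle

import xarray as xr
from prefect.engine.serializers import Serializer


class ZarrSerializer(Serializer):
    # Hypothetical serializer: the API contract is "object in, bytes out",
    # so the whole dataset has to be loaded into memory and collapsed to a
    # single blob. There is no hook for writing chunked Zarr output
    # straight to object storage.
    def serialize(self, value: xr.Dataset) -> bytes:
        return pickle.dumps(value.load())

    def deserialize(self, value: bytes) -> xr.Dataset:
        return pickle.loads(value)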

Option 2: Roll our own system

Even if option 1 is possible, we may find it easier to roll our own system:

import fsspec
import xarray as xr
from prefect import task


@task
def example_task(cache='path/to/cache'):
    mapper = fsspec.get_mapper(cache)
    if '.zmetadata' in mapper:  # check whether a (consolidated) store is already in the cache
        ds = xr.open_zarr(mapper)
    else:
        ds = expensive_regridding_op(...)
        ds.to_zarr(mapper, mode='w', consolidated=True)
    return ds

I think we'd be able to roll this functionality into a custom @cache_task, a separate @cache decorator, or a set of high-level utilities that we use inside tasks.
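As a rough illustration of the decorator route, here's a minimal sketch assuming one cache path per task; the zarr_cache name, the consolidated-metadata check, and the path are all hypothetical:

import functools

import fsspec
import xarray as xr


def zarr_cache(cache_path):
    # Hypothetical decorator: cache a Dataset-returning function as a Zarr
    # store at `cache_path`, re-reading it on subsequent calls.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            mapper = fsspec.get_mapper(cache_path)
            if '.zmetadata' in mapper:  # cache hit: consolidated store already written
                return xr.open_zarr(mapper)
            ds = func(*args, **kwargs)
            ds.to_zarr(mapper, mode='w', consolidated=True)
            return ds

        return wrapper

    return decorator


@zarr_cache('az://cmip6-cache/example.zarr')
def example_task():
    return expensive_regridding_op(...)

A @cache_task variant would simply compose this with Prefect's @task decorator.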

An outstanding question I have is whether we would ever want to return Datasets from tasks, or whether it would be preferable to pass storage URIs between them.
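For comparison, a hedged sketch of the URI-passing alternative, where the task writes the Zarr store itself and only the path travels through Prefect (the task names and the path below are illustrative):

import fsspec
import xarray as xr
from prefect import task


@task
def write_regridded(uri='az://cmip6-cache/regridded.zarr'):
    ds = expensive_regridding_op(...)
    ds.to_zarr(fsspec.get_mapper(uri), mode='w', consolidated=True)
    return uri  # only the storage URI moves between tasks


@task
def open_regridded(uri):
    # downstream tasks open the store lazily from the URI
    return xr.open_zarr(fsspec.get_mapper(uri))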

jhamman commented 2 years ago

As a quick update to this issue, we've been experimenting with funnel, a project @andersy005 is working on. Funnel implements the necessary tooling for Option 1 above. https://github.com/NCAR/esds-funnel/pull/51 is a key extension of the current funnel API that adds stateless caching of results.