MeteoSwiss / mlpp-features

Define, track, share, and discover mlpp features.
BSD 3-Clause "New" or "Revised" License

Cache pipeline results #5

Closed frazane closed 2 years ago

frazane commented 2 years ago

Because some pipelines are reused inside other pipelines, it is desirable to cache the results of some (if not all) pipelines.

In some cases, such as this:

https://github.com/MeteoSwiss/mlpp-features/blob/863b99516496fd707c45836df620f617c1da94ba/mlpp_features/nwp.py#L249-L260

we reuse pipelines which themselves use other pipelines. Recomputing features each time is not only a waste of resources; when executing lazily, it also often results in an overly complex task graph which clogs the computation.
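To make the cost concrete, here is a toy sketch in plain Python (no xarray or dask). Only `temperature_ensavg` is a real pipeline name; `feature_a`, `feature_b`, `feature_c`, the simplified signatures and the call counter are made up for illustration:

```python
# Count how often the shared base pipeline actually runs.
calls = {"temperature_ensavg": 0}


def temperature_ensavg(data):
    """Stand-in for a base pipeline that several other pipelines reuse."""
    calls["temperature_ensavg"] += 1
    values = data["air_temperature"]
    return sum(values) / len(values)


def feature_a(data):
    # Hypothetical pipeline built on the base pipeline.
    return temperature_ensavg(data) + 1.0


def feature_b(data):
    # Another hypothetical pipeline built on the same base pipeline.
    return temperature_ensavg(data) * 2.0


def feature_c(data):
    # Hypothetical pipeline that reuses two pipelines, each of which
    # calls the base pipeline again.
    return feature_a(data) + feature_b(data)


data = {"air_temperature": [1.0, 2.0, 3.0]}
results = [feature_a(data), feature_b(data), feature_c(data)]

# Three requested features, but the base pipeline ran four times.
print(calls["temperature_ensavg"])
```

With lazy execution, each of those repeated calls would add its own branch to the task graph instead of pointing at a shared result.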

Options could be:

frazane commented 2 years ago

Another way to do that could be to pass the dataset in which we store the pipeline results (basically this https://github.com/MeteoSwiss/mlpp-workflows/blob/3f8777ccf92a4b37bcc0a7d1733ddf4267bd5a87/scripts/process_source.py#L182) to the pipelines themselves. If a variable with the pipeline's name already exists in the dataset, return it. Otherwise, compute it and return it.

Something like:

@asarray
def water_vapor_pressure_ensavg(
    data: Dict[str, xr.Dataset], stations, reftimes, leadtimes, ds, **kwargs
) -> xr.DataArray:

    # cache hit: the feature was already computed and stored in ds, so reuse it
    if "water_vapor_pressure_ensavg" in ds.data_vars:
        return ds["water_vapor_pressure_ensavg"]

    # try/except block necessary to expose all the required input data
    try:
        data["nwp"]["dew_point_temperature"]
        data["nwp"]["air_temperature"]
    except KeyError:
        raise KeyError(["dew_point_temperature", "air_temperature"])

    dew_point_temperature = dew_point_ensavg(data, stations, reftimes, leadtimes)
    air_temperature = temperature_ensavg(data, stations, reftimes, leadtimes)

    def e_from_t(t, a, b, c):
        return c * np.exp(a * t / (b + t))

    e = xr.where(
        air_temperature > 0,
        e_from_t(dew_point_temperature, 17.368, 238.83, 6.107),
        e_from_t(dew_point_temperature, 17.856, 245.52, 6.108),
    )

    return e.astype("float32")

I already tried that out and it seems to work properly. What do you think, @dnerini?

Also, could it be implemented as a decorator?

dnerini commented 2 years ago

yes, this looks like a very simple yet effective solution! and yes, a decorator would do it, as we should be able to easily get the name of the decorated function ...

very nice idea!
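
A minimal sketch of what such a decorator could look like, assuming every pipeline accepts the results dataset as a `ds` argument. The name `cached_pipeline` is hypothetical, and a plain dict stands in for the `xr.Dataset` (which supports the same `in` and `[]` lookups) so that the example stays self-contained:

```python
import functools


def cached_pipeline(func):
    """Return the stored result if `ds` already holds a variable named like `func`."""
    name = func.__name__  # the decorated function's name doubles as the cache key

    @functools.wraps(func)
    def wrapper(data, stations, reftimes, leadtimes, ds=None, **kwargs):
        if ds is not None and name in ds:
            return ds[name]  # cache hit: skip the computation entirely
        # Cache miss: compute as usual, forwarding ds so that nested
        # pipelines can look up their own cached results.
        return func(data, stations, reftimes, leadtimes, ds=ds, **kwargs)

    return wrapper


calls = {"n": 0}  # illustration only: counts real computations


@cached_pipeline
def dew_point_ensavg(data, stations, reftimes, leadtimes, ds=None, **kwargs):
    # Toy body, not the real pipeline: average a list of numbers.
    calls["n"] += 1
    values = data["nwp"]["dew_point_temperature"]
    return sum(values) / len(values)


data = {"nwp": {"dew_point_temperature": [1.0, 2.0, 3.0]}}
ds = {}  # stand-in for the dataset that collects the pipeline results

first = dew_point_ensavg(data, None, None, None, ds=ds)
ds["dew_point_ensavg"] = first  # the caller stores the result
second = dew_point_ensavg(data, None, None, None, ds=ds)  # served from ds
```

In this sketch the decorator only reads from `ds`; writing the result back is left to the caller, which matches the idea of the processing script owning the dataset. Having the wrapper store the result itself would be another option.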