pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0

How should flow parameterization work? #13

Open jhamman opened 4 years ago

jhamman commented 4 years ago

Now that we've started creating actual pangeo forge datasets, we're starting to notice the need for flow parameterization. I'll provide a few examples of where we may want to use a parameterized variable in our flows:

  1. Parameterize targets - We may want to create a dataset in multiple clouds. It should be possible to parameterize the target_url without needing to run the flow twice.
  2. Parameterize sources - Sometimes we have datasets where we'll want to run the same flow for each part of the dataset. The FIA dataset (https://github.com/pangeo-forge/staged-recipes/issues/3) is a good example of this use case.
  3. Parameterize chunking - We may want to create the same dataset (e.g. Zarr) multiple times, with different chunking schemes.

Prefect supports parameterizing flows (https://docs.prefect.io/core/examples/parameterized_flow.html). The question is whether we want to use the prefect functionality or move toward a pangeo-forge api for this sort of thing.
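To make the three knobs above concrete, here is a minimal, library-agnostic sketch of what a parameterized flow exposes. `build_recipe` is a hypothetical function, not part of pangeo-forge or Prefect; it only illustrates that the same recipe logic can be reused with different targets, sources, or chunking:

```python
# Hypothetical illustration: `build_recipe` is NOT a real pangeo-forge API.
def build_recipe(source_url_pattern, target_url, chunks):
    """Return a plan dict describing one run of a hypothetical recipe."""
    return {
        "source_url_pattern": source_url_pattern,
        "target_url": target_url,
        "chunks": chunks,
    }

# Example 1 above: the same flow targeting two clouds,
# without duplicating any recipe logic.
plans = [
    build_recipe(
        source_url_pattern="https://example.com/data/{date}.nc",
        target_url=target,
        chunks={"time": 10},
    )
    for target in ["gs://bucket-a/ds.zarr", "s3://bucket-b/ds.zarr"]
]
```

Whether this lives as Prefect Parameters or a pangeo-forge API, the shape of the problem is the same: the recipe body stays fixed while these three values vary per run.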

TomAugspurger commented 4 years ago

I think that using Prefect's Parameters is probably best. They make things a bit more complicated to debug outside of a Flow context, but I don't think people will be doing that anyway. We'll need good documentation & examples on debugging within a flow context.

I have an example at https://github.com/TomAugspurger/noaa-oisst-avhrr-feedstock/blob/5aa7b9007d1055c4e03306b87358ac916d559e59/recipe/pipeline.py. A few things to note:

  1. I defined the parameters as class attributes on the Pipeline. There's nothing special about them being there though (indeed, the Pipeline isn't really providing anything in that example).
    # Flow parameters
    days = Parameter(
        "days", default=pd.date_range("1981-09-01", "1981-09-10", freq="D")
    )
    variables = Parameter("variables", default=["anom", "err", "ice", "sst"])
    cache_location = Parameter(
        "cache_location", default=f"gs://pangeo-forge-scratch/cache/{name}.zarr"
    )
    target_location = Parameter(
        "target_location", default=f"gs://pangeo-forge-scratch/{name}.zarr"
    )
  2. Using Prefect parameters requires adopting more Prefect constructs elsewhere. For example, the previous version used Python-native things like list comprehensions and functions.
    @property
    def sources(self):
        source_url_pattern = (
            "https://www.ncei.noaa.gov/data/"
            "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
            "{yyyymm}/oisst-avhrr-v02r01.{yyyymmdd}.nc"
        )
        source_urls = [
            source_url_pattern.format(yyyymm=day.strftime("%Y%m"), yyyymmdd=day.strftime("%Y%m%d"))
            for day in self.days
        ]

        return source_urls

    @property
    def flow(self):
        ...
            nc_sources = [
                download(x, cache_location=self.cache_location)
                for x in self.sources  # a regular python list
            ]

Prefect Parameters can't be iterated over, so we need a "prefect-native" way of doing it. In this case, the source_url definition was moved to a task and we map it over the input variable.

https://github.com/TomAugspurger/noaa-oisst-avhrr-feedstock/blob/5aa7b9007d1055c4e03306b87358ac916d559e59/recipe/pipeline.py#L53-L65

        with Flow(self.name) as _flow:
            sources = source_url.map(self.days)
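
Outside of a Flow context, the mapped `source_url` task just formats one URL per day. A stdlib-only sketch of what `source_url.map(self.days)` computes (using `datetime` in place of the `pd.date_range` default, so nothing beyond the standard library is assumed):

```python
from datetime import date, timedelta

# The same pattern used in the linked pipeline.
SOURCE_URL_PATTERN = (
    "https://www.ncei.noaa.gov/data/"
    "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
    "{yyyymm}/oisst-avhrr-v02r01.{yyyymmdd}.nc"
)

def source_url(day):
    """Format the source URL for a single day (the body of the mapped task)."""
    return SOURCE_URL_PATTERN.format(
        yyyymm=day.strftime("%Y%m"), yyyymmdd=day.strftime("%Y%m%d")
    )

# Plain-Python equivalent of mapping the task over the `days` parameter.
days = [date(1981, 9, 1) + timedelta(days=i) for i in range(10)]
urls = [source_url(d) for d in days]
```

Debugging the task body this way, as a plain function on plain dates, is exactly the kind of outside-the-Flow workflow that needs documenting.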

TomAugspurger commented 4 years ago

One potential downside of Prefect parameters: they must be JSON serializable (which I just ran into with datetime.datetime objects).
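A stdlib-only demonstration of the constraint, plus one common workaround: pass ISO-8601 strings through the parameter and parse them back inside a task. The workaround is a suggestion, not something taken from the linked pipeline:

```python
import json
from datetime import datetime

day = datetime(1981, 9, 1)

# datetime objects are not JSON serializable, so they can't be used
# directly as Parameter defaults or values:
try:
    json.dumps({"day": day})
except TypeError:
    serializable = False
else:
    serializable = True

# Workaround: round-trip through ISO-8601 strings, parsing inside the task.
payload = json.dumps({"day": day.isoformat()})
restored = datetime.fromisoformat(json.loads(payload)["day"])
```

The same trick applies to the `days` default above: pass a list of date strings (or a start/end pair) and reconstruct the range in a task.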