pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0

Should we just adopt xarray-beam as our internal data model? #256

Open rabernat opened 2 years ago

rabernat commented 2 years ago

TL;DR: what the title says

background

This team should feel very proud of the work we've done on this package over the past few months. We have shipped dozens of real recipes (albeit without any permanent infrastructure to deploy them on) and learned lots of important lessons in the process.

At the end of a long haul of work on this package, I'd like to zoom out and offer some reflections on our direction in terms of software architecture. I feel like we (specifically, I) have flailed around a bit in terms of the core internals of the Recipe model. Examples of big refactorings:

In the meantime, we have worked on many orthogonal but also important aspects, mostly around I/O. We have also waited months 😡 for Columbia to release our free credits from Prefect Cloud to actually deploy our primary bakery, which has given us plenty of time to contemplate our Prefect dependency.

In https://github.com/pangeo-forge/pangeo-forge-recipes/pull/245#issuecomment-997070261, based on my experience trying to generalize the input layer of pangeo-forge-recipes, I am basically suggesting we move to a much more general / flexible model for recipes, where we simply chain stages together. Something like

recipe = Recipe(source, transformation, destination)

or in beam-style:

recipe = source | transformation | destination

This, coupled with a one-way flow of data between these stages, would basically inevitably look a lot like xarray-beam.
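A minimal plain-Python sketch of the beam-style chaining above. The `Stage` class and the three example stages are hypothetical names used only for illustration; they are not part of pangeo-forge-recipes, Beam, or xarray-beam.

```python
from typing import Callable, Iterable


class Stage:
    """Hypothetical stage: wraps a function from one iterable to another."""

    def __init__(self, func: Callable[[Iterable], Iterable]):
        self.func = func

    def __or__(self, other: "Stage") -> "Stage":
        # `a | b` builds a new stage that feeds a's output into b,
        # so data flows one way, from left to right.
        return Stage(lambda items: other.func(self.func(items)))

    def run(self, items: Iterable = ()) -> list:
        return list(self.func(items))


# Illustrative stages only; a real recipe would open files and write Zarr.
source = Stage(lambda _: iter(["a.nc", "b.nc"]))
transformation = Stage(lambda paths: (p.upper() for p in paths))
destination = Stage(lambda items: (f"stored:{i}" for i in items))

recipe = source | transformation | destination
result = recipe.run()
```

Because each stage only sees the output of the previous one, any stage can be swapped or reused without touching the others, which is the modularity this proposal is after.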

We have already danced around various integrations with beam, for example:

I'd like to discuss the pros and cons of incorporating xarray-beam more tightly into pangeo-forge-recipes.

What is xarray-beam?

Xarray-beam's data model defines a convention for "keys", which are more-or-less equivalent to our Indexes. These keys can be used in the context of general Beam Pipelines which pass around xarray datasets. In contrast to pangeo-forge-recipes, there are not a lot of other custom objects. The API has just a few functions for reading, writing, rechunking, aggregating, etc.

A high-level difference in the evolution of the two packages is that we kind of went bottom-up, starting from a very specific use case (many netcdfs to zarr) and are gradually trying to generalize, while they started as a much more general framework. So maybe now is the time to align more closely.

Some possible ways we could do this are...

Option A: Recipes ➡️ Beam Pipelines

This is the most "all-in" approach. We simply say that our Recipes are Beam Pipelines and build them using xarray-beam. We keep our input / file-pattern stuff and basically throw away a good portion of our code. (We still keep everything related to opening files / datasets, which is transferrable.)

Pros

Cons

Questions

Option B: Adopt parts of the xarray-beam data model but continue to use an adaptor

We continue to maintain a separate Pipeline object internally, but adopt more of the xarray-beam-like semantics, such as Keys for each stage. We use this to allow arbitrary stages to be chained together using beam-like operations such as map.

Pros

Cons

Option C: Do nothing

We continue more-or-less ignoring xarray-beam and doing our own thing, possibly going down architectural rabbit holes that have presumably been solved already.


I have enumerated a lot more cons for switching to Beam, but I am still kind of thinking it's the right option.

Welcoming thoughts from everyone.

TomAugspurger commented 2 years ago

My main two questions would be

  1. Will this make developing / testing scripts harder? (probably not, since it's our internal data model, so we can hopefully retain the regular Python functions API). And will it make debugging a running workflow harder? (What's the logging story around beam)
  2. How easy is Beam to deploy outside of GCP?

I will say that having the option to run a recipe using Dask has been helpful, since I've been able to manually run recipes wherever I happen to have a dask cluster. But as bakeries get more attention that will be less important.

alxmrs commented 2 years ago

I'll write a more detailed response soon, but I will answer this question now:

Can the key accommodate our concept of MergeDim

Yes, keys can accommodate MergeDims. An in progress solution for this can be found here: https://github.com/google/xarray-beam/pull/39

alxmrs commented 2 years ago

Will this make developing / testing scripts harder? (probably not, since it's our internal data model, so we can hopefully retain the regular Python functions API). And will it make debugging a running workflow harder? (What's the logging story around beam)

Probably not, also because Beam pipelines can be run locally with the direct runner (or in Jupyter notebooks) and can be run in unit tests. Beam's Python SDK uses the Python logging module. I use PyCharm as my IDE, and I've used its flavor of the Python debugger to step into Beam steps (transformations).

How easy is Beam to deploy outside of GCP?

I don't have direct experience with this, and will have to investigate further. However, Beam pipelines "compile" onto Apache Spark environments. Thus, anywhere Spark can run (AWS, Azure, GCP, Databricks' offering, HPC), so can Beam pipelines. From what I've read, Beam is described as a high-level API for agnostic MapReduce environments. Again, I will do some research to find out how easy the dev ops are in practice.

shoyer commented 2 years ago

  • Can each "offset" of an xarray_beam.Key point to a dataset of different length along that dimension? Like if there are a different number of timesteps in each file?

The model I picked for distributed data in xarray-beam is similar to dask.array. You can have datasets of different size, but offsets are integer offsets from the origin, rather than index labels like in dask.dataframe. If you don't know dataset sizes ahead of time, you would need to calculate the information before creating Key objects (which are mostly needed for writing to Zarr or rechunking). Alternative data models are definitely possible, but this seemed like the most straightforward to me.
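As a rough sketch of "calculate the information before creating Key objects": if the length of each input along the concatenation dimension is known, the integer offsets are just a running total. The helper name `start_offsets` and the per-file lengths are hypothetical, and plain dicts stand in for real key objects.

```python
from itertools import accumulate


def start_offsets(lengths):
    """Return the integer start offset of each input along one dimension."""
    # Prepend 0, take the running total, and drop the final grand total.
    running = [0, *accumulate(lengths)]
    return running[: len(lengths)]


# Hypothetical number of timesteps in each input file (unevenly sized).
steps_per_file = [31, 28, 31, 30]

offsets = start_offsets(steps_per_file)

# Plain dicts standing in for keys that carry an offset from the origin.
keys = [{"time": offset} for offset in offsets]
```

The lengths must be gathered first (for example by scanning file metadata), which is the extra pass this data model requires for unevenly sized inputs.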

Is there a particular example you are worried about?

  • Can the key accommodate our concept of MergeDim (maybe not based on this code)?

I think this would be straightforward to implement.

alxmrs commented 2 years ago

Hey @rabernat @cisaacstern, @sharkinsspatial and @TomAugspurger: How about we schedule a meeting to discuss execution engines and their tradeoffs?

rabernat commented 2 years ago

@alxmrs - thanks for keeping alive this important issue. I didn't mean to shut down the discussion at today's meeting. I'm just hyper focused on getting actually launched atm. But it would actually be great if you (Alex) could keep moving this design discussion and prototyping forward in parallel to the deployment work.

Rather than scheduling a meeting right now, let's see if we can iterate a bit asynchronously, using this issue for discussion.

Let me kick things off by suggesting that we are convolving two separate but related questions:

  1. What should be the internal data model of pangeo_forge_recipes?

    Current status: our internal data model is that recipe classes must have a ._compiler ClassVar which knows how to translate the recipe into a Pipeline object. That is very flexible (every recipe class can implement whatever compiler it wants), at the expense of modularity. There is no way to mix or combine elements from different recipe classes. Therefore, a central goal of the refactor is to support more modularity and component reuse both internally and by end users, as in the pseudocode in my original post. There is also a desire to avoid so many side effects, instead passing data explicitly between stages, which would support the modularity goal.

  2. What executors should be supported?

    We don't want to actually implement our own distributed processing engine. Instead, we "compile" the recipes to other execution frameworks. We currently support vanilla Python, Dask, Prefect, and Beam. Ideally we would grow this to include as many suitable frameworks as possible. (Dagster looks pretty interesting.) The challenge is that each framework has different feature sets. So to some degree, element 1 (internal data model) is coupled to element 2. Selecting one single supported executor (e.g. Beam) would effectively collapse 1 and 2 to the same data model, eliminating the need for a compiler at all.
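To illustrate what "compiling" one description to several executors can mean, here is a small sketch that uses only the standard library. The stage layout and the two `compile_to_*` helpers are hypothetical; they are not the actual pangeo-forge-recipes Pipeline or its executors.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical pipeline description: an ordered list of (function, inputs).
# Each stage applies its function independently to each of its inputs.
stages = [
    (str.upper, ["a", "b"]),
    (len, ["xx", "yyy"]),
]


def compile_to_function(stages):
    """Compile the stages to a single serial Python callable."""
    def run():
        return [[func(x) for x in inputs] for func, inputs in stages]
    return run


def compile_to_threads(stages, max_workers=2):
    """Compile the same stages to a callable that uses a thread pool."""
    def run():
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            for func, inputs in stages:
                # Stages run in order; items within a stage run concurrently.
                results.append(list(pool.map(func, inputs)))
        return results
    return run


serial_result = compile_to_function(stages)()
threaded_result = compile_to_threads(stages)()
```

Both targets consume the same description and give the same result. The cost is that the description can only use features every target supports, which is the coupling between the two questions noted above.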

As a specific example of these tradeoffs, let's look at how Prefect does mapping. A Prefect task can be either a regular task or a mapped task. Many mapped tasks can be chained together. However, chained maps must be the same length. There is no mechanism for aggregation or grouping of tasks, as there is in Beam (and also possibly Dask, via blockwise). If we wanted to move to a model where data is actually passed between tasks (rather than written to disk via side effects), that means that each stage of a Pangeo Forge Pipeline in Prefect would need to use the same number of tasks. For the typical NetCDF to Zarr workflow, assuming that the first layer contains one file per task, this would imply inputs_per_chunk=1 and subset_inputs=None; i.e. once the chunking is set by one stage, we cannot change it.
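The distinction at stake can be shown in a few lines of plain Python. This is only a sketch of the two kinds of operation, not Prefect or Beam code; the helper names and file names are made up, and `inputs_per_chunk` is borrowed from the recipe parameter mentioned above.

```python
def mapped(func, items):
    """One-to-one map: the output always has as many items as the input."""
    return [func(item) for item in items]


def grouped(items, inputs_per_chunk):
    """Grouping: combines consecutive items, so the item count changes."""
    return [
        items[i : i + inputs_per_chunk]
        for i in range(0, len(items), inputs_per_chunk)
    ]


files = [f"file_{i}.nc" for i in range(5)]

# Chained maps keep the length fixed at 5 from stage to stage.
opened = mapped(str.upper, files)

# A grouping stage turns 5 items into 3 chunks, which a chain made only
# of equal-length maps cannot express.
chunks = grouped(opened, inputs_per_chunk=2)
```

With only `mapped` available, the number of tasks chosen in the first stage is fixed for every later stage; a `grouped`-style step is what allows `inputs_per_chunk` to be greater than 1 when data is passed between stages.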


A specific proposal for investigation: what if we adopt xarray-beam's general concept where every stage can be thought of as a generator which yields a key, value pair? The flow could look something like this

FilePattern -> (key, str) -> Cache -> (key, fsspec.File) -> XarrayOpener -> (key, xr.Dataset) -> ZarrWriter
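A sketch of that flow using plain Python generators. Every stage takes and yields (key, value) pairs. The stage functions, the dict-based keys, and the string and dict placeholders for cached files and datasets are all hypothetical stand-ins, so the example runs without fsspec, xarray, or zarr.

```python
def file_pattern(urls):
    """Yield (key, url); the key records the position along 'time'."""
    for i, url in enumerate(urls):
        yield {"time": i}, url


def cache(pairs):
    """Yield (key, cached_path); a string stands in for an open file."""
    for key, url in pairs:
        yield key, "cache/" + url.rsplit("/", 1)[-1]


def opener(pairs):
    """Yield (key, dataset); a dict stands in for an xr.Dataset."""
    for key, path in pairs:
        yield key, {"source": path, "values": [key["time"]]}


def writer(pairs):
    """Consume the stream and place each dataset by its key."""
    store = {}
    for key, dataset in pairs:
        store[key["time"]] = dataset
    return store


urls = ["https://example.com/a.nc", "https://example.com/b.nc"]
store = writer(opener(cache(file_pattern(urls))))
```

No stage writes anything as a side effect except the final writer, and the key travels with the data so the writer knows where each piece belongs.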

Some questions that this raises:

  1. What type of key would be generic enough to support pangeo forge's use cases? The first stage, for example, has no arrays yet. I think our current Index object is almost there already.
  2. How would we support stages that change the number of items in the iteration? Beam can do this via groupby or other fanciness that I don't fully understand.
  3. How would we handle reductions / side inputs? For example, ZarrWriter would need a "template" for the dataset as "side input". For unevenly sized arrays, the only way to get this is to do a reduction on all the input shapes. We are currently using cache_metadata for this. But we could instead imagine something like the following (leaving out cache for brevity)
FilePattern -> (key, str) -> XarrayOpener -> (key, xr.Dataset) -> ZarrWriter
                                                     |                ^
                                               reduce schema ---------|

These side inputs would possibly be able to eliminate our reliance on a metadata cache.
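A plain-Python sketch of the "reduce schema" idea: the per-input shapes are reduced to one template, and the template is then handed to the writing step alongside each item. The shapes, the `merge_schema` rule (concatenate along `time`, require equal `lat`), and the list used as a write target are all assumptions made for illustration.

```python
from functools import reduce

# Stand-ins for (key, xr.Dataset) pairs: only the shapes matter here.
inputs = [
    ({"time": 0}, {"time": 31, "lat": 180}),
    ({"time": 31}, {"time": 28, "lat": 180}),
    ({"time": 59}, {"time": 31, "lat": 180}),
]


def merge_schema(a, b):
    """Combine two schemas: concatenate along time, require equal lat."""
    if a["lat"] != b["lat"]:
        raise ValueError("inputs disagree on the 'lat' dimension")
    return {"time": a["time"] + b["time"], "lat": a["lat"]}


# The reduction over all input shapes produces the "template" side input.
template = reduce(merge_schema, (shape for _, shape in inputs))


def write_region(target, key, shape):
    """Mark the slice of the target that this input is responsible for."""
    start = key["time"]
    for t in range(start, start + shape["time"]):
        target[t] = True


# The writer sizes its target from the template, then places each input.
target = [False] * template["time"]
for key, shape in inputs:
    write_region(target, key, shape)
```

The template is computed from the data stream itself, so nothing has to be persisted in a separate metadata cache between stages.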


The ideal outcome of this brainstorming would be a well-crafted design document for a new internal data model. Then we could get to work on actually performing this refactoring while maintaining backwards compatibility in terms of all of the real-world use cases we already support.

alxmrs commented 2 years ago

Hey Ryan,

Thanks for reframing the problem in this way. This is a great write up! It certainly gives me a lot to think about... I will start thinking about the internal PGF data model. I'll take my time, since the project is focused on the first launch anyway.

rabernat commented 2 years ago

I recently learned about lithops

Lithops is a Python multi-cloud serverless computing framework. It allows to run unmodified local python code at massive scale in the main serverless computing platforms.

Based on reading through the documentation, it looks like pangeo-forge today is already extremely compatible with the lithops execution model (does not support much inter-process communication).

Lithops supports nearly every cloud's serverless offering. There would be some big advantages in moving to a serverless execution framework. We should consider the potential pros and cons vs. the apache beam idea.

alxmrs commented 2 years ago

The ideal outcome of this brainstorming would be a well-crafted design document for a new internal data model.

I’m more actively working on this specific design doc. For now, I’d like to respond to open concerns listed in this thread so far.

We commit to only deploying on Beam, foregoing Prefect and Dask.

In theory, we could implement Prefect and Dask as Beam Runners (e.g. have adapter layers between Beam and, say, Prefect).

In fact, the Beam Runner docs suggest that Beam’s internal data model is similar to Pangeo’s, at least in goal. Beam in essence includes a compiler from dataflow description graph onto execution primitives.

We don't want to actually implement our own distributed processing engine. Instead, we "compile" the recipes to other execution frameworks.

Switching to Beam could mean reusing their compiler architecture.

Ideally we would grow this to include as many suitable frameworks as possible. … The challenge is that each framework has different feature sets.

An early section of the Beam Runner docs mentions that the framework is flexible here: “You don’t have to do it all at once, and there may even be features that don’t make sense for your runner to ever support.” The standard practice is to throw an error if the runner receives a pipeline that it cannot execute. This suggests that one could build an MVP Beam runner and iterate.

It would be interesting to investigate how feasible it would be to implement a Prefect runner for a Beam pipeline. The mapping docs you link lead me to believe that this would be totally tractable.

[Responses concerning how this could be implemented with Prefect]

> However, chained maps must be the same length.
> There is no mechanism for aggregation or grouping of tasks, as there is in Beam

Sorry, can you point out where in the docs it mentions this? Maybe I’m misunderstanding their conception of a “Task”. Prefect seems to offer `reduce` and `flatten` operations. With `flatten`, iterables between maps can vary in length. With `reduce`, iterables can be aggregated (for example, reducing an iterable into another collection, like a dictionary, seems possible). Prefect does mention [an optimization](https://docs.prefect.io/core/concepts/mapping.html#prefect-approach) about invoking multiple maps in succession over the same iterable. However, I don’t think a `map` → `reduce` → `map` is disallowed. Wouldn’t these be sufficient for aggregation, grouping, and working with data of variable length?

From reviewing the linked page, it sounds like Prefect includes all the same primitives that Beam offers. Check out their guide on [implementing primitives](https://beam.apache.org/contribute/runner-guide/#implementing-the-beam-primitives) (it’s worth [pointing out](https://beam.apache.org/contribute/runner-guide/#the-dofn-lifecycle) that we could implement this in Python only).

I’m not necessarily advocating for Pangeo Forge to maintain a Beam adapter to Prefect. I want to establish that a mapping is possible. As mentioned in the PGF biweekly meetings, it should also be possible to create a runner from Beam to Dask, which I think would be way more fruitful.

A core benefit to adopting the Beam model is that PGF would gain a lot of modularity, towards this point:

There is no way to mix or combine elements from different recipe classes. Therefore, a central goal of the refactor is to support more modularity and component reuse both internally and by end users, as in the pseudocode in my original post. There is also a desire to avoid so many side effects, instead passing data explicitly between stages, which would support the modularity goal.

One way this could be achieved is if Pangeo-Forge Recipes became a library of Composite Transforms that interacted with each other, likely via shared PCollection interfaces. XArray-Beam does provide one model for such an interface that’s oriented around XArray Datasets. However, Pangeo-Forge could provide other models for other recipe types, like the HDFReference recipe. Or, it’s likely for this case that the Beam primitives would be sufficient.

How would we handle reductions / side inputs?

Beam Transformations include the capacity for adding side inputs. You can extract the value from a reduction and pass it into a side input, like so.

Lithops vs Beam

I agree with you that Pangeo-Forge today is compatible with Lithops. Here are a few pros and cons of doubling down on the adaptor approach vs. switching to Beam:

rabernat commented 2 years ago

FYI Beam now supports Python 3.9 on both Pip and Conda. 🎉

rabernat commented 2 years ago

Here is a little experiment I did tonight in preparation for tomorrow's sprint: https://gist.github.com/rabernat/15f77fb447e2cdbc73c4031c59768886

cisaacstern commented 2 years ago

I've just successfully run a Beam Pipeline as a Dataflow job, deployed from a Jupyter Notebook. The pipeline is the official wordcount example. Notes:

import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""
    def process(self, element):
        """Returns an iterator over the words of this element.
        The element is a line of text.  If the line is blank, note that, too.
        Args:
          element: the element being processed
        Returns:
          The processed element.
        """
        return re.findall(r'[\w\']+', element, re.UNICODE)

input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://beam-dataflow-test/counts.txt'

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='pangeo-forge-4967',
    # Note: manually increment the `job_name` for each run.
    # In production, this will be handled programmatically.
    job_name='wordcount-example-0',
    temp_location='gs://beam-dataflow-test/temp',
    # Our institutional GCP policies forbid public IPs, xref:
    #   - https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking
    #   - https://github.com/pangeo-forge/pangeo-forge-gcs-bakery/issues/29
    use_public_ips=False,
    # The private network we've already set up:
    #   - https://github.com/pangeo-forge/pangeo-forge-gcs-bakery/pull/30
    # is in us-central1, so that's our only option unless/until we establish private networks elsewhere. xref:
    #   - https://console.cloud.google.com/networking/networks/details/default?project=pangeo-forge-4967&pageTab=SUBNETS
    region='us-central1',
)

with beam.Pipeline(options=beam_options) as p:

    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(input_file)

    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    def format_result(word, count):
        return '%s: %d' % (word, count)

    output = counts | 'Format' >> beam.MapTuple(format_result)

    # Write the output using a "Write" transform that has side effects.
    output | 'Write' >> WriteToText(output_path)

sharkinsspatial commented 2 years ago

I'm in the process of preparing a separate issue discussing the multi-cloud bakery story for Beam pipeline runners but I have one initial question here. Do we want to aim for consistency across all platforms for our pipeline runner (Flink on k8s) or will we want to leverage Dataflow on GCP? I would lean towards a consistent approach across cloud providers so we can leverage the Flink monitoring API for integration with pangeo-forge-orchestrator but I'm interested to hear thoughts from others.

cisaacstern commented 2 years ago

Very interesting question, @sharkinsspatial. Personally, I would favor an incremental approach: Dataflow on GCP to begin, which allows us to de-risk and debug the Beam Pipelines themselves, with presumably minimal concerns regarding infrastructure. And then a subsequent phase in which we migrate to the more generalized, platform-agnostic implementation. My concern with going all-in on Flink to start is that too many things will be changing all at once (data model and infrastructure, instead of data model then infrastructure). Though I can also see the counter-argument that this incremental approach means accruing technical debt by working towards an approach which we may likely abandon before too long.

rabernat commented 2 years ago

In order to kickstart the discussion of implementing a Dask Beam runner, I propose we meet during the week of June 13-17. I have created a When2Meet poll here: Dask Beam Runner Discussion. If you are interested in attending, please give your availability. Hope to see many people there! 🚀

alxmrs commented 2 years ago

Re: @cisaacstern's incremental approach. That LGTM, and it would be prudent to ensure the core recipe transforms span the runners we want to target in the long term, as described in Beam's capability matrix.

Also: I have a working prototype of a Dask Runner for Beam here https://github.com/alxmrs/beam/pull/1. I would love to start stress testing it with Pangeo-Forge recipes, if at all possible. Ideally, PGF recipes should drive the capabilities of the Dask runner.

cisaacstern commented 2 years ago

Also: I have a working prototype of a Dask Runner for Beam here https://github.com/alxmrs/beam/pull/1. I would love to start stress testing it with Pangeo-Forge recipes, if at all possible. Ideally, PGF recipes should drive the capabilities of the Dask runner.

Awesome, @alxmrs! Here's how you could start testing the Dask Runner via a PR to pangeo-forge-recipes:

  1. Install Beam from your feature branch via pip in ci/upstream-dev.yml

  2. Add an execute_recipe_beam_dask fixture in conftest.py, which can be modeled on

    https://github.com/pangeo-forge/pangeo-forge-recipes/blob/e764542d05261a0b31916c61cc3afe2a96608b81/tests/conftest.py#L535-L544

    except with, presumably, beam.Pipeline(runner="DaskRunner"), and changing the value passed to the marks argument of the decorator.

  3. Add the new execute_recipe_beam_dask as a lazy_fixture to the execute_recipe fixture at the bottom of conftest.py

  4. In .github/workflows/main.yaml:

           dependencies: ["releases-only", "upstream-dev"]
           pytest-mark: ["no_executor", "executor_function", "executor_generator",
                         "executor_dask", "executor_prefect", "executor_prefect_dask",
    -                    "executor_prefect_wrapper", "executor_beam"]
    +                    "executor_prefect_wrapper", "executor_beam", "executor_beam_dask"]
           exclude:
    +        - dependencies: "releases-only"
    +          pytest-mark: "executor_beam_dask"
             - dependencies: "upstream-dev"
               pytest-mark: "executor_function"
             - dependencies: "upstream-dev"
               pytest-mark: "executor_generator"

Opening a PR to pangeo-forge-recipes should then result in the upstream-dev test parametrization installing from your feature branch and running your DaskRunner. LMK if this makes sense or if I can help in any way!

alxmrs commented 2 years ago

Thanks for the detailed instructions! I'll set this up later this week :)

rabernat commented 2 years ago

So excited about this! 🚀

I am making steady progress on my end...see #379.

alxmrs commented 1 year ago

@rabernat wasn't sure the best place to give an update. I'm about to go on vacation for a week, and wanted to explain my status on Beam & Dask. From what I can tell, when https://github.com/apache/beam/pull/23913 lands we should have enough implemented in the Dask Runner to support executing Pangeo Forge recipes. I've tried testing things locally, and PGF pipelines seem to run correctly (all exit codes are zero, but I haven't generated ARCO data e2e quite yet).

The outstanding work to do on this PR involves passing lint + formatting checks. @pabloem is a good point-of-contact for carrying this change out to completion in my absence.