ocean-transport / scale-aware-air-sea

Repo for collaborative project on scale-aware air-sea fluxes

Prototype of beam pipeline for fluxes #63

Open cisaacstern opened 9 months ago

cisaacstern commented 9 months ago

Based on a pair session with @jbusecke today:

# airseabeam.py

from dataclasses import dataclass
import apache_beam as beam

def append_val(t: tuple, val: str) -> tuple:
    # Placeholder for a real processing step: tag the value of a (key, value) pair with a suffix.
    k, v = t
    return (k, v + val)

class PangeoForgeRecipe(beam.PTransform):
    def expand(self, pcoll: beam.PCollection[tuple]):
        return pcoll | beam.Map(append_val, val="_arco")

class XBeamFilter(beam.PTransform):
    def expand(self, pcoll: beam.PCollection[tuple]):
        return pcoll | beam.Map(append_val, val="_filtered")

def add_spec(t: tuple, spec: str) -> tuple:
    # Join the (arco, filtered) pair and label it with the variable mix, e.g. "a,b,cf".
    k, v = t
    merged = "+".join(list(v))
    return (k, f"${merged}$_{spec}")

@dataclass
class MixVariables(beam.PTransform):
    spec: str
    def expand(self, pcoll: beam.PCollection[tuple]):
        return pcoll | beam.Map(add_spec, spec=self.spec)

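def flatten_tuple(t: tuple):
    # CoGroupByKey on (arco, filtered) yields (key, ([arco values], [filtered values]));
    # each key has exactly one value per input here, so unwrap the single-element lists.
    k, v = t
    arco, filtered = v
    return (k, (arco[0], filtered[0]))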

class XBeamComputeFluxes(beam.PTransform):
    def expand(self, pcoll: beam.PCollection[tuple]):
        # return pcoll | xbeam.Something()
        return pcoll | beam.Map(append_val, val="_flux")

class AirSeaPaper(beam.PTransform):

    def expand(self, pcoll):
        arco = pcoll | PangeoForgeRecipe() # -> Zarr(data_vars={a, b, c})
        filtered = arco | XBeamFilter()  # -> Zarr(data_vars={a_f, b_f, c_f})
        nested = (arco, filtered) | beam.CoGroupByKey() | beam.Map(flatten_tuple)

        a_b_c = nested | "mix 0" >> MixVariables(spec="a,b,c")
        a_b_cf = nested | "mix 1" >> MixVariables(spec="a,b,cf")
        a_bf_cf = nested | "mix 2" >> MixVariables(spec="a,bf,cf")
        fluxes = (a_b_c, a_b_cf, a_bf_cf) | beam.Flatten() | XBeamComputeFluxes()
        return fluxes

if __name__ == "__main__":
    input_data = [("cesm", "cesm_ds"), ("cm26", "cm26_ds")]
    with beam.Pipeline() as p:
        (
            p
            | beam.Create(input_data)
            | AirSeaPaper()
            | beam.Map(print)
        )

$ python airseabeam.py
('cesm', '$cesm_ds_arco+cesm_ds_arco_filtered$_a,b,c_flux')
('cm26', '$cm26_ds_arco+cm26_ds_arco_filtered$_a,b,c_flux')
('cesm', '$cesm_ds_arco+cesm_ds_arco_filtered$_a,bf,cf_flux')
('cm26', '$cm26_ds_arco+cm26_ds_arco_filtered$_a,bf,cf_flux')
('cesm', '$cesm_ds_arco+cesm_ds_arco_filtered$_a,b,cf_flux')
('cm26', '$cm26_ds_arco+cm26_ds_arco_filtered$_a,b,cf_flux')
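
To make the data shape between steps explicit, here is a plain-Python sketch that mimics `AirSeaPaper` on in-memory lists, without Beam. `co_group_by_key`, `mix_specs` and `air_sea_paper` are hypothetical helpers written for this illustration: `co_group_by_key` stands in for `beam.CoGroupByKey` by collecting each input's values into per-input lists under the same key (which is why `flatten_tuple` indexes `arco[0]` and `filtered[0]`), and `mix_specs` generates the three hardcoded spec strings by swapping raw variables for filtered ones starting from the last variable. It illustrates the data flow only, not how Beam actually executes the pipeline.

```python
from collections import defaultdict

# Copies of the helper functions from the prototype above.
def append_val(t: tuple, val: str) -> tuple:
    k, v = t
    return (k, v + val)

def add_spec(t: tuple, spec: str) -> tuple:
    k, v = t
    merged = "+".join(list(v))
    return (k, f"${merged}$_{spec}")

def flatten_tuple(t: tuple) -> tuple:
    k, v = t
    arco, filtered = v
    return (k, (arco[0], filtered[0]))

def co_group_by_key(*pcolls):
    """Hypothetical in-memory stand-in for beam.CoGroupByKey on a tuple of inputs:
    returns [(key, (values_from_input_0, values_from_input_1, ...)), ...]."""
    grouped = defaultdict(lambda: tuple([] for _ in pcolls))
    for i, pcoll in enumerate(pcolls):
        for k, v in pcoll:
            grouped[k][i].append(v)
    return list(grouped.items())

def mix_specs(variables):
    """Hypothetical helper: replace raw variables with filtered ones from the end,
    e.g. ["a", "b", "c"] -> ["a,b,c", "a,b,cf", "a,bf,cf"]."""
    variables = list(variables)
    n = len(variables)
    specs = []
    for n_filtered in range(n):
        raw = variables[: n - n_filtered]
        filt = [v + "f" for v in variables[n - n_filtered:]]
        specs.append(",".join(raw + filt))
    return specs

def air_sea_paper(input_data, specs):
    """Hypothetical list-based mimic of AirSeaPaper.expand, step by step."""
    arco = [append_val(t, "_arco") for t in input_data]          # PangeoForgeRecipe
    filtered = [append_val(t, "_filtered") for t in arco]        # XBeamFilter
    nested = [flatten_tuple(t) for t in co_group_by_key(arco, filtered)]
    # One MixVariables per spec, concatenated like beam.Flatten.
    mixed = [add_spec(t, spec) for spec in specs for t in nested]
    return [append_val(t, "_flux") for t in mixed]              # XBeamComputeFluxes

if __name__ == "__main__":
    input_data = [("cesm", "cesm_ds"), ("cm26", "cm26_ds")]
    for row in air_sea_paper(input_data, mix_specs(["a", "b", "c"])):
        print(row)
```

Compared as a set, its six rows match the pipeline output above; only the order differs, since Beam does not guarantee the order in which elements reach `beam.Map(print)`. In the Beam version, the generated specs could likewise drive the mixing step in a loop, e.g. building `nested | f"mix {i}" >> MixVariables(spec=s)` for each `(i, s)` in `enumerate(specs)` and passing the results as a tuple to `beam.Flatten()`, which keeps the transform labels unique.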

cisaacstern commented 9 months ago

Next steps:

Other topics:

jbusecke commented 9 months ago

I am getting back into the processing and just sketched the workflow for a single model. I thought it might be helpful here:

[Diagram: workflow sketch for a single model (image "Blank diagram-10" not reproduced)]