pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0
126 stars Β· 54 forks

Is it possible to specify more than one MergeDim to a recipe file pattern? #402

Open derekocallaghan opened 2 years ago

derekocallaghan commented 2 years ago

Hi,

I'm looking at creating a recipe for CMEMS ASCAT wind data (following the ftp approach in https://github.com/pangeo-forge/staged-recipes/pull/163). ASCAT products are available for all combinations of Metop A/B (REP) and Metop A/B/C (NRT) satellites and ASCending, DEScending passes. I've started with the NRT products, where I was hoping to use one MergeDim each for satellite and pass options, something like:

def make_url(satellite, satellite_pass, time):
    year = time.strftime("%Y")
    month = time.strftime("%m")
    day = time.strftime("%d")
    url = f"ftp://nrt.cmems-du.eu/WIND_GLO_WIND_L3_NRT_OBSERVATIONS_012_002/KNMI-GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_{satellite_pass}_V2/{year}/{month}/GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_{satellite_pass}_{year}{month}{day}.nc"
    return url

satellite_merge_dim = MergeDim("satellite", ["A", "B", "C"])
satellite_pass_merge_dim = MergeDim("satellite_pass", ["ASC", "DES"])

pattern = FilePattern(
    make_url,
    satellite_merge_dim,
    satellite_pass_merge_dim,
    ConcatDim(name="time", keys=dates),
)
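For context, a FilePattern combines every key across its dimensions, so the pattern above would index the Cartesian product of the satellite, pass, and time keys. A rough pure-Python sketch of that expansion (not using pangeo-forge itself; the key lists are illustrative):

```python
import itertools
from datetime import date, timedelta

satellites = ["A", "B", "C"]  # MergeDim "satellite"
passes = ["ASC", "DES"]       # MergeDim "satellite_pass"
# Illustrative stand-in for the `dates` ConcatDim keys
dates = [date(2022, 1, 1) + timedelta(days=i) for i in range(31)]

# Each combination maps to one call of make_url(satellite, satellite_pass, time)
combos = list(itertools.product(satellites, passes, dates))
# 3 satellites x 2 passes x 31 dates -> 186 input files
```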

However, when creating the recipe, I get the following error in the sandbox:

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Input In [6], in <cell line: 140>()
    131 pattern = FilePattern(
    132     make_url,
    133     satellite_merge_dim,
    134     satellite_pass_merge_dim,
    135     ConcatDim(name="time", keys=dates),
    136 )
    138 target_chunks = {"time": 1500, "latitude": -1, "longitude": -1}
--> 140 recipe = XarrayZarrRecipe(
    141     file_pattern=pattern,
    142     target_chunks=target_chunks,
    143     process_input=ics_measurement_time,
    144 )

File <string>:21, in __init__(self, file_pattern, storage_config, inputs_per_chunk, target_chunks, cache_inputs, copy_input_to_local_file, consolidate_zarr, consolidate_dimension_coordinates, xarray_open_kwargs, xarray_concat_kwargs, delete_input_encoding, process_input, process_chunk, lock_timeout, subset_inputs, open_input_with_kerchunk)

File /srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py:783, in XarrayZarrRecipe.__post_init__(self)
    782 def __post_init__(self):
--> 783     self._validate_file_pattern()
    785     # from here on we know there is at most one merge dim and one concat dim
    786     self.concat_dim = self.file_pattern.concat_dims[0]

File /srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/recipes/xarray_zarr.py:827, in XarrayZarrRecipe._validate_file_pattern(self)
    825 def _validate_file_pattern(self):
    826     if len(self.file_pattern.merge_dims) > 1:
--> 827         raise NotImplementedError("This Recipe class can't handle more than one merge dim.")
    828     if len(self.file_pattern.concat_dims) > 1:
    829         raise NotImplementedError("This Recipe class can't handle more than one concat dim.")

NotImplementedError: This Recipe class can't handle more than one merge dim.

I wanted to check whether the restriction to a single MergeDim is in place for a particular reason, or whether it can be relaxed to accommodate this scenario?

Thanks, Derek

rabernat commented 2 years ago

Thanks for the useful question Derek!

Once https://github.com/pangeo-forge/pangeo-forge-recipes/issues/376 is complete, it will be possible to specify arbitrary numbers of concat and merge dims.

In the meantime, the suggested workaround is to create separate recipes for the second merge dim.
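One way to sketch that workaround, using the make_url from the issue description: fix the second merge dimension's value with functools.partial, so that each resulting URL builder (and hence each recipe's FilePattern) only needs the satellite MergeDim plus the time ConcatDim. This is an illustrative sketch, not tested against pangeo-forge itself:

```python
from datetime import date
from functools import partial

def make_url(satellite, satellite_pass, time):
    # Same URL template as in the issue description
    year, month, day = time.strftime("%Y"), time.strftime("%m"), time.strftime("%d")
    return (
        "ftp://nrt.cmems-du.eu/WIND_GLO_WIND_L3_NRT_OBSERVATIONS_012_002/"
        f"KNMI-GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_{satellite_pass}_V2/"
        f"{year}/{month}/GLO-WIND_L3-OBS_METOP-{satellite}_ASCAT_12_"
        f"{satellite_pass}_{year}{month}{day}.nc"
    )

# One URL builder per pass value; each would back its own FilePattern
# (satellite MergeDim + time ConcatDim only) and its own recipe.
url_builders = {p: partial(make_url, satellite_pass=p) for p in ["ASC", "DES"]}

asc_url = url_builders["ASC"]("B", time=date(2022, 1, 15))
```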

derekocallaghan commented 2 years ago

Thanks Ryan, I'll do that and also watch out for any updates to https://github.com/pangeo-forge/pangeo-forge-recipes/issues/376

cisaacstern commented 1 year ago

@derekocallaghan were you ever able to confirm that this does work in 0.10.0? If so, perhaps it will make a good tutorial.

DarshanSP19 commented 1 year ago

Hey @cisaacstern, I tried 0.10.0 with multiple merge dimensions, and it breaks the pipeline.

@dataclass
class DetermineSchema(beam.PTransform):
    """Combine many Datasets into a single schema along multiple dimensions.
    This is a reduction that produces a singleton PCollection.

    :param combine_dims: The dimensions to combine
    """

    combine_dims: List[Dimension]

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        schemas = pcoll | beam.Map(_add_keys(dataset_to_schema))
        cdims = self.combine_dims.copy()
        while len(cdims) > 0:
            last_dim = cdims.pop()
            if len(cdims) == 0:
                # at this point, we should have a 1D index as our key
                schemas = schemas | beam.CombineGlobally(CombineXarraySchemas(last_dim))
            else:
                schemas = (
                    schemas
                    | _NestDim(last_dim)
                    | beam.CombinePerKey(CombineXarraySchemas(last_dim))
                )
        return schemas

In the above snippet, if we run this with 3 combine dimensions (1 concat and 2 merge in my case), it fails with the error below.

RuntimeError: A transform with label "DetermineSchema/_NestDim" already exists in the pipeline. To apply a transform with a specified label write pvalue | "label" >> transform

cisaacstern commented 1 year ago

@DarshanSP19, thanks for this very helpful report! I think the following might fix this, as the reported error appears to be a case in which Beam doesn't know how to generate a unique name for the unlabeled _NestDim stage:

            if len(cdims) == 0:
                # at this point, we should have a 1D index as our key
                schemas = schemas | beam.CombineGlobally(CombineXarraySchemas(last_dim))
            else:
                schemas = (
                    schemas
-                   | _NestDim(last_dim)
-                   | beam.CombinePerKey(CombineXarraySchemas(last_dim))
+                   | f"Nest {last_dim.name}" >> _NestDim(last_dim)
+                   | f"Combine {last_dim.name}" >> beam.CombinePerKey(CombineXarraySchemas(last_dim))
                )

Could I entice you to try that fix, and if it works, submit it as a PR? πŸ™

Edit: Added a label to the combine stage as well, as my guess is it will have the same issue once the _NestDim labeling is resolved. Note also that another problem may surface once we're past this error, but this looks like the right place to start.
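The fix works because Beam requires every transform label within a composite to be unique, and an unlabeled transform defaults to a label derived from its class name, so the second loop iteration reuses "_NestDim" and collides. A minimal pure-Python illustration of the labeling logic (the Dimension class here is a stand-in for pangeo-forge's, not the real one):

```python
from dataclasses import dataclass

@dataclass
class Dimension:
    """Stand-in for pangeo_forge_recipes' Dimension, holding only a name."""
    name: str

dims = [Dimension("time"), Dimension("satellite"), Dimension("satellite_pass")]

# Unlabeled stages all default to the class name and collide;
# labels derived from the dimension name are unique per loop iteration.
unlabeled = ["_NestDim" for _ in dims]
labeled = [f"Nest {d.name}" for d in dims]
```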

derekocallaghan commented 1 year ago

> @derekocallaghan were you ever able to confirm that this does work in 0.10.0? If so, perhaps it will make a good tutorial.

Sorry @cisaacstern, I didn't notice your question until today. It's been a while since I looked at the original recipe that required multiple MergeDims; IIRC I had a workaround that was probably a better approach. I wanted to port this recipe to Beam, so if it's still relevant I'll try your suggestion above if a PR hasn't been created in the meantime.

DarshanSP19 commented 1 year ago

> @DarshanSP19, thanks for this very helpful report! I think the following might fix this [...] Could I entice you to try that fix, and if it works, submit it as a PR? πŸ™

Hey @cisaacstern It worked for me. Happy to do a PR.