jbusecke opened 11 months ago
Ok got the recipe deployed and cached the files. So the auth part works 🎉
But I am running into issues on Dataflow now:
```
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 1238, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "apache_beam/runners/worker/operations.py", line 1267, in apache_beam.runners.worker.operations.PGBKCVOperation.process
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/combiners.py", line 33, in add_input
    accumulator.add_input(schema, position)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/pangeo_forge_recipes/aggregation.py", line 69, in add_input
    s["chunks"][self.concat_dim] = {position: s["dims"][self.concat_dim]}
KeyError: "time [while running 'Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr/StoreToZarr/DetermineSchema/CombineGlobally(CombineXarraySchemas)/KeyWithVoid-ptransform-63']"
```
I am fairly sure this is because the files do not actually have a time dimension (each file represents a time step, but only contains a 2D lat/lon array).
I am wondering what is the best path forward:
Write a per-file preprocessor that adds a scalar time coordinate based on the file metadata (I'll try that in a sec)
Yes, this is what I recommend, and is how I have solved this type of problem myself, for example:
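A minimal sketch of such a per-file preprocessor (the filename pattern and the `add_time_dim` helper are hypothetical; the real files may encode their timestamp differently):

```python
import re
from datetime import datetime

import xarray as xr


def add_time_dim(ds: xr.Dataset, url: str) -> xr.Dataset:
    """Promote the file's implicit time step to a length-1 time dimension.

    Assumes a YYYYMMDD date is embedded in the filename (hypothetical
    pattern; adapt to however the real files record their timestamp).
    """
    match = re.search(r"(\d{8})", url)
    timestamp = datetime.strptime(match.group(1), "%Y%m%d")
    # expand_dims turns the 2D lat/lon array into a (time, lat, lon) array
    # with a real time coordinate, so schema aggregation and concatenation
    # over 'time' have a dimension to work with.
    return ds.expand_dims(time=[timestamp])
```

Hooked up with something like `beam.MapTuple`, this could run on each `(index, dataset)` pair before `StoreToZarr`.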
Oh crap, this is an even bigger issue. They use netcdf groups 😩. When I naively load the file with xarray I get:
Paging @TomNicholas in hopes there is some datatree/xarray wizardry that might help us here!
Hey, happy to have a look, but I'm missing a lot of context! Obviously you guys know you can use datatree to open a netCDF file with groups, then look at the groups / extract each one as a dataset. What is the problem?
All groups share the lon/lat coordinates, but they are weirdly stored at the root node, and then each node could have additional dimensions:
bunch of crappy ncdump screenshots
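To illustrate with made-up data: once a group is extracted as a Dataset, the shared root-level coordinates can be grafted back on so that every group becomes self-contained (`assign_coords` is standard xarray; the variable names here are invented):

```python
import xarray as xr

# Stand-ins for what opening the file would give us: coordinates live at
# the root node, data variables live in each group (made-up names/values).
root = xr.Dataset(coords={"lat": [0.0, 1.0], "lon": [10.0, 20.0]})
group = xr.Dataset(
    {"cloud_fraction": (("lat", "lon"), [[0.1, 0.2], [0.3, 0.4]])}
)

# Graft the root-level lat/lon coordinates onto the group's dataset.
ds = group.assign_coords(lat=root["lat"], lon=root["lon"])
```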
💡
hold on let me just use datatree to get a better repr:
@jbusecke I previously did a deep dive on this dataset using the pre-beam code, and ultimately came up with the code at the bottom of https://github.com/pangeo-forge/staged-recipes/issues/125#issuecomment-1077053600 as a semi-workable solution, which as you'll see creates a zarr store for each group. In Beam, there should be a better way of doing this, which may or may not benefit from Datatree.
Update for @TomNicholas: I initially thought all groups have the same dimensions and was wondering if we can brute force them into a single dataset.
The broader question here is how we deal with datatrees/groups in pangeo-forge I guess, and I thought this would intersect your interest at the moment?
But back to the discussion of this dataset: I am wondering if this could/should be compressed to a dataset instead of a tree?
If so, we could do something like:
```python
class OpenWithDatatree(beam.PTransform):
    ...

class DatatreeToDataset(beam.PTransform):
    def expand(self, pcoll: beam.PCollection):
        # combine the datatree nodes into a single dataset here
        ds = ...
        return ds

recipe = (
    ...
    | OpenURLWithFSSpec()
    | OpenWithDatatree()
    | DatatreeToDataset()
    | StoreToZarr()
)
```
If not, I'd say just make one zarr store per group.
Ughhh, I now realize that there is more history to this. Should have looked before diving in. Sorry about that.
> creates a zarr store for each group

But that is not strictly necessary, right? Is it worth thinking about a datatree->nested zarr pipeline? Maybe that is in the end overkill. It might however raise some interesting edge cases for datatree (can we do a `dt.concat([dt1, dt2, ...], dim='time')`?).
If you wanted to add support for datatree to pangeo-forge, I would start with simple unambiguous io functions and only do combining of datatree objects dataset-by-dataset.
> can we do a `dt.concat([dt1, dt2, ...], dim='time')`?
This kind of operation is not yet implemented in datatree because it's too ambiguous as written.
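Combining dataset-by-dataset, as suggested above, could be sketched like this (plain dicts of Datasets stand in for same-structured trees; `concat_trees` is a hypothetical helper, not a datatree API):

```python
import xarray as xr


def concat_trees(trees, dim="time"):
    """Concatenate same-structured 'trees' node by node.

    Each 'tree' is modeled here as a dict mapping group name -> xr.Dataset;
    with real datatree objects the same idea would map xr.concat over
    matching nodes.
    """
    groups = trees[0].keys()
    return {name: xr.concat([t[name] for t in trees], dim=dim) for name in groups}


# Two 'trees', each holding one time step per group (made-up data).
t1 = {"grp": xr.Dataset({"v": ("time", [1.0])})}
t2 = {"grp": xr.Dataset({"v": ("time", [2.0])})}
combined = concat_trees([t1, t2])
```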
> If not, I'd say just make one zarr store per group.
Probably the most viable method for now. Thinking about how to achieve that.
I guess we have two choices here:

**Repeated read and filter by group name**
```python
class OpenWithDatatree(beam.PTransform):
    ...

@dataclass
class DatatreeGroupToDataset(beam.PTransform):
    var: str  # name of the group to select

    def expand(self, pcoll: beam.PCollection):
        # select a single group from the datatree as a dataset
        ds = select_single_group_from_datatree(var=self.var)
        return ds

recipe_a = (
    ...
    | OpenURLWithFSSpec()
    | OpenWithDatatree()
    | DatatreeGroupToDataset(var="a")
    | StoreToZarr()
)

recipe_b = (
    ...
    | OpenURLWithFSSpec()
    | OpenWithDatatree()
    | DatatreeGroupToDataset(var="b")
    | StoreToZarr()
)
```
Seems like a pain in the butt to maintain... but might be more straightforward to implement (we could use the dictobj work from the CMIP6 feedstock to loop over group names and generate a recipe dict?).
**Emit multiple datasets from the datatree and group the results before storing**

We will somehow have to keep track of the store name to write each dataset to (with some sort of key?). Seems much harder but also kind of interesting!
```python
class OpenWithDatatree(beam.PTransform):
    ...

class SplitGroupsToDatasets(beam.PTransform):
    def expand(self, pcoll: beam.PCollection):
        # this would be a list of ('var', ds_var) tuples maybe?
        datasets = split_dt()
        # Not sure how to properly emit multiple outputs per input here
        return datasets

recipe = (
    ...
    | OpenURLWithFSSpec()
    | OpenWithDatatree()
    | SplitGroupsToDatasets()
    # group all datasets that belong to each group/store
    # (there will be multiple time steps)
    | GroupByVar()
    | StoreToZarr(target_store='somehow generated from the grouped keys?')
)
```
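On the "emit multiple outputs per input" question: Beam's `FlatMap` (unlike `Map`) emits every element of the returned iterable as a separate downstream element, which is one way `SplitGroupsToDatasets` could fan out. A pure-Python sketch of the splitting function (the dict stands in for an opened tree; the names are hypothetical):

```python
def split_groups(indexed_tree):
    """Yield one (group_name, (index, dataset)) pair per group.

    Used with beam.FlatMap, each yielded tuple becomes its own element,
    and a later beam.GroupByKey() can collect all time steps per group
    before writing one zarr store each. A plain dict stands in for the
    opened datatree here.
    """
    index, tree = indexed_tree
    for name, ds in tree.items():
        yield name, (index, ds)
```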
> This kind of operation is not yet implemented in datatree because it's too ambiguous as written.
Yup, that makes sense in general. I guess we can add 'time slices of identical tree structures' to the list of subclasses where these operations would actually be non-ambiguous because they have certain properties (similar to the 'hollow' CMIP6 trees)... but that is a tangent.
I think that at the moment we do not really need any new features from datatree to achieve a workable solution.
> `| StoreToZarr(target_store='somehow generated from the grouped keys?')`

The "somehow generated" part of this is not workable based on anything I have seen/tried yet (and I've gone down this 🐰 🕳️ a bit!).
Adapting your first option a bit more concisely, I would suggest:
```python
@dataclass
class ModisCospRecipe(beam.PTransform):
    var: str

    def expand(self, pattern: beam.PCollection):
        return (
            pattern
            | OpenURLWithFSSpec()
            | OpenWithDatatree()
            | DatatreeGroupToDataset(var=self.var)
            | StoreToZarr()
        )

pattern = ...  # same pattern for all recipes

recipe_a = beam.Create(pattern.items()) | ModisCospRecipe(var="var_a")
recipe_b = beam.Create(pattern.items()) | ModisCospRecipe(var="var_b")
```