NERC-CEH / object_store_tutorial

Repository with examples on how to use the JASMIN high-performance object storage
GNU General Public License v3.0

Create example for the UKCEH GEAR-1hrly dataset #3

Open mattjbr123 opened 4 weeks ago

mattjbr123 commented 4 weeks ago

As part of the FDRI project work package 2 I am (co-)developing a "Gridded Data Tools" product that intends to allow easy conversion of gridded data to ARCO format, easy upload to object-storage, cataloguing of the data and easy access and analysis of this data.

I thought this object_store_tutorial repo would be a good place to start, at least for the "ingestion" stage (conversion and upload to object storage) replicating the workflow shown in the README, scripts and notebooks for the UKCEH GEAR-1hrly dataset. Initially away from any complicated cloud infrastructure like kubernetes, argo, airflow etc. and just running it manually locally or on JASMIN.

I'll attempt to document progress here, and add a further script and/or notebook to the repo in the respective folder. Development work will be on the 'GEAR' branch.

mattjbr123 commented 3 weeks ago

16/08/2024 Things I discovered today whilst adapting the recipe for the GEAR dataset:

mattjbr123 commented 3 weeks ago

Next thing to do is to test the adapted recipe with the GEAR dataset, running locally on the JASMIN sci machines first with a subset of the data, then with all the data on the LOTUS cluster, using the 'DirectRunner' of Apache Beam, which is the un-optimized one-size-fits-all runner. If/when successful, maybe look into the new DaskRunner for Beam, as this may unlock writing directly to object storage without needing to get the recipe working on Flink or rewriting the codebase for other workflow engines (such as Airflow).
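For reference, a minimal sketch of how a runner is selected for a Beam pipeline (the pipeline contents here are placeholders, not the GEAR recipe):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder pipeline: the real recipe transforms are assembled in later comments.
# In principle, switching runner (DirectRunner, the Dask-backed runner, Flink, ...)
# is a change to the pipeline options rather than to the recipe itself.
options = PipelineOptions(runner="DirectRunner")
with beam.Pipeline(options=options) as p:
    p | beam.Create(["a", "b", "c"]) | beam.Map(str.upper) | beam.Map(print)
```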

mattjbr123 commented 3 weeks ago

22/08/2024

What I've found out after testing the adapted recipe.

To do next:

mattjbr123 commented 3 weeks ago

The solution might be as simple as adding the dimension to the target_chunks variable... It's not. pangeo-forge-recipes really does seem to require all the variables in the file to be the same size (have identical dimensions and dimension sizes).

This seems to be the relevant issue: https://github.com/pangeo-forge/pangeo-forge-recipes/issues/644

From which we can narrow down the problem further to "pangeo-forge-recipes requires all variables that are not coordinates to have the concat_dim in them". The proposed solution would be fine if the xarray open_dataset function (which pangeo-forge-recipes' OpenWithXarray transform uses) accepted preprocess as a kwarg, but in reality only xarray's open_mfdataset does. So to implement this solution we would have to develop our own custom OpenWithXarray that uses open_mfdataset instead, which definitely seems overkill. Instead we might have to create our own Beam transform (similar to here) that does the same thing.
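To illustrate the gap being described (the file pattern and the coordinate-promotion logic below are hypothetical, purely for illustration):

```python
import xarray as xr

def promote_extras_to_coords(ds):
    # Hypothetical preprocess step: turn any data variable that lacks the
    # time (concat) dimension into a coordinate, leaving only time-varying
    # variables as data variables.
    extras = [v for v in ds.data_vars if "time" not in ds[v].dims]
    return ds.set_coords(extras)

# open_mfdataset applies `preprocess` to each file before combining them...
ds = xr.open_mfdataset("gear_1hrly_*.nc", preprocess=promote_extras_to_coords)

# ...whereas open_dataset has no such kwarg, so in a pangeo-forge-recipes
# pipeline the same logic has to live in a separate Beam transform instead.
```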

Might be worth putting a PR together to add the more helpful error message suggested there.

mattjbr123 commented 2 weeks ago

23/08/2024

Today has been a day of trying to understand how 'preprocessors' might work in a Beam pipeline and pangeo-forge-recipes context. I've figured out the general structure and syntax of preprocessors, just through looking at multiple examples, but there's definitely a shortfall in the pangeo-forge-recipes documentation here.

I've been doing this so that I can add a preprocessor that converts the annoying extra variables in the NetCDF files to coordinates, so that the StoreToZarr pangeo-forge-recipes Beam PTransform can handle the dataset, as suggested above. I found an example in another recipe that should do more or less what I want.

The general structure of preprocessors seems to be:

import apache_beam as beam
from pangeo_forge_recipes.transforms import Indexed, T

# Preprocessors are implemented as subclasses of the beam.PTransform class
class NameOfPreprocess(beam.PTransform):

    # the preprocess function takes in and returns an object of type Indexed[T]
    # these are pangeo-forge-recipes derived types, internal to the functioning of the pangeo-forge-recipes transforms
    # I think they consist of a list of 2-item tuples, each containing some type of 'index' and a 'chunk' of the dataset (or a reference to it), as can be seen in the first line of the function below
    # (not sure why it needs to be a staticmethod)
    @staticmethod
    def _name_of_preprocess(item: Indexed[T]) -> Indexed[T]:
        index, ds = item
        # do something to each ds chunk here
        # and leave index untouched
        return index, ds

    # the expand method is the required entry point when writing your own Beam PTransform
    # it wraps the preprocess function above and applies it to the PCollection, i.e. to every (index, ds) pair
    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._name_of_preprocess)

then in the pipeline:


recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
    | NameOfPreprocess()
    | StoreToZarr(
        combine_dims=pattern.combine_dim_keys,
        target_root='.',
        store_name='out.zarr',
    )
)
A cleaner alternative seems like it could be:

```python
def preprocess(index, ds):
    return index, ds.expand_dims(dim="t")  # or some other simple preprocess
```

and

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(cache=None)
    | OpenWithXarray()
    | beam.MapTuple(preprocess)
    | StoreToZarr(
        combine_dims=pattern.combine_dim_keys,
        target_root='.',
        store_name='out.zarr',
    )
)
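For the GEAR case the body of either variant would presumably just promote the extra variables to coordinates; a rough sketch, assuming made-up variable names rather than whatever the GEAR files actually contain:

```python
def promote_to_coords(index, ds):
    # 'x_bnd', 'y_bnd' and 'crs' are placeholders for whichever extra,
    # non-time-varying variables the NetCDF files actually contain.
    extras = [v for v in ("x_bnd", "y_bnd", "crs") if v in ds.data_vars]
    return index, ds.set_coords(extras)
```

which would slot into the second pipeline above via `beam.MapTuple(promote_to_coords)`, or into the body of `_name_of_preprocess` in the PTransform version.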
mattjbr123 commented 2 weeks ago

Have now implemented the preprocessor and it seems to be working, albeit taking a long time (~1 hr) to process on a single core, even when pruned to only 12 files (which I've just noticed are in the wrong order: it's doing all the Januaries from the first 12 years instead of the first 12 months of the first year...)

mattjbr123 commented 2 weeks ago

Key takeaway is that some manual writing of a preprocessing function is likely going to be necessary for each dataset, so plenty of training material on this should be made available. This could realistically just be in the form of: "all you need to worry about is the bit inside the _name_of_preprocess function (what you actually want to do in your preprocessing of the data); the rest is just copy-and-paste-able around it".

mattjbr123 commented 2 weeks ago

28/08/2024 findings

The

attributes were dropped during the processing and don't appear in the final (Zarr) version of the dataset. I guess this makes sense, as the dataset has now been modified and these attributes would need to change, but presumably there is no code in pangeo-forge-recipes yet that can do this. The meaning of 'date_created' is also unclear after modification of the 'official' version of the dataset: is it the creation date of the original official version, or of the now-modified one?

Otherwise the datasets do appear identical, at least at an 'ncdump'-style first glance.
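If the dropped attributes do turn out to matter, re-stamping them could presumably be folded into the same preprocessing step; a rough sketch (the attribute names here are examples, not a confirmed list of what was dropped):

```python
import datetime

def restamp_attrs(index, ds):
    # Example only: record that the data have been reorganised, and make the
    # meaning of 'date_created' explicit for the modified copy.
    ds = ds.copy()
    ds.attrs["history"] = ds.attrs.get("history", "") + " | converted to Zarr via pangeo-forge-recipes"
    ds.attrs["date_created"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    return index, ds
```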

mattjbr123 commented 2 weeks ago

29/08/2024

Getting a strange error when trying to run it in parallel, which I haven't seen before...

terminate called after throwing an instance of 'std::out_of_range'
  what():  basic_string::erase: __pos (which is 16) > this->size() (which is 15)

which is a C++ error from basic_string::erase: something is trying to erase from position 16 of a string that only has 15 characters, so that position doesn't exist. It's not clear where this error is happening; the rest of the text surrounding this line seems unhelpful.

It is something to do with my Python environment - a particular version of a particular package must be breaking things, as the original workflow for the G2G data runs fine in my old environment but not in the new one.

To figure out which package is the culprit, I will clone the old environment and gradually update the key packages in it until it breaks, at which point I can see which dependencies the updated package pulled in and repeat the process with those until I have found the culprit.

Tested: