@benjaminleighton, @pbranson, I really like this idea. This would allow us to pare these Source objects down to a minimum and make better use of intake. I'd only suggest some changes to how we define the field signatures in this new SourceIntake class. I haven't been able to make this work yet though, have you got a working example? For instance, if I try this using one of the rompy test files:
```python
import yaml
import intake

ds = intake.open_netcdf("tests/data/gebco-5km.nc")
cat = intake.Catalog.from_dict(yaml.safe_load(ds.yaml()))
list(cat)
```
I get:

```
['sources']
```

But I can't seem to load dask from it by doing for example `cat.sources.to_dask()` or something similar - I get an `AttributeError: 'dict' object has no attribute '_catalog'` error.
@rafa-guedes Yes, I find `Catalog.from_dict` somewhat misleading. You need to use the `YAMLFileCatalog` class instead: `from intake.catalog.local import YAMLFileCatalog`.
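For reference, a minimal sketch of how the failing example above can be made to work with `YAMLFileCatalog` and an in-memory filesystem (assuming intake < 2 with intake-xarray installed; the `gebco` entry name is just illustrative):

```python
import fsspec
import intake
from intake.catalog.local import YAMLFileCatalog

ds = intake.open_netcdf("tests/data/gebco-5km.nc")
ds.name = "gebco"  # the entry name the source will have in the catalog

# Write the source's catalog YAML to an in-memory filesystem and open it as a catalog
fs = fsspec.filesystem("memory")
fs.get_mapper()["/temp.yaml"] = ds.yaml().encode("utf-8")
cat = YAMLFileCatalog("temp.yaml", fs=fs)

dset = cat.gebco.to_dask()  # now resolves through a proper catalog entry
```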
I spent some time playing around with dynamic generation of intake catalogs again and have come up with an example which I think is the cleanest approach.

The part I have struggled with the most is the intake catalogs: because the schema is so simple, it expects that you just define the dictionary and dump it to YAML. There are two key bits of boilerplate code that make the code cleaner:
```python
import fsspec
import yaml

def as_entry(source):
    # Helper function to dereference the dictionary that is created by
    # https://github.com/intake/intake/blob/4bbb8c2935700c318928ffdba88c824282ac9970/intake/source/base.py#L295C23-L295C23
    # Not sure why the source (entries) yaml gets nested under "sources" - this dereferences it
    return list(source._yaml()["sources"].values())[0]

# Taking the dictionary to an in-memory catalog object is also a bit terse; it
# expects a filesystem and a file-like object path
def to_memory(cat_dict, tmp="temp.yaml"):
    fs = fsspec.filesystem("memory")
    fs_map = fs.get_mapper()
    fs_map[f"/{tmp}"] = yaml.dump(cat_dict).encode("utf-8")
    return fs, tmp
```
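Putting the two helpers together, usage looks something like this (a sketch only; the `gebco` entry name and test file are illustrative):

```python
import intake
from intake.catalog.local import YAMLFileCatalog

src = intake.open_netcdf("tests/data/gebco-5km.nc")

# Build a one-entry catalog dict and materialise it on an in-memory filesystem
cat_dict = {"metadata": {"version": 1}, "sources": {"gebco": as_entry(src)}}
fs, path = to_memory(cat_dict)

cat = YAMLFileCatalog(path, fs=fs)
dset = cat.gebco.to_dask()
```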
So the example above could be changed to:
```python
from pathlib import Path
from typing import Literal, Optional

import fsspec
import intake
import xarray as xr
from intake.catalog import Catalog
from intake.catalog.local import YAMLFileCatalog
from pydantic import Field
# SourceBase is rompy's existing base source class

class SourceIntake(SourceBase):
    """Source dataset from intake catalog."""

    model_type: Literal["intake"] = Field(
        default="intake",
        description="Model type discriminator",
    )
    dataset_id: str = Field(description="The id of the dataset to read in the catalog")
    catalog_uri: Optional[str | Path] = Field(
        default=None, description="The URI of the catalog to read from"
    )
    catalog_yaml: Optional[str] = Field(
        default=None, description="The YAML string of the catalog to read from"
    )
    kwargs: dict = Field(
        default={},
        description="Keyword arguments to define intake dataset parameters",
    )

    def __str__(self) -> str:
        return f"SourceIntake(catalog_uri={self.catalog_uri}, dataset_id={self.dataset_id})"

    @property
    def catalog(self) -> Catalog:
        """The intake catalog instance."""
        if self.catalog_uri:
            return intake.open_catalog(self.catalog_uri)
        else:
            # Write the catalog YAML string to an in-memory filesystem and open it
            fs = fsspec.filesystem("memory")
            fs_map = fs.get_mapper()
            fs_map["/temp.yaml"] = self.catalog_yaml.encode("utf-8")
            return YAMLFileCatalog("temp.yaml", fs=fs)

    def _open(self) -> xr.Dataset:
        return self.catalog[self.dataset_id](**self.kwargs).to_dask()
```
A full example is here: https://gist.github.com/pbranson/5c65962ec8120d4844d4e34e72655f2b
Worth watching this if you have time - Martin gives an overview of his thinking and prototyping for intake 2, which seems to overlap a lot with some of the challenges we are attempting to tackle with intake drivers, filters and catalogs: https://discourse.pangeo.io/t/sep-27-2023-intake-2-the-future-martin-durant/3706/3
This looks interesting @pbranson thanks for sharing the link.
Sorry @pbranson, I let this slip by. Thanks for sharing your example / notebook - I tested it here and it works. I just wonder what the workflow for using the SourceIntake class this way would be; the way I managed to make it work based on your example and explanation does not look super straightforward (I may be missing something). For example, using one of the test files in rompy after making the changes to the SourceIntake class:
```python
import intake

source = intake.open_netcdf("/source/rompy/tests/data/era5-20230101.nc")
# Or define this in rompy as a helper function such as the as_entry() one you defined in the notebook
yaml_source = list(source._yaml()["sources"].values())[0]
catalog_yaml = {"metadata": {"version": 1}, "sources": {"era5": yaml_source}}
rompy_source = SourceIntake(dataset_id="era5", catalog_yaml=catalog_yaml)
```
I'm happy to implement these changes to SourceIntake if they are going to be useful, and also happy to review a pull request with the required changes. I think we may still want to keep other source classes such as SourceFile, since in some cases it may be easier to use with an existing file.
The use case here is for circumstances where a catalog is generated by an external system and is passed in as YAML - for instance from some other database that indexes forcing files. The catalog YAML can be serialised.

I agree that for the single-file case its utility may not make as much sense, other than leveraging the intake layer to take the file from source to in-memory container.
Also note that the interface should be more like this (not as terse):
```python
source = intake.open_netcdf("/source/rompy/tests/data/era5-20230101.nc")
source.name = "era5"
rompy_source = SourceIntake(dataset_id="era5", catalog_yaml=source.yaml())
```
This assumes you are using a SourceIntake as defined in the class earlier in the thread.
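To spell out the serialisation point, and continuing from the snippet above: the resulting SourceIntake, embedded catalog string included, should round-trip through plain YAML. A minimal sketch (the `spec` dict layout is illustrative, not rompy's actual config schema):

```python
import yaml

# The spec, catalog string included, survives a YAML round trip
spec = {"model_type": "intake", "dataset_id": "era5", "catalog_yaml": source.yaml()}
restored = SourceIntake(**yaml.safe_load(yaml.dump(spec)))
assert str(restored) == str(rompy_source)
```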
That looks easier Paul. I have opened pull request https://github.com/rom-py/rompy/pull/70 to implement this and assigned it to you.
Implemented in #70
Hi @rafa-guedes, talking to @pbranson this afternoon, he suggested that some of the functionality going into SourceDataset(s) and similar classes could instead go into intake catalogs combined with SourceIntake. This makes sense to me as well: it means that filtering and xarray wrappers can sit in intake, and we don't have to rebuild that functionality there.
For example we could replace

```python
SourceFile(uri='bathy_temp.tif')
```

with something like the sketch below, where catalog_yaml is an alternative to catalog_uri that allows the catalog to be embedded directly in a serialised SourceIntake object. This might require an extension of the SourceIntake object along the lines of the class shown earlier in the thread.
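A hypothetical sketch of what that replacement could look like (the `rasterio` driver entry and the `bathy` dataset id are my illustration, assuming intake-xarray is installed - they are not from the original proposal):

```python
# Hypothetical: wrap the GeoTIFF in a one-entry catalog and embed it inline
catalog_yaml = """
metadata:
  version: 1
sources:
  bathy:
    driver: rasterio  # intake-xarray's raster driver (an assumption here)
    args:
      urlpath: bathy_temp.tif
"""

source = SourceIntake(dataset_id="bathy", catalog_yaml=catalog_yaml)
```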
What do you think?