Closed: lucasjamar closed this issue 8 months ago.
@lucasjamar to my knowledge this hasn't been requested before - we'd absolutely appreciate a PR :)
Always interesting to hear about communities I'm not familiar with and how they might use kedro 🙂 🎉 👍 FYI @noklam - am I right in thinking you're from geoscience originally?
@AntonyMilneQB Close enough! I did Earth System Science.
@lucasjamar Hi Lucas, just checking in to see if you need any help with this.
Hi @noklam,
Quite busy right now, sorry. I'm hoping to have a look at this over Easter.
@lucasjamar No worries! Looking forward to seeing your PR 😀.
@lucasjamar Are you still interested in this?
@noklam I started during the holiday break but didn't get very far. I'm afraid I won't find the time to get any further with this... Terribly sorry.
Sorry, this is as far as I could get:
"""``GenericDataSet`` loads/saves data from/to a NetCDF file using an underlying
filesystem (e.g.: local, S3, GCS). It uses xarray to handle the NetCDF file.
"""
import logging
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from typing import Any, Dict
import fsspec
import xarray as xr
from kedro.io.core import (
PROTOCOL_DELIMITER,
AbstractVersionedDataSet,
DataSetError,
Version,
get_filepath_str,
get_protocol_and_path,
)
logger = logging.getLogger(__name__)
class GenericDataSet(AbstractVersionedDataSet):
"""``GenericDataSet`` loads/saves data from/to a file using an underlying
filesystem (e.g.: local, S3, GCS). It uses xarray to handle the file.
Example adding a catalog entry with
`YAML API
<https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
>>> january:
>>> type: xarray.GenericDataSet
>>> filepath: data/01_raw/weather/january.nc
>>> load_args:
>>> engine: netcdf4
>>> decode_times: True
>>>   save_args:
>>>     engine: netcdf4
>>>     format: NETCDF4
>>>
>>> motorbikes:
>>> type: xarray.GenericDataSet
>>>   filepath: gcs://your_bucket/weather.nc
>>> credentials: dev_s3
>>>
Example using Python API:
::
>>> from kedro.extras.datasets.xarray import GenericDataSet
>>> import numpy as np
>>> import pandas as pd
>>> import xarray as xr
>>>
>>> data = xr.Dataset(
>>> {"foo": (("x", "y"), np.random.rand(4, 5))},
>>> coords={
>>> "x": [10, 20, 30, 40],
>>> "y": pd.date_range("2000-01-01", periods=5),
>>> "z": ("x", list("abcd")),
>>> },
>>> )
>>>
>>> # data_set = GenericDataSet(filepath="gcs://bucket/test.nc")
>>> data_set = GenericDataSet(filepath="test.nc")
>>> data_set.save(data)
>>> reloaded = data_set.load()
>>> assert data.equals(reloaded)
"""
DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {}  # type: Dict[str, Any]
# pylint: disable=too-many-arguments
def __init__(
self,
filepath: str,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of ``GenericDataSet`` pointing to a concrete NetCDF file
on a specific filesystem.
Args:
filepath: Filepath in POSIX format to a NetCDF file prefixed with a protocol like `s3://`.
If prefix is not provided, `file` protocol (local filesystem) will be used.
The prefix should be any protocol supported by ``fsspec``.
Note: `http(s)` doesn't support versioning.
            load_args: xarray options for loading NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html
                All defaults are preserved.
            save_args: xarray options for saving NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_netcdf.html
                All defaults are preserved.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
"""
_fs_args = deepcopy(fs_args) or {}
_credentials = deepcopy(credentials) or {}
protocol, path = get_protocol_and_path(filepath, version)
if protocol == "file":
_fs_args.setdefault("auto_mkdir", True)
self._protocol = protocol
self._storage_options = {**_credentials, **_fs_args}
self._fs = fsspec.filesystem(self._protocol, **self._storage_options)
super().__init__(
filepath=PurePosixPath(path),
version=version,
exists_function=self._fs.exists,
glob_function=self._fs.glob,
)
# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)
if "storage_options" in self._save_args or "storage_options" in self._load_args:
logger.warning(
"Dropping `storage_options` for %s, "
"please specify them under `fs_args` or `credentials`.",
self._filepath,
)
self._save_args.pop("storage_options", None)
self._load_args.pop("storage_options", None)
def _describe(self) -> Dict[str, Any]:
return dict(
filepath=self._filepath,
protocol=self._protocol,
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
)
def _load(self) -> xr.Dataset:
load_path = str(self._get_load_path())
if self._protocol == "file":
# file:// protocol seems to misbehave on Windows
# (<urlopen error file not on local host>),
# so we don't join that back to the filepath;
# storage_options also don't work with local paths
return xr.open_dataset(load_path, **self._load_args)
load_path = f"{self._protocol}{PROTOCOL_DELIMITER}{load_path}"
return xr.open_dataset(
load_path, storage_options=self._storage_options, **self._load_args
)
    def _save(self, data: xr.Dataset) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        buf = BytesIO()
        # writing to a file-like object is only supported by xarray's scipy engine
        data.to_netcdf(buf, **self._save_args)
        with self._fs.open(save_path, mode="wb") as fs_file:
            fs_file.write(buf.getvalue())
self._invalidate_cache()
def _exists(self) -> bool:
try:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
except DataSetError:
return False
return self._fs.exists(load_path)
def _release(self) -> None:
super()._release()
self._invalidate_cache()
def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)
@lucasjamar Hey thanks for making time for this! Do you want to make this a PR? It would be easier for me to drop comments on it. :)
Xarray functionality in the Data Catalog would be a big deal for me. Is there an update on this?
No, unfortunately not. Feel free to try building on @lucasjamar's start above and raise a PR for it though 🙂
At a glance, making just NetCDF work is straightforward, but it seems @lucasjamar is going with a GenericXarrayDataSet. I wonder if it makes sense to take the pandas approach with `xarray.NetCDFDataSet`, `xarray.PickleDataSet`, etc.
Here is a quick implementation, mostly just copied from the above:
class NetCDFDataSet(AbstractVersionedDataSet):
"""``GenericDataSet`` loads/saves data from/to a file using an underlying
filesystem (e.g.: local, S3, GCS). It uses xarray to handle the file.
Example adding a catalog entry with
`YAML API
<https://kedro.readthedocs.io/en/stable/data/\
data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:
.. code-block:: yaml
>>> january:
>>>   type: xarray.NetCDFDataSet
>>>   filepath: data/01_raw/weather/january.nc
>>>   load_args:
>>>     engine: netcdf4
>>>     decode_times: True
>>>   save_args:
>>>     engine: netcdf4
>>>     format: NETCDF4
>>>
>>> motorbikes:
>>>   type: xarray.NetCDFDataSet
>>>   filepath: gcs://your_bucket/weather.nc
>>>   credentials: dev_s3
>>>
Example using Python API:
::
>>> from kedro.extras.datasets.xarray import NetCDFDataSet
>>> import numpy as np
>>> import pandas as pd
>>> import xarray as xr
>>>
>>> data = xr.Dataset(
>>> {"foo": (("x", "y"), np.random.rand(4, 5))},
>>> coords={
>>> "x": [10, 20, 30, 40],
>>> "y": pd.date_range("2000-01-01", periods=5),
>>> "z": ("x", list("abcd")),
>>> },
>>> )
>>>
>>> # data_set = NetCDFDataSet(filepath="gcs://bucket/test.nc")
>>> data_set = NetCDFDataSet(filepath="test.nc")
>>> data_set.save(data)
>>> reloaded = data_set.load()
>>> assert data.equals(reloaded)
"""
DEFAULT_LOAD_ARGS = {} # type: Dict[str, Any]
DEFAULT_SAVE_ARGS = {} # type: Dict[str, Any]
# pylint: disable=too-many-arguments
def __init__(
self,
filepath: str,
load_args: Dict[str, Any] = None,
save_args: Dict[str, Any] = None,
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
) -> None:
"""Creates a new instance of ``GenericDataSet`` pointing to a concrete NetCDF file
on a specific filesystem.
Args:
filepath: Filepath in POSIX format to a NetCDF file prefixed with a protocol like `s3://`.
If prefix is not provided, `file` protocol (local filesystem) will be used.
The prefix should be any protocol supported by ``fsspec``.
Note: `http(s)` doesn't support versioning.
            load_args: xarray options for loading NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.open_dataset.html
                All defaults are preserved.
            save_args: xarray options for saving NetCDF files.
                Here you can find all available arguments:
                https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_netcdf.html
                All defaults are preserved.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
fs_args: Extra arguments to pass into underlying filesystem class constructor
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
"""
_fs_args = deepcopy(fs_args) or {}
_fs_open_args_load = _fs_args.pop("open_args_load", {})
_fs_open_args_save = _fs_args.pop("open_args_save", {})
_credentials = deepcopy(credentials) or {}
protocol, path = get_protocol_and_path(filepath, version)
if protocol == "file":
_fs_args.setdefault("auto_mkdir", True)
self._protocol = protocol
self._storage_options = {**_credentials, **_fs_args}
self._fs = fsspec.filesystem(self._protocol, **self._storage_options)
super().__init__(
filepath=PurePosixPath(path),
version=version,
exists_function=self._fs.exists,
glob_function=self._fs.glob,
)
# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)
if "storage_options" in self._save_args or "storage_options" in self._load_args:
logger.warning(
"Dropping `storage_options` for %s, "
"please specify them under `fs_args` or `credentials`.",
self._filepath,
)
self._save_args.pop("storage_options", None)
self._load_args.pop("storage_options", None)
_fs_open_args_save.setdefault("mode", "wb")
self._fs_open_args_load = _fs_open_args_load
self._fs_open_args_save = _fs_open_args_save
def _describe(self) -> Dict[str, Any]:
return dict(
filepath=self._filepath,
protocol=self._protocol,
load_args=self._load_args,
save_args=self._save_args,
version=self._version,
)
def _load(self) -> xr.Dataset:
load_path = str(self._get_load_path())
if self._protocol == "file":
# file:// protocol seems to misbehave on Windows
# (<urlopen error file not on local host>),
# so we don't join that back to the filepath;
# storage_options also don't work with local paths
return xr.open_dataset(load_path, **self._load_args)
load_path = f"{self._protocol}{PROTOCOL_DELIMITER}{load_path}"
return xr.open_dataset(
load_path, storage_options=self._storage_options, **self._load_args
)
    def _save(self, data: xr.Dataset) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
            # writing to a file-like object is only supported by xarray's scipy engine
            data.to_netcdf(fs_file, **self._save_args)
self._invalidate_cache()
def _exists(self) -> bool:
try:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
except DataSetError:
return False
return self._fs.exists(load_path)
def _release(self) -> None:
super()._release()
self._invalidate_cache()
def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)
Would it make sense to have a `file_format` argument and then get the right `xr.to/from_***` function based on that? I'm not sure how standardised the xarray load/save API is. This is what we do in `pandas.GenericDataSet`:
https://github.com/kedro-org/kedro/blob/fdc6be73408852b09fcc16370706257b918994d5/kedro/extras/datasets/pandas/generic_dataset.py#L187-L188
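To illustrate, here is a minimal sketch of what that dispatch could look like for xarray, mirroring the pandas approach. The `_SUPPORTED_FORMATS` mapping and function names are assumptions for discussion, not an agreed API:

```python
import xarray as xr

# Hypothetical mapping from a catalog-level `file_format` string to xarray I/O calls.
# Only formats with a symmetric open_*/to_* pair are listed.
_SUPPORTED_FORMATS = {
    "netcdf": (xr.open_dataset, "to_netcdf"),
    "zarr": (xr.open_zarr, "to_zarr"),
}


def load_xarray(filepath: str, file_format: str, **load_args) -> xr.Dataset:
    """Dispatch to the matching ``xr.open_*`` reader."""
    reader, _ = _SUPPORTED_FORMATS[file_format]
    return reader(filepath, **load_args)


def save_xarray(data: xr.Dataset, filepath: str, file_format: str, **save_args) -> None:
    """Dispatch to the matching ``Dataset.to_*`` writer via ``getattr``."""
    _, writer_name = _SUPPORTED_FORMATS[file_format]
    getattr(data, writer_name)(filepath, **save_args)
```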
I am actually not aware of `pandas.GenericDataSet` 😅 Do people actually use that, and do we keep both for backward compatibility reasons? I think it is possible, since the API is quite similar.
Btw, I saw that the `invalidate_cache` method is public for some datasets but private for the majority. Would it make sense to just remove it and put it in the abstract class? We have repeated it 71 times 😅
Yeah, `pandas.GenericDataSet` isn't well known. It was added relatively recently so that we don't have to constantly be catching up with pandas when they add a new format: https://github.com/kedro-org/kedro/pull/987. It's not really designed to replace `pandas.CSVDataSet` etc., just to cater for any new formats where we don't have a dedicated dataset. But at some point it might make sense to deprecate the dedicated ones in favour of the generic.
> Btw, I saw that the `invalidate_cache` method is public for some datasets but private for the majority. Would it make sense to just remove it and put it in the abstract class? We have repeated it 71 times 😅
Probably yes. The same could probably be said for `_exists`, `_release`, the repetition of `save_path = get_filepath_str(self._filepath, self._protocol)`, etc. It would be nice if we could reduce the repeated boilerplate that's in every dataset definition, and I don't think it would be too hard to do. Possibly there's some reason for keeping it, so let's not remove it straight away without getting some more input. I would consider it part of my "fundamental issue" in https://github.com/kedro-org/kedro/issues/1691 - we should consider generally what exactly belongs in the abstract classes vs. the implementations.
Not sure if I like the idea of deprecating the dedicated ones, but this is not urgent so I will think more about it. I added the dataset-related refactoring piece into kedro-org/kedro#1691 in your comments; it's a bit easier to keep track of when we have a dedicated issue for that.
Yeah, the main reason why I pushed for `pandas.GenericDataSet` is that they introduced `pd.read_xml` and it felt like we were always in reactive mode; the generic dataset at least gives users a way to leverage new pandas features without us needing to push a release.
The pandas API is also really inconsistent: for file paths I had to rely on the assumption that the first positional argument is the path, since the library uses a different kwarg for it in each reader/writer, because each was contributed independently.
In the Spark world we're able to provide only a generic version of the dataset since its API is much better designed; I think in situations like xarray and dask we can probably do the same.
I really appreciate the responses. :pray:
Re: the "pandas approach" mentioned by @noklam, I think there are certainly a few xarray-supported file types that could benefit from built-in handling in kedro. NetCDF, Zarr, and GeoTIFF (via rioxarray) specifically. Maybe that supports moving away from a generic dataset, but I also appreciate the difficulty in supporting everything.
In the selfish short term it looks like I can use the implementation provided by @noklam above in combination with these instructions to get done what I need to get done. I can test out these implementations and report back changes I see as necessary.
Definitely, that sounds like a good plan in the short term - you can easily use the custom dataset implementation without it being added to core Kedro. Just copy and paste the above into a file and modify the dataset `type` to point to that class. This would be a great way to test it out and see how well it all works.
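For example, assuming the NetCDFDataSet snippet above is saved as `src/<your_package>/extras/datasets/netcdf_dataset.py` (the module path and entry name are illustrative, not an agreed location), the catalog entry could look like:

```yaml
# catalog.yml - the module path and filepath below are placeholders for illustration
weather_january:
  type: <your_package>.extras.datasets.netcdf_dataset.NetCDFDataSet
  filepath: data/01_raw/weather/january.nc
  load_args:
    engine: netcdf4
```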
An update and a couple questions about partitioned datasets and lazy evaluation. The implementation discussed above works well. I've only tested it on local file systems.
TL;DR: To not lose advantages of working with xarray we need to concatenate many .nc files into a big dataset and then operate on it as lazily as possible. I want to keep benefits of both xarray and Kedro.
Is the `create_partitions` approach I'm employing here the best / as intended by Kedro? I'm using a dictionary of `{ids: xarray.Dataset}`. More on this:
When obtaining NetCDF files from a remote source which are being saved for the first time in a pipeline (e.g., saving to `/data/01_raw`), it is common (outside of Kedro) to do this with a for loop, saving every iteration. With Kedro handling file I/O in a partitioned dataset, is it best to make a function that builds a dictionary of unique partition ID keys with corresponding `xarray.Dataset` values?
This approach (example shown below) works, but I'm not sure if it is loading all this data into memory and ruining the lazy-evaluation aspect. The only way I know to test this is to watch `htop` while increasing the number of datasets I'm downloading to exceed the memory of my machine. Hoping to get some feedback and learn before I do that.
In this pipeline I would have a node defined in `pipeline.py` as:
node(
func=create_data_partitions,
inputs=["params:starttime", "params:endtime"],
outputs="downloaded_data", # defined in catalog.yml
name="download_forecast_node",
)
With the corresponding function definition (which, as shown here, is a very simplified/broken example to highlight the dictionary-creation aspect) in `nodes.py` as:
def create_data_partitions(starttime: datetime, endtime: datetime) -> Dict[str, Any]:
    """Create new partitions and save using PartitionedDataSet.

    returns: Dictionary of the partitions to create.
    """
    duration = int((endtime - starttime).total_seconds() // 3600)  # number of hourly steps
    part_lkup = {}
    for i in range(duration):
        tmptime = starttime + timedelta(hours=i)
        ds = get_data(tmptime)  # downloads a single xarray dataset
        part_id = str(tmptime) + "_downloaded"
        part_lkup[part_id] = ds
    return part_lkup
I've used this pattern in multiple places, and it works, but again it's not clear to me that it is the best way to go.
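One option worth noting is lazy saving: Kedro's `PartitionedDataSet` can accept a dictionary whose values are callables, which are only invoked when each partition is written. A sketch under that assumption, reusing the hypothetical `get_data` download helper from above, would defer each download until save time:

```python
from datetime import datetime, timedelta
from functools import partial
from typing import Callable, Dict

import xarray as xr


def create_data_partitions_lazy(
    starttime: datetime, endtime: datetime
) -> Dict[str, Callable[[], xr.Dataset]]:
    """Return callables so each partition is downloaded only when Kedro saves it."""
    duration = int((endtime - starttime).total_seconds() // 3600)
    part_lkup = {}
    for i in range(duration):
        tmptime = starttime + timedelta(hours=i)
        # ``partial`` defers the (hypothetical) ``get_data`` download until the
        # PartitionedDataSet writes this key.
        part_lkup[str(tmptime) + "_downloaded"] = partial(get_data, tmptime)
    return part_lkup
```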
Another example: when I start a processing pipeline on this downloaded data, I'll open the downloaded data files but concatenate them into a single, large (but not loaded into memory) `xarray.Dataset`. In doing so (as shown below) I am attempting to do what `xr.open_mfdataset` will do when given a directory full of `.nc` files.
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=create_data_partitions,
inputs=["params:starttime", "params:endtime"],
outputs="downloaded_data", # defined in catalog.yml
name="download_forecast_node",
),
node(
func=concat_partitions,
inputs="downloaded_data",
outputs="concat_ds", # not in catalog, in memory, hopefully still lazy and not actually loaded
name="concat_download_ds_node",
),
node(
func=cleanup_data,
inputs="concat_ds",
outputs="clean_ds", # not in catalog, in memory, hopefully still lazy and not actually loaded
name="clean_ds_node",
),
node(
func=create_partitions,
inputs="clean_ds",
outputs="forecast_clean",
name="forecast_clean_node", # defined in catalog.yml
),
]
)
Where the `concat_partitions` node uses a pattern similar to those in the Kedro docs:
def concat_partitions(partitioned_input: Dict[str, Callable[[], Any]], concat_dim: str = 'time') -> xr.Dataset:
"""Concatenate input partitions into one xarray Dataset.
param: partitioned_input: A dictionary with partition ids as keys and load functions as values.
param: concat_dim: existing dimension in each netcdf file along which to concatenate all datasets.
return: xarray Dataset containing all loaded partitions.
"""
return xr.concat([partitioned_input[i]() for i in sorted(partitioned_input)], dim=concat_dim)
and where the `create_partitions` node uses my same uncertain/questionable pattern at the heart of this question:
def create_partitions(ds: xr.Dataset) -> Dict[str, Any]:
"""Create new partitions and save using PartitionedDataSet.
returns: Dictionary of the partitions to create.
"""
times = ds.time.values
part_lkup = {}
for tmptime in times:
tmp_ds = ds.sel(time=slice(tmptime, tmptime))
part_id = str(tmptime)[:13]+'_cleaned_data'
part_lkup[part_id] = tmp_ds
return part_lkup
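One caveat on the laziness question above: whether `concat_ds` stays out of memory depends on how each partition is opened. If the underlying dataset passes `chunks` through to `xr.open_dataset`, the concatenated result is dask-backed and only computed when written. A hedged catalog sketch (the custom dataset's module path and the chunk setting are assumptions):

```yaml
# catalog.yml - partitioned NetCDF input built from the custom dataset above
downloaded_data:
  type: PartitionedDataSet
  path: data/01_raw/weather
  filename_suffix: ".nc"
  dataset:
    type: <your_package>.extras.datasets.netcdf_dataset.NetCDFDataSet
    load_args:
      chunks: {}  # let xarray/dask open each file lazily instead of reading it eagerly
```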
@jamespolly thank you for your very detailed write-up - @deepyaman do you want to chime in with your thoughts?
One issue I've run into with the `xarray.load_dataset()` and `ds.to_netcdf()` functions is that they still don't allow reading/writing from bytes, only from local files. Would be nice if I could just use Kedro to abstract this away. 😄
Talking to some folks working with satellite data, would be cool to have this 🛰️
Looking to open a PR on this in the next week or so.
@riley-brady that's awesome!
Looking to open a PR on this in the next week or so.
As a PyMC user I am excited by this news!
Glad to see the excitement! I'm bundling it with a Zarr implementation, since those are commonly used together. We've got a nicely working implementation on our climate team for both, targeting an AWS platform. Let's start with stripping it down to a local-working implementation and then see how we can build it up from there.
Zarr should work out of the box for remote read/write. NetCDF can't natively be read remotely (e.g. from an S3 bucket), so we pull it down to temp storage and then do the read. One could also do something like a `kerchunk` wrapper to generalize remote reads for NetCDF.
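A minimal sketch of the temp-storage approach described above, using fsspec to pull the remote file down before opening it with xarray (the S3 path is a placeholder):

```python
import tempfile
from pathlib import Path

import fsspec
import xarray as xr


def open_remote_netcdf(remote_path: str, **load_args) -> xr.Dataset:
    """Copy a remote NetCDF file to local temp storage, then open it with xarray."""
    fs, _, (path,) = fsspec.get_fs_token_paths(remote_path)
    tmp_dir = tempfile.mkdtemp()
    local_path = Path(tmp_dir) / Path(path).name
    fs.get(path, str(local_path))  # download to temp storage
    return xr.open_dataset(local_path, **load_args)


# Usage (bucket/key are placeholders):
# ds = open_remote_netcdf("s3://my-bucket/forecast/2023-01-01.nc", engine="netcdf4")
```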
I talked to @tgoelles today and he is interested in contributing a dataset for GeoTIFF and NetCDF. I am excited to see more contributions from the scientific community 🔥
@noklam I implemented GeoTIFF for now, but the build on GitHub Actions fails for Windows due to the dependency on GDAL.
Fork is here: https://github.com/tgoelles/kedro-plugins
Collecting rasterio (from rioxarray>=0.9.0->kedro-datasets==1.7.0)
  Downloading rasterio-1.2.10.tar.gz (2.3 MB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'error'
  error: subprocess-exited-with-error

  Getting requirements to build wheel did not run successfully. exit code: 1
  [2 lines of output]
  INFO:root:Building on Windows requires extra options to setup.py to locate needed GDAL files. More information is available in the README.
  ERROR: A GDAL API version must be specified. Provide a path to gdal-config using a GDAL_CONFIG environment variable or use a GDAL_VERSION environment variable.
  [end of output]
Is there a way to support GeoTIFF only for Ubuntu for now? I don't want to look into Windows issues, and I know that GDAL makes installations complex.
I can help with GDAL on Windows 👍🏽 go ahead and open the PR!
I'm also really looking forward to this PR!
Hi folks! I put a decent stake in the ground here: https://github.com/kedro-org/kedro-plugins/pull/360/files. There are a few TODOs to deal with at the moment.
I have some pressing work deadlines I need to focus on so I might not get back to this for a couple of weeks. But wanted to provide the initial snippet for folks to test out/provide feedback on instead of going silent here.
I'd like to get this implemented with file syncing for load-from-remote, since that's the most straightforward approach. A future PR could work with kerchunk to allow direct loading from remote storage. Kerchunk is a really nice toolkit, but it requires managing a lot of generated JSON metadata files, which can sometimes be quite slow to produce. It will take a bit of tweaking to implement this nicely, since the first run would need to generate and cache/store all of the reference JSONs to make future loads much faster.
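For anyone curious, a rough sketch of the kerchunk pattern being referred to, based on the kerchunk and fsspec reference-filesystem docs as I understand them; the paths, the JSON caching step, and the storage options are assumptions:

```python
import json

import fsspec
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr

# Placeholder path: swap in a real bucket/key.
remote = "s3://my-bucket/forecast/2023-01-01.nc"

# One-off (and potentially slow) step: scan the NetCDF4/HDF5 file and build a
# reference mapping of Zarr keys to byte ranges inside the original file.
with fsspec.open(remote, mode="rb") as f:
    refs = SingleHdf5ToZarr(f, remote).translate()

# Cache the references so later runs skip the scan.
with open("forecast_refs.json", "w") as out:
    json.dump(refs, out)

# Later loads stream only the needed byte ranges straight from remote storage.
ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": {"fo": "forecast_refs.json", "remote_protocol": "s3"},
    },
)
```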
Hi everyone! The PR is fully implemented with testing and ready for review. https://github.com/kedro-org/kedro-plugins/pull/360
Of interest for people subscribed to this issue: https://guide.cloudnativegeo.org (source)
I think this can be closed via https://github.com/kedro-org/kedro-plugins/pull/360!
Closed in #360 indeed! 🚀
Description
Read and write NetCDF files with xarray. https://xarray.pydata.org/en/stable/user-guide/io.html#netcdf
Context
Should attract the weather data science community to use kedro :)
Possible Implementation
Should be quite similar to pandas.CSVDataSet. I'll give this implementation a shot in my free time.
Has anyone ever implemented such a custom dataset already?