kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.

Rewrite the Polars datasets to not rely on fsspec unnecessarily #625

Open astrojuanlu opened 6 months ago

astrojuanlu commented 6 months ago

Description

Rewrite the Polars datasets (https://github.com/kedro-org/kedro-plugins/tree/main/kedro-datasets/kedro_datasets/polars) so they no longer rely on fsspec, because they don't need it.

Context

Polars can read from remote filesystems just fine thanks to https://docs.rs/object_store/latest/object_store/, but the kedro-datasets implementation asks me for fsspec dependencies anyway:

In [13]: ds = EagerPolarsDataset(filepath="s3://reddit-submissions/submissions-raw", file_format="delta")
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/fsspec/registry.py:238, in get_filesystem_class(protocol)
    237 try:
--> 238     register_implementation(protocol, _import_class(bit["class"]))
    239 except ImportError as e:

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/fsspec/registry.py:273, in _import_class(cls, minv)
    272 s3 = mod == "s3fs"
--> 273 mod = importlib.import_module(mod)
    274 if s3 and mod.__version__.split(".") < ["0", "5"]:

File /opt/homebrew/Cellar/python@3.11/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/importlib/__init__.py:126, in import_module(name, package)
    125         level += 1
--> 126 return _bootstrap._gcd_import(name[level:], package, level)

File <frozen importlib._bootstrap>:1204, in _gcd_import(name, package, level)

File <frozen importlib._bootstrap>:1176, in _find_and_load(name, import_)

File <frozen importlib._bootstrap>:1140, in _find_and_load_unlocked(name, import_)

ModuleNotFoundError: No module named 's3fs'
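
For contrast, Polars itself can read the same data without any fsspec dependency. A minimal sketch, assuming the deltalake package is installed and credentials come from the environment (bucket name and region are placeholders):

import polars as pl

# No s3fs involved: Polars hands remote I/O to the Rust object_store crate,
# and storage_options is forwarded to it.
df = pl.read_delta(
    "s3://reddit-submissions/submissions-raw",
    storage_options={"aws_region": "us-east-1"},  # placeholder config
)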

Related: #590


noklam commented 6 months ago

Interesting to learn why Rust chose an object store abstraction instead of a general filesystem interface. One thing that comes to mind is how to make Kedro versioning work for this, since we use fs.glob almost everywhere. We may need something new for handling object stores.

To be fair, local + object store covers the vast majority of cases. We also handle some niche filesystems like HDFS (common 10 years ago, almost non-existent now), SFTP, etc.
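
For context, a simplified sketch (not Kedro's actual code) of the fs.glob-based version resolution I mean, following Kedro's <filepath>/<version>/<filename> layout and assuming s3fs is installed:

import fsspec

# Each versioned save lands under <filepath>/<version>/<filename>, where
# <version> is a sortable timestamp; resolving "latest" means globbing.
fs = fsspec.filesystem("s3")  # import-time s3fs dependency: exactly the coupling at issue
candidates = fs.glob("my-bucket/data.parquet/*/data.parquet")
latest = sorted(candidates)[-1] if candidates else None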

astrojuanlu commented 6 months ago

The good thing is that, if you drop fsspec and leverage the target library's own I/O machinery, writing a new custom dataset is trivial:

https://github.com/astrojuanlu/workshop-kedro-huggingface/blob/7666a33/delta_polars_dataset.py

astrojuanlu commented 5 months ago

Today I showed this fsspec-free dataset to a user and they were happy to see how easy it is to write:

from __future__ import annotations  # allows dict[...] | None annotations on older Pythons

import typing as t

import polars as pl
from kedro.io import AbstractDataset

class DeltaPolarsDataset(AbstractDataset[pl.DataFrame, pl.DataFrame]):
    """``DeltaDataset`` loads/saves data from/to a Delta Table using an underlying
    filesystem (e.g.: local, S3, GCS). It returns a Polars dataframe.
    """

    DEFAULT_LOAD_ARGS: dict[str, t.Any] = {}
    DEFAULT_SAVE_ARGS: dict[str, t.Any] = {}

    def __init__(
        self,
        filepath: str,
        load_args: dict[str, t.Any] | None = None,
        save_args: dict[str, t.Any] | None = None,
        credentials: dict[str, t.Any] | None = None,
        storage_options: dict[str, t.Any] | None = None,
        metadata: dict[str, t.Any] | None = None,
    ):
        self._filepath = filepath
        self._load_args = {**self.DEFAULT_LOAD_ARGS, **(load_args or {})}
        self._save_args = {**self.DEFAULT_SAVE_ARGS, **(save_args or {})}
        self._credentials = credentials or {}
        self._storage_options = storage_options or {}

        # Merge credentials into storage_options, since Polars expects a single
        # mapping (credentials override overlapping keys).
        self._storage_options.update(self._credentials)
        self._metadata = metadata or {}

    def _load(self) -> pl.DataFrame:
        return pl.read_delta(
            self._filepath, storage_options=self._storage_options, **self._load_args
        )

    def _save(self, data: pl.DataFrame) -> None:
        data.write_delta(
            self._filepath, storage_options=self._storage_options, **self._save_args
        )

    def _describe(self) -> dict[str, t.Any]:
        return dict(
            filepath=self._filepath,
            load_args=self._load_args,
            save_args=self._save_args,
            storage_options=self._storage_options,
            metadata=self._metadata,
        )
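
For reference, a minimal usage sketch (bucket, table path, and region are placeholders; credentials are assumed to come from the environment):

ds = DeltaPolarsDataset(
    filepath="s3://my-bucket/my-table",
    storage_options={"aws_region": "us-east-1"},  # placeholder config
)
df = ds.load()  # returns a pl.DataFrame
ds.save(df)
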
astrojuanlu commented 5 months ago

It was noted in today's backlog grooming by @noklam that fsspec is tied into our versioning mechanism, so ditching it might be more complicated than expected and not worth the effort.

For the particular case of the Delta format, let's:

  1. Look at https://github.com/kedro-org/kedro-plugins/issues/444, and
  2. If the above cannot be easily fixed, we can contribute a custom DeltaPolarsDataset as I wrote in https://github.com/kedro-org/kedro-plugins/issues/625#issuecomment-2042634179

noklam commented 5 months ago

From backlog grooming:

MatthiasRoels commented 3 months ago

Interesting to learn why Rust chose an object store abstraction instead of a general filesystem interface.

@noklam: It is explained in the docs.

MatthiasRoels commented 3 months ago

I did some experiments to see where we stand with plain vanilla read_*, scan_*, and write_* operations on object stores (on a local filesystem they work as expected).

So if we make any change, we could split the logic based on file_format: if parquet, use Polars' native methods (unless you need to read data with Hive partitioning), and keep the current fsspec-based implementation otherwise.
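
Concretely, a sketch of that dispatch (illustrative only, not the actual kedro-datasets code):

import polars as pl

def load(filepath: str, file_format: str, storage_options: dict | None = None) -> pl.DataFrame:
    # Parquet goes through Polars' native reader, which handles s3://, gs://,
    # etc. via the Rust object_store crate...
    if file_format == "parquet":
        return pl.read_parquet(filepath, storage_options=storage_options)
    # ...everything else keeps the current fsspec-based path.
    import fsspec

    with fsspec.open(filepath, mode="rb", **(storage_options or {})) as f:
        return getattr(pl, f"read_{file_format}")(f)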

Any thoughts?

astrojuanlu commented 3 months ago

So if we make any change, we could split the logic based on file_format: if parquet, use Polars' native methods (unless you need to read data with Hive partitioning), and keep the current fsspec-based implementation otherwise.

Any thoughts?

At this point we should start by documenting what the "social contract" of Kedro datasets is. In other words, how far we should go in filling gaps like the ones you describe.

Somewhat related: https://github.com/kedro-org/kedro/issues/1936