kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

Cannot use `file_format: delta` with `polars.EagerPolarsDataset` #444

Open astrojuanlu opened 7 months ago

astrojuanlu commented 7 months ago

Description

As per title.

I think the reason is that, annoyingly, not all `pl.DataFrame.write_*` methods are equivalent: some accept a file-like buffer, while others (`write_delta` among them) only accept a path or URI.

Compare these two:

Steps to Reproduce

statuses_table:
  type: polars.EagerPolarsDataset
  file_format: delta
  filepath: s3://social-summarizer/statuses
  save_args:
    storage_options:
      AWS_ENDPOINT_URL: "http://127.0.0.1:9010"
      AWS_ACCESS_KEY_ID: "minioadmin"
      AWS_SECRET_ACCESS_KEY: "minioadmin"
      AWS_REGION: "<localhost>"
      AWS_ALLOW_HTTP: "true"
      AWS_S3_ALLOW_UNSAFE_RENAME: "true"

Expected Result

The dataset gets written.

Actual Result

File ~/.micromamba/envs/kedrohf311-talk/lib/python3.11/site-packages/deltalake/writer.py:441, in try_get_table_and_table_uri(table_or_uri, storage_options)
    440 if not isinstance(table_or_uri, (str, Path, DeltaTable)):
--> 441     raise ValueError("table_or_uri must be a str, Path or DeltaTable")
    443 if isinstance(table_or_uri, (str, Path)):

ValueError: table_or_uri must be a str, Path or DeltaTable
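
The error above is what you get when an open file object reaches a writer that only takes a location. A minimal sketch of one possible fix, assuming the dataset decides per format whether to hand over a path or a buffer. The helper `save_frame` and the `PATH_ONLY_FORMATS` set are hypothetical names for illustration, not part of kedro-datasets:

```python
# Formats whose polars writer needs a path/URI instead of a file object.
# Assumption: only "delta" is confirmed by the traceback above.
PATH_ONLY_FORMATS = {"delta"}


def save_frame(df, file_format, target, open_file=open, **save_args):
    """Call df.write_<file_format> with either a path or an open buffer."""
    writer = getattr(df, f"write_{file_format}")
    if file_format in PATH_ONLY_FORMATS:
        # Pass the location straight through; never open it as a file.
        return writer(str(target), **save_args)
    # Buffer-friendly formats keep the existing "open, then write" flow.
    with open_file(target, "wb") as buffer:
        return writer(buffer, **save_args)
```

With a real `pl.DataFrame`, `save_frame(df, "delta", "s3://social-summarizer/statuses", storage_options={...})` would forward `storage_options` to `write_delta` unchanged. For remote buffer-based formats, `open_file` could be an fsspec filesystem's `open` method (an assumption about how the dataset would wire it in).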


astrojuanlu commented 7 months ago

xref https://linen-slack.kedro.org/t/16079920/question-to-the-kedro-hive-mind-i-want-to-define-a-delta-dat#cf914731-f49f-4ad4-9ebd-b21af640c300

astrojuanlu commented 3 months ago

I'm here again. It fails locally too, because the dataset assumes the target has to be a file, but for Delta it is a directory:

In [4]: ds = EagerPolarsDataset(filepath="./submissions-raw", file_format="delta")

In [5]: df = ds.load()
---------------------------------------------------------------------------
IsADirectoryError                         Traceback (most recent call last)
File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/kedro/io/core.py:193, in AbstractDataset.load(self)
    192 try:
--> 193     return self._load()
    194 except DatasetError:

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/kedro_datasets/polars/eager_polars_dataset.py:156, in EagerPolarsDataset._load(self)
    149     raise DatasetError(
    150         f"Unable to retrieve 'polars.read_{self._file_format}' method, please"
    151         " ensure that your "
   (...)
    154         " https://pola-rs.github.io/polars/py-polars/html/reference/io.html"
    155     )
--> 156 with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
    157     return load_method(fs_file, **self._load_args)

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/fsspec/spec.py:1293, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
   1294     path,
   1295     mode=mode,
   1296     block_size=block_size,
   1297     autocommit=ac,
   1298     cache_options=cache_options,
   1299     **kwargs,
   1300 )
   1301 if compression is not None:

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/fsspec/implementations/local.py:197, in LocalFileSystem._open(self, path, mode, block_size, **kwargs)
    196     self.makedirs(self._parent(path), exist_ok=True)
--> 197 return LocalFileOpener(path, mode, fs=self, **kwargs)

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/fsspec/implementations/local.py:322, in LocalFileOpener.__init__(self, path, mode, autocommit, fs, compression, **kwargs)
    321 self.blocksize = io.DEFAULT_BUFFER_SIZE
--> 322 self._open()

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/fsspec/implementations/local.py:327, in LocalFileOpener._open(self)
    326 if self.autocommit or "w" not in self.mode:
--> 327     self.f = open(self.path, mode=self.mode)
    328     if self.compression:

IsADirectoryError: [Errno 21] Is a directory: '/Users/juan_cano/Projects/QuantumBlackLabs/workshop-kedro-huggingface/submissions-raw'

The above exception was the direct cause of the following exception:

DatasetError                              Traceback (most recent call last)
Cell In[5], line 1
----> 1 df = ds.load()

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/kedro/io/core.py:615, in AbstractVersionedDataset.load(self)
    614 def load(self) -> _DO:
--> 615     return super().load()

File ~/Projects/QuantumBlackLabs/workshop-kedro-huggingface/.venv/lib/python3.11/site-packages/kedro/io/core.py:202, in AbstractDataset.load(self)
    196 except Exception as exc:
    197     # This exception handling is by design as the composed data sets
    198     # can throw any type of exception.
    199     message = (
    200         f"Failed while loading data from data set {str(self)}.\n{str(exc)}"
    201     )
--> 202     raise DatasetError(message) from exc

DatasetError: Failed while loading data from data set EagerPolarsDataset(file_format=delta, filepath=submissions-raw, load_args={}, protocol=file, save_args={}).
[Errno 21] Is a directory: '/Users/juan_cano/Projects/QuantumBlackLabs/workshop-kedro-huggingface/submissions-raw'
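
The load side has the same shape of problem: `_load` opens `load_path` as a file before calling the reader, and a Delta table is a directory. A minimal sketch of a load path that skips the `open` call for directory-based formats. `load_frame` and `DIRECTORY_FORMATS` are hypothetical names, and `reader` stands in for the `polars` module:

```python
# Formats stored as a directory rather than a single file.
# Assumption: only "delta" is known from this report.
DIRECTORY_FORMATS = {"delta"}


def load_frame(reader, file_format, path, open_file=open, **load_args):
    """Look up reader.read_<file_format> and call it with a path or a buffer."""
    read_method = getattr(reader, f"read_{file_format}")
    if file_format in DIRECTORY_FORMATS:
        # Opening a directory raises IsADirectoryError, so pass the path as-is.
        return read_method(str(path), **load_args)
    with open_file(path, "rb") as fs_file:
        return read_method(fs_file, **load_args)
```

With polars installed, `load_frame(pl, "delta", "./submissions-raw")` would call `pl.read_delta("./submissions-raw")` directly instead of going through `self._fs.open`.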