kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

polars.EagerPolarsDataset fails to read parquet #590

Open mark-druffel opened 4 months ago

mark-druffel commented 4 months ago

Description

Trying to load a parquet file with polars.EagerPolarsDataset through the data catalog results in an error saying there is no such file or directory:

model_input_table@eagerPolars:
  type: polars.EagerPolarsDataset
  filepath: data/03_primary/model_input_table.parquet/*.parquet
  file_format: parquet

Context

I'm working on a new pipeline, and the bug is brand new to me. I'm trying to load parquet files into my pipeline using Polars. I realized that the lazy Polars dataset seems to work fine, so this isn't preventing me from doing anything at the moment.

Steps to Reproduce

I reproduced the issue with the kedro starter pipeline in a conda environment.

  1. Create a project by running `kedro new --name polars_issue --tools all --example y`
  2. Run `kedro run` to populate data/.
  3. Edit requirements.txt to include polars and run `pip install -r requirements.txt`:
    # requirements.txt
    ipython>=8.10
    jupyterlab>=3.0
    kedro~=0.19.3
    kedro-datasets[pandas.CSVDataset, pandas.ExcelDataset, pandas.ParquetDataset, polars.EagerPolarsDataset, polars.LazyPolarsDataset, spark.SparkDataset, plotly.PlotlyDataset, plotly.JSONDataset, matplotlib.MatplotlibWriter]>=1.0
    kedro-telemetry>=0.3.1
    kedro-viz>=6.7.0
    notebook
    pytest~=7.2
    pytest-cov~=3.0
    pytest-mock>=1.7.1, <2.0
    ruff~=0.1.8
    scikit-learn~=1.0
    seaborn~=0.12.1
  4. Append two entries to conf/base/catalog.yml:
    
    # catalog.yml
    model_input_table@eagerPolars:
      type: polars.EagerPolarsDataset
      filepath: data/03_primary/model_input_table.parquet/*.parquet
      file_format: parquet

    model_input_table@lazyPolars:
      type: polars.LazyPolarsDataset
      filepath: data/03_primary/model_input_table.parquet/*.parquet
      file_format: parquet

5. Run the code below in a Kedro session (Jupyter or IPython):

import polars as pl
df_with_polars = pl.read_parquet(source="data/03_primary/model_input_table.parquet/*.parquet")
print(df_with_polars)

df_lazy = catalog.load("model_input_table@lazyPolars")
print(df_lazy)
print(df_lazy.collect())

df_eager = catalog.load("model_input_table@eagerPolars")
print(df_eager)


## Expected Result
The code below should work the same as `pl.read_parquet()` and `LazyPolarsDataset` did in *Steps to Reproduce*:

df_eager = catalog.load("model_input_table@eagerPolars")
print(df_eager)

## Actual Result

[02/29/24 21:21:22] INFO Loading data from model_input_table@eagerPolars (EagerPolarsDataset)...  data_catalog.py:483

Traceback (most recent call last):
  kedro/io/core.py:193 in load
    return self._load()
  kedro_datasets/polars/eager_polars_dataset.py:156 in _load
    with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
  fsspec/spec.py:1295 in open
    f = self._open(path, mode=mode, block_size=block_size, ...)
  fsspec/implementations/local.py:180 in _open
    return LocalFileOpener(path, mode, fs=self, **kwargs)
  fsspec/implementations/local.py:302 in __init__
    self._open()
  fsspec/implementations/local.py:307 in _open
    self.f = open(self.path, mode=self.mode)
FileNotFoundError: [Errno 2] No such file or directory: '~/polars-bug/data/03_primary/model_input_table.parquet/*.parquet'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  <ipython-input>:1
    df_eager = catalog.load("model_input_table@eagerPolars")
  kedro/io/data_catalog.py:490 in load
    result = dataset.load()
  kedro/io/core.py:615 in load
    return super().load()
  kedro/io/core.py:202 in load
    raise DatasetError(message) from exc
DatasetError: Failed while loading data from data set EagerPolarsDataset(file_format=parquet,
filepath=~/polars-bug/data/03_primary/model_input_table.parquet/*.parquet, load_args={}, protocol=file,
save_args={}).
[Errno 2] No such file or directory: '~/polars-bug/data/03_primary/model_input_table.parquet/*.parquet'



## Your Environment
Include as many relevant details about the environment in which you experienced the bug:

* Kedro version used (`pip show kedro` or `kedro -V`): kedro, version 0.19.3
* Kedro plugin and kedro plugin version used (`pip show kedro-airflow`): kedro-datasets 2.1.0
* Python version used (`python -V`): Python 3.10.13
* Operating system and version: Mac Sonoma 14.3.1
grofte commented 3 months ago

It's because Kedro opens the file and somehow it isn't recognized as bytes / io.BufferedIOBase / io.RawIOBase (either by Kedro or by Polars), so the call is sent to .scan_parquet, which only takes paths as an argument.

If you add this to catalog.yml, it will work:

load_args:
    use_pyarrow: true

Kedro sends it off to Polars here: https://github.com/kedro-org/kedro-plugins/blob/main/kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py#L156

And here Polars checks if the file is already opened: https://github.com/pola-rs/polars/blob/py-0.20.13/py-polars/polars/io/parquet/functions.py#L156

Then Polars decides that it isn't opened and goes here: https://github.com/pola-rs/polars/blob/py-0.20.13/py-polars/polars/io/parquet/functions.py#L171

and it goes to .scan_parquet, which only works with Paths or path strings (hence it doesn't find the path).

Either it is a Kedro bug or a Polars bug but I don't know which.
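The dispatch described above can be sketched as follows. This is an assumption based on the linked py-polars 0.20.x source, not the actual implementation; the function name is hypothetical.

```python
import io
from pathlib import Path


def read_parquet_dispatch(source):
    # Rough sketch of how read_parquet decides what to do with its input:
    # already-open files/buffers are read eagerly, while strings and Paths
    # are handed to the lazy scanner.
    if isinstance(source, (bytes, io.IOBase)):
        return "read the open file/buffer eagerly"
    if isinstance(source, (str, Path)):
        return "delegate to scan_parquet(...).collect()"
    raise TypeError(f"unsupported source type: {type(source)!r}")
```

Under this reading, Kedro hands Polars an fsspec file object, so the `use_pyarrow` path (or any branch that accepts file-like input) is the only one that could apply.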

@astrojuanlu Apparently this has already been reported.

astrojuanlu commented 3 months ago

Thanks @grofte and sorry @mark-druffel for the delay, this fell off my radar. I'm labeling this accordingly and adding it to our backlog.

mark-druffel commented 3 months ago

@grofte I could be doing it wrong, but I'm still getting an error with pyarrow. @astrojuanlu Definitely no rush on my side; this isn't holding anything up for my team. Just wanted to report it in case it was helpful / unknown.

# catalog.yml
model_input_table@eagerPolars:
  type: polars.EagerPolarsDataset
  filepath: data/03_primary/model_input_table.parquet/*.parquet
  file_format: parquet
  load_args:
    use_pyarrow: true

I ran `catalog.load("model_input_table@eagerPolars")` inside a kedro ipython environment and got what appears to be the same error 🤷

In [1]: catalog.load("model_input_table@eagerPolars")
[03/06/24 10:57:56] INFO     Loading data from model_input_table@eagerPolars (EagerPolarsDataset)...                                   data_catalog.py:483
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ ~/lib/python3.10/site-packages/kedro/io/core.py:193 in   │
│ load                                                                                             │
│                                                                                                  │
│   190 │   │   self._logger.debug("Loading %s", str(self))                                        │
│   191 │   │                                                                                      │
│   192 │   │   try:                                                                               │
│ ❱ 193 │   │   │   return self._load()                                                            │
│   194 │   │   except DatasetError:                                                               │
│   195 │   │   │   raise                                                                          │
│   196 │   │   except Exception as exc:                                                           │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro_datasets/polars/eag │
│ er_polars_dataset.py:156 in _load                                                                │
│                                                                                                  │
│   153 │   │   │   │   " API"                                                                     │
│   154 │   │   │   │   " https://pola-rs.github.io/polars/py-polars/html/reference/io.html"       │
│   155 │   │   │   )                                                                              │
│ ❱ 156 │   │   with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:               │
│   157 │   │   │   return load_method(fs_file, **self._load_args)                                 │
│   158 │                                                                                          │
│   159 │   def _save(self, data: pl.DataFrame) -> None:                                           │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/spec.py:1295 in    │
│ open                                                                                             │
│                                                                                                  │
│   1292 │   │   │   )                                                                             │
│   1293 │   │   else:                                                                             │
│   1294 │   │   │   ac = kwargs.pop("autocommit", not self._intrans)                              │
│ ❱ 1295 │   │   │   f = self._open(                                                               │
│   1296 │   │   │   │   path,                                                                     │
│   1297 │   │   │   │   mode=mode,                                                                │
│   1298 │   │   │   │   block_size=block_size,                                                    │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:180 in _open                                                                              │
│                                                                                                  │
│   177 │   │   path = self._strip_protocol(path)                                                  │
│   178 │   │   if self.auto_mkdir and "w" in mode:                                                │
│   179 │   │   │   self.makedirs(self._parent(path), exist_ok=True)                               │
│ ❱ 180 │   │   return LocalFileOpener(path, mode, fs=self, **kwargs)                              │
│   181 │                                                                                          │
│   182 │   def touch(self, path, truncate=True, **kwargs):                                        │
│   183 │   │   path = self._strip_protocol(path)                                                  │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:302 in __init__                                                                           │
│                                                                                                  │
│   299 │   │   self.autocommit = autocommit                                                       │
│   300 │   │   self.compression = get_compression(path, compression)                              │
│   301 │   │   self.blocksize = io.DEFAULT_BUFFER_SIZE                                            │
│ ❱ 302 │   │   self._open()                                                                       │
│   303 │                                                                                          │
│   304 │   def _open(self):                                                                       │
│   305 │   │   if self.f is None or self.f.closed:                                                │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/fsspec/implementations/lo │
│ cal.py:307 in _open                                                                              │
│                                                                                                  │
│   304 │   def _open(self):                                                                       │
│   305 │   │   if self.f is None or self.f.closed:                                                │
│   306 │   │   │   if self.autocommit or "w" not in self.mode:                                    │
│ ❱ 307 │   │   │   │   self.f = open(self.path, mode=self.mode)                                   │
│   308 │   │   │   │   if self.compression:                                                       │
│   309 │   │   │   │   │   compress = compr[self.compression]                                     │
│   310 │   │   │   │   │   self.f = compress(self.f, mode=self.mode)                              │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
FileNotFoundError: [Errno 2] No such file or directory: '/Users/m109993/Downloads/polars-bug/data/03_primary/model_input_table.parquet/*.parquet'

The above exception was the direct cause of the following exception:

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ in <module>:1                                                                                    │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro/io/data_catalog.py: │
│ 490 in load                                                                                      │
│                                                                                                  │
│   487 │   │   │   extra={"markup": True},                                                        │
│   488 │   │   )                                                                                  │
│   489 │   │                                                                                      │
│ ❱ 490 │   │   result = dataset.load()                                                            │
│   491 │   │                                                                                      │
│   492 │   │   return result                                                                      │
│   493                                                                                            │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro/io/core.py:615 in   │
│ load                                                                                             │
│                                                                                                  │
│   612 │   │   return self._filepath / version / self._filepath.name                              │
│   613 │                                                                                          │
│   614 │   def load(self) -> _DO:                                                                 │
│ ❱ 615 │   │   return super().load()                                                              │
│   616 │                                                                                          │
│   617 │   def save(self, data: _DI) -> None:                                                     │
│   618 │   │   self._version_cache.clear()                                                        │
│                                                                                                  │
│ ~/lib/python3.10/site-packages/kedro/io/core.py:202 in   │
│ load                                                                                             │
│                                                                                                  │
│   199 │   │   │   message = (                                                                    │
│   200 │   │   │   │   f"Failed while loading data from data set {str(self)}.\n{str(exc)}"        │
│   201 │   │   │   )                                                                              │
│ ❱ 202 │   │   │   raise DatasetError(message) from exc                                           │
│   203 │                                                                                          │
│   204 │   def save(self, data: _DI) -> None:                                                     │
│   205 │   │   """Saves data by delegation to the provided save method.                           │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
DatasetError: Failed while loading data from data set EagerPolarsDataset(file_format=parquet, 
filepath=~/data/03_primary/model_input_table.parquet/*.parquet, load_args={'use_pyarrow': True}, protocol=file, 
save_args={}).
[Errno 2] No such file or directory: '~/data/03_primary/model_input_table.parquet/*.parquet'
grofte commented 3 months ago

@mark-druffel You are absolutely right. This seems to be because Lazy goes to https://docs.pola.rs/py-polars/html/reference/api/polars.scan_parquet.html#polars.scan_parquet, which supports globbing, while Eager calls open() on your globbing pattern, which doesn't work. Polars' read_parquet doesn't seem to support globbing either.

I thought it was the same thing I ran into, so I didn't read your error messages closely enough. Sorry.

grofte commented 3 months ago

If you wanted it to work with Eager, I think you would have to use dataset factories (https://docs.kedro.org/en/stable/data/kedro_dataset_factories.html), then in your pipeline find out which files are there, create a node for each of them, and feed them into a node with a vstack to join them.

Much better to use the Lazy dataset for ingestion and then call .collect() on it to switch to eager / DataFrame mode.
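As a sketch of that pattern (the node name is hypothetical; it assumes the `model_input_table@lazyPolars` catalog entry from the steps above):

```python
def to_eager(model_input):
    """Node fed by the model_input_table@lazyPolars catalog entry.

    The lazy dataset yields a polars LazyFrame; calling collect()
    materialises it into an eager DataFrame, globbing included.
    """
    return model_input.collect()
```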

astrojuanlu commented 3 months ago

I think the solution to this might be to proceed with https://github.com/kedro-org/kedro-plugins/issues/519#issuecomment-1929000369

astrojuanlu commented 3 months ago

I'm mixing up open/scan with write/sink, ignore my previous comment.

astrojuanlu commented 3 months ago

@grofte @mark-druffel finally spent some time reflecting on this. Given that the underlying read_* methods do not support globbing, I'm voting for closing this as "won't fix" and encourage users to use polars.LazyPolarsDataset instead.

Thoughts?

grofte commented 3 months ago

I think it would be nice with a method that checks load/save arguments on the Kedro classes since Kedro is a layer of indirection in front of Polars. To inform the user better when they hit these gotchas (path globbing and dtypes). I know you could do this in documentation but that also seems awkward. But I don't think eager should support globbing when Polars doesn't.

mark-druffel commented 3 months ago

Sorry, I'm not sure I follow the point about Polars not globbing, probably because I have a flawed understanding of what's happening under the hood. Both of these work for me in a Python environment; is this not what's happening with the two Kedro datasets?

import polars as pl
# This code is eager
pl.read_parquet("~/*.parquet")
# This code is lazy
pl.scan_parquet("~/*.parquet")

@astrojuanlu Regardless of your response, I'm happy and unblocked. I would lean on your team with respect to what direction makes the most sense, I'm still too much of a novice (w/ kedro & polars) to have an authoritative opinion on what the code should do. Just let me know if you want me to close this issue and thanks!

noklam commented 3 months ago

(edited) There has been some discussion of glob and open that I haven't confirmed, but I think we can simplify the problem to:

# This works
import polars as pl
path = '/workspace/kedro-plugins/polar-eager/data/03_primary/model_input_table.pq/*.parquet'
pl.read_parquet(path)

# This fails
catalog.load("model_input_table")  # an eager dataset with filepath: '/workspace/kedro-plugins/polar-eager/data/03_primary/model_input_table.pq/*.parquet'

There are claims that Polars doesn't work with globs in eager mode; this does not seem to be true (the code above shows it works).

https://github.com/kedro-org/kedro-plugins/blob/65f28b54755c1a389aa600c582504648ae89624f/kedro-datasets/kedro_datasets/polars/eager_polars_dataset.py#L157-L158

If you change this in the _load method, it works immediately. So the real problem here is that fsspec's open doesn't work with globs like this.

return load_method(load_path, **self._load_args) 
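The underlying failure can be demonstrated with the standard library alone (a sketch; the temporary directory and stub files stand in for the parquet partitions):

```python
import glob
import os
import tempfile

tmp = tempfile.mkdtemp()
for name in ("part-0.parquet", "part-1.parquet"):
    with open(os.path.join(tmp, name), "wb") as f:
        f.write(b"stub")

pattern = os.path.join(tmp, "*.parquet")

# glob expands the pattern before any file is opened -> both stubs found
matches = sorted(glob.glob(pattern))

# open() (what fsspec's LocalFileOpener ultimately calls) treats "*"
# as a literal character in the filename -> FileNotFoundError
try:
    open(pattern, "rb")
    glob_opened = True
except FileNotFoundError:
    glob_opened = False
```

Polars' path-based readers do the expansion themselves, which is why passing `load_path` straight through works while opening it via fsspec does not.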
grofte commented 3 months ago

@mark-druffel I'm so sorry. I read the Polars docs: scan explicitly mentions globbing and read doesn't. But on second look, read will take all the files in a directory if that's the path you give it, so it makes sense that it also works with globbing.

If LazyPolarsDataset works with cloud buckets and globbing and Eager doesn't with globbing, then I guess Kedro could just invoke scan and run .collect() before returning? Maybe Eager could just inherit from Lazy?
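That idea could be sketched with stand-in classes (hypothetical names; the real LazyPolarsDataset API is more involved):

```python
class LazyDatasetSketch:
    # Stand-in for polars.LazyPolarsDataset: _load returns something lazy.
    def __init__(self, filepath):
        self._filepath = filepath

    def _load(self):
        return _LazyResult(self._filepath)


class _LazyResult:
    # Stand-in for a polars LazyFrame.
    def __init__(self, filepath):
        self.filepath = filepath

    def collect(self):
        return f"eager dataframe from {self.filepath}"


class EagerViaLazyDataset(LazyDatasetSketch):
    # The suggestion above: eager load = lazy load + collect().
    def _load(self):
        return super()._load().collect()
```

This would give Eager the same glob and remote-path support as Lazy for free, at the cost of always routing through the scan machinery.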

astrojuanlu commented 3 months ago

So the real problem here is fsspec doesn't work with glob like this.

Thanks for investigating @noklam 💯 Like @grofte, I was also confused by the lack of any mention of globbing in the read docs.

As far as I remember, Polars supports remote paths with its own native methods. Maybe we should ditch fsspec for this particular dataset?

noklam commented 3 months ago

As far as I remember, Polars supports remote paths with its own native methods. Maybe we should ditch fsspec for this particular dataset?

This is what we do with Spark. If the support is good enough we can definitely do this. Though we have to be careful: pandas also supports reading from S3, but we still use fsspec. We should check how well it supports other remote paths (i.e. not S3).

astrojuanlu commented 3 months ago

Yeah, when I say "ditch fsspec for this particular dataset", I mean our specific boilerplate in the datasets code. I don't mind what mechanism the underlying library uses.

astrojuanlu commented 2 months ago

It was noted in backlog grooming today by @noklam that our fsspec boilerplate is linked to our versioning, so it's not that easy to ditch fsspec.