kedro-org / kedro-plugins

First-party plugins maintained by the Kedro team.
Apache License 2.0

Error saving `TensorFlowModelDataset` to Azure Blob Storage #839

Open gitgud5000 opened 1 week ago

gitgud5000 commented 1 week ago

Description

I get the following error when trying to save a TensorFlowModelDataset to Azure Blob Storage. The issue occurs only with Azure Blob Storage, not with the local filesystem:

DatasetError: Cannot save versioned dataset 'model_prestamos.keras' to 'azureml/modelling/reprecios/data/06_models/run_2024-09-09' because a file with the same name already exists in the directory. This is likely because versioning was enabled on a dataset already saved previously. Either remove 'model_prestamos.keras' from the directory or manually convert it into a versioned dataset by placing it in a versioned directory (e.g. with default versioning format 'azureml/modelling/reprecios/data/06_models/run_2024-09-09/model_prestamos.keras/YYYY-MM-DDThh.mm.ss.sssZ/model_prestamos.keras').

The error suggests that versioning is enabled, even though the dataset catalog sets versioned: False. The file is never created, and it does not exist at any point in Azure Blob Storage.
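The traceback further down hints at why the message mentions versioning at all: the save wrapper in kedro/io/core.py catches any FileNotFoundError or NotADirectoryError and re-raises it as the "versioned dataset" DatasetError. Below is a minimal, stdlib-only sketch of that pattern. The class and functions are hypothetical stand-ins written for illustration, not Kedro's actual code.

```python
class DatasetError(Exception):
    """Hypothetical stand-in for kedro.io.core.DatasetError."""


def failing_upload(data):
    # Simulates the underlying transfer failing because the temporary
    # source file cannot be found (the real root cause in the traceback).
    raise FileNotFoundError(2, "No such file or directory", "tmp/tmp_model.keras")


def save(data, save_func):
    # Mirrors the shape of the except clause visible in the traceback:
    # every FileNotFoundError/NotADirectoryError is reported as a
    # versioning conflict, whatever actually caused it.
    try:
        save_func(data)
    except (FileNotFoundError, NotADirectoryError) as err:
        raise DatasetError(
            "Cannot save versioned dataset ... because a file with the "
            "same name already exists in the directory."
        ) from err


try:
    save(object(), failing_upload)
except DatasetError as exc:
    message = str(exc)
    cause = exc.__cause__  # the original FileNotFoundError is kept here

print(message)
print(type(cause).__name__)
```

Under that assumption, the versioning message is a symptom, and the chained FileNotFoundError is the part worth reading.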

Context

Dataset Catalog Definition

_azure_base_path: 'abfs://azureml/modelling/reprecios/data'
"{namespace}.model":
  type: ${_datasets.tensormodel}
  credentials: azure_credentials
  versioned: False
  filepath: ${_azure_base_path}/06_models/run_${runtime_params:run_date}/model_{namespace}.keras
  save_args:
    overwrite: True 

Steps to Reproduce

Edit: Included code below to reproduce. (credit to @merelcht, @astrojuanlu)

from kedro_datasets.tensorflow import TensorFlowModelDataset
import tensorflow as tf
import numpy as np

credentials = {
    'account_name': '',
    'account_key': ''
}

data_set_local = TensorFlowModelDataset(filepath="data/tensorflow_model.keras")
data_set = TensorFlowModelDataset(filepath="abfs://azureml/modelling/data/tensorflow_model.keras",
                                  credentials=credentials)

inputs = tf.keras.Input(shape=(3,))
x = tf.keras.layers.Dense(4, activation=tf.nn.relu)(inputs)
outputs = tf.keras.layers.Dense(5, activation=tf.nn.softmax)(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

input_data = np.array([[0.5, 0.3, 0.2]])
predictions = model.predict(input_data)

# This works
data_set_local.save(model)

# This Fails
try:
    data_set.save(model)
except Exception as e:
    print("🥲", e)

1. Define a TensorFlowModelDataset in the Kedro catalog with the configuration above 👆
2. Attempt to save a model using the .save() method targeting Azure Blob Storage.

I suspect this is related to kedro-plugins/issues/359, as the behavior appears similar to what is described there.

Environment

Full Error Traceback

Shortened for readability

INFO     Saving data to prestamos.model (TensorFlowModelDataset)...         [data_catalog.py]
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:727 in save               │
│                                                                                                  │
│   724 │   │   │   │   self.resolve_save_version()                                                │
│   725 │   │   │   )  # Make sure last save version is set                                        │
│   726 │   │   │   try:                                                                           │
│ ❱ 727 │   │   │   │   super()._save_wrapper(save_func)(self, data)                               │
│   728 │   │   │   except (FileNotFoundError, NotADirectoryError) as err:                         │
│   729 │   │   │   │   # FileNotFoundError raised in Win, NotADirectoryError raised in Unix       │
│   730 │   │   │   │   _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"                              │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:249 in save               │
│                                                                                                  │
│   246 │   │   │                                                                                  │
│   247 │   │   │   try:                                                                           │
│   248 │   │   │   │   self._logger.debug("Saving %s", str(self))                                 │
│ ❱ 249 │   │   │   │   save_func(self, data)                                                      │
│   250 │   │   │   except (DatasetError, FileNotFoundError, NotADirectoryError):                  │
│   251 │   │   │   │   raise                                                                      │
│   252 │   │   │   except Exception as exc:                                                       │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro_datasets/tensorflow/tensorflow_model │
│ _dataset.py:176 in _save                                                                         │
│                                                                                                  │
│   173 │   │   │                                                                                  │
│   174 │   │   │   # Use fsspec to take from local tempfile directory/file and                    │
│   175 │   │   │   # put in ArbitraryFileSystem                                                   │
│ ❱ 176 │   │   │   self._fs.copy(path, save_path)                                                 │
│   177 │                                                                                          │
│   178 │   def _exists(self) -> bool:                                                             │
│   179 │   │   try:                                                                               │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:118 in wrapper              │
│                                                                                                  │
│    115 │   @functools.wraps(func)                                                                │
│    116 │   def wrapper(*args, **kwargs):                                                         │
│    117 │   │   self = obj or args[0]                                                             │
│ ❱  118 │   │   return sync(self.loop, func, *args, **kwargs)                                     │
│    119 │                                                                                         │
│    120 │   return wrapper                                                                        │
│    121                                                                                           │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:103 in sync                 │
│                                                                                                  │
│    100 │   │   # suppress asyncio.TimeoutError, raise FSTimeoutError                             │
│    101 │   │   raise FSTimeoutError from return_result                                           │
│    102 │   elif isinstance(return_result, BaseException):                                        │
│ ❱  103 │   │   raise return_result                                                               │
│    104 │   else:                                                                                 │
│    105 │   │   return return_result                                                              │
│    106                                                                                           │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:56 in _runner               │
│                                                                                                  │
│     53 │   if timeout is not None:                                                               │
│     54 │   │   coro = asyncio.wait_for(coro, timeout=timeout)                                    │
│     55 │   try:                                                                                  │
│ ❱   56 │   │   result[0] = await coro                                                            │
│     57 │   except Exception as ex:                                                               │
│     58 │   │   result[0] = ex                                                                    │
│     59 │   finally:                                                                              │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/fsspec/asyn.py:369 in _copy                │
│                                                                                                  │
│    366 │   │   │   paths2 = path2                                                                │
│    367 │   │   else:                                                                             │
│    368 │   │   │   source_is_str = isinstance(path1, str)                                        │
│ ❱  369 │   │   │   paths1 = await self._expand_path(                                             │
│    370 │   │   │   │   path1, maxdepth=maxdepth, recursive=recursive                             │
│    371 │   │   │   )                                                                             │
│    372 │   │   │   if source_is_str and (not recursive or maxdepth is not None):                 │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/adlfs/spec.py:1595 in _expand_path         │
│                                                                                                  │
│   1592 │   │   │   if not path.endswith("*"):                                                    │
│   1593 │   │   │   │   path = f"{path.strip('/')}"                                               │
│   1594 │   │   if isinstance(path, str):                                                         │
│ ❱ 1595 │   │   │   out = await self._expand_path(                                                │
│   1596 │   │   │   │   [path],                                                                   │
│   1597 │   │   │   │   recursive,                                                                │
│   1598 │   │   │   │   maxdepth,                                                                 │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/adlfs/spec.py:1640 in _expand_path         │
│                                                                                                  │
│   1637 │   │   │   │   │   out.add(fullpath)                                                     │
│   1638 │   │                                                                                     │
│   1639 │   │   if not out:                                                                       │
│ ❱ 1640 │   │   │   raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path)        │
│   1641 │   │   return list(sorted(out))                                                          │
│   1642 │                                                                                         │
│   1643 │   async def _put_file(                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
FileNotFoundError: [Errno 2] No such file or directory: 
['tmp/kedro_tensorflow_tmpqic1m6vw/tmp_tensorflow_model.keras']

The above exception was the direct cause of the following exception:

╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮

│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/runner/runner.py:530 in              │
│ _run_node_sequential                                                                             │
│                                                                                                  │
│   527 │                                                                                          │
│   528 │   for name, data in items:                                                               │
│   529 │   │   hook_manager.hook.before_dataset_saved(dataset_name=name, data=data, node=node)    │
│ ❱ 530 │   │   catalog.save(name, data)                                                           │
│   531 │   │   hook_manager.hook.after_dataset_saved(dataset_name=name, data=data, node=node)     │
│   532 │   return node                                                                            │
│   533                                                                                            │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/data_catalog.py:588 in save       │
│                                                                                                  │
│   585 │   │   │   extra={"markup": True},                                                        │
│   586 │   │   )                                                                                  │
│   587 │   │                                                                                      │
│ ❱ 588 │   │   dataset.save(data)                                                                 │
│   589 │                                                                                          │
│   590 │   def exists(self, name: str) -> bool:                                                   │
│   591 │   │   """Checks whether registered data set exists by calling its `exists()`             │
│                                                                                                  │
│ /anaconda/envs/reprecios/lib/python3.11/site-packages/kedro/io/core.py:731 in save               │
│                                                                                                  │
│   728 │   │   │   except (FileNotFoundError, NotADirectoryError) as err:                         │
│   729 │   │   │   │   # FileNotFoundError raised in Win, NotADirectoryError raised in Unix       │
│   730 │   │   │   │   _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"                              │
│ ❱ 731 │   │   │   │   raise DatasetError(                                                        │
│   732 │   │   │   │   │   f"Cannot save versioned dataset '{self._filepath.name}' to "           │
│   733 │   │   │   │   │   f"'{self._filepath.parent.as_posix()}' because a file with the same    │
│   734 │   │   │   │   │   f"name already exists in the directory. This is likely because "       │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
gitgud5000 commented 3 days ago

I've identified the issue within the _save method of TensorFlowModelDataset. Specifically, at line 172, it calls the .copy() method (see PR https://github.com/kedro-org/kedro-plugins/pull/608).

https://github.com/kedro-org/kedro-plugins/blob/552b973a256c0f4a9f96e36feb70f4fc15fb371b/kedro-datasets/kedro_datasets/tensorflow/tensorflow_model_dataset.py#L168-L172

According to the [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/copying.html#:~:text=copy()%20copies%20from%20a%20remote,local%20source%20to%20a%20remote%20target), the .copy() method copies from a remote source to a remote target, so both paths are resolved on the same filesystem. In this case we are copying from the local filesystem to Azure Blob Storage, so the correct method is .put() rather than .copy().

Determining whether the target is remote and switching to .put() accordingly should resolve the issue!

Edit: something similar should be done in the _load method, where the appropriate method would be .get().

https://github.com/kedro-org/kedro-plugins/blob/552b973a256c0f4a9f96e36feb70f4fc15fb371b/kedro-datasets/kedro_datasets/tensorflow/tensorflow_model_dataset.py#L147
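To illustrate the difference between the three methods, here is a small sketch that uses fsspec's built-in in-memory filesystem as a stand-in for Azure Blob Storage (an assumption made so it runs without credentials; adlfs may differ in details). The file names and the fake payload are hypothetical.

```python
import os
import tempfile

import fsspec

# "memory" is fsspec's in-memory filesystem; it plays the remote target here.
remote_fs = fsspec.filesystem("memory")

with tempfile.TemporaryDirectory() as tmp_dir:
    # A local temporary file, like the one the dataset writes before uploading.
    local_path = os.path.join(tmp_dir, "tmp_model.keras")
    with open(local_path, "wb") as f:
        f.write(b"fake model bytes")

    remote_path = "/models/model.keras"

    # copy() looks for the source on remote_fs itself, so the local
    # temporary file is not found.
    try:
        remote_fs.copy(local_path, remote_path)
        copy_error = None
    except FileNotFoundError as err:
        copy_error = err

    # put() reads from the local filesystem and writes to remote_fs.
    remote_fs.put(local_path, remote_path)
    uploaded = remote_fs.cat_file(remote_path)

    # get() is the reverse direction, which is what _load would need.
    download_path = os.path.join(tmp_dir, "downloaded.keras")
    remote_fs.get(remote_path, download_path)
    with open(download_path, "rb") as f:
        downloaded = f.read()

print(type(copy_error).__name__, uploaded == downloaded)
```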

gitgud5000 commented 3 days ago

I ran some tests, and it seems it is not necessary to use .copy() at all: .get() and .put() also work for local-to-local copying.
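A quick way to check the local-to-local case, assuming fsspec's built-in "file" filesystem behaves the same way inside the dataset (the file names below are hypothetical):

```python
import os
import tempfile

import fsspec

# The local filesystem acts as both the "local" and the "remote" side.
local_fs = fsspec.filesystem("file")

with tempfile.TemporaryDirectory() as tmp_dir:
    source = os.path.join(tmp_dir, "tmp_model.keras")
    with open(source, "wb") as f:
        f.write(b"fake model bytes")

    # put(): temporary file -> final location, as _save would do.
    saved = os.path.join(tmp_dir, "model.keras")
    local_fs.put(source, saved)
    with open(saved, "rb") as f:
        saved_bytes = f.read()

    # get(): final location -> temporary file, as _load would do.
    loaded = os.path.join(tmp_dir, "loaded_model.keras")
    local_fs.get(saved, loaded)
    with open(loaded, "rb") as f:
        loaded_bytes = f.read()

print(saved_bytes == loaded_bytes)
```

The sketch keeps every file in one existing directory; whether missing parent directories are created automatically depends on the filesystem and its options.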