kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0
9.53k stars 879 forks

Kedro Resume Information fails #3725

Closed MarcelBeining closed 4 months ago

MarcelBeining commented 4 months ago

Description

Normally, when a Kedro pipeline fails, it prints information about how to resume the pipeline (which nodes to start from). In newer versions, however, this step itself always raises an error in our cases. I looked into it and found that it happens whenever the pipeline uses a dataset that is not defined in the catalog. Such a dataset is simply treated as a MemoryDataset (which is normal), but these MemoryDatasets are no longer in the catalog (which is not normal), hence the error below.

Steps to Reproduce

  1. Create a Kedro pipeline with at least one intermediate memory dataset (not explicitly defined in the catalog!)
  2. Introduce an error into one of your nodes
  3. Run the pipeline

Expected Result

Kedro should print information in the console about the error and how to resume the pipeline from there.

Actual Result

NameError: name 'dfggdfsdgsf' is not defined

During handling of the above exception, another exception occurred:

┌───────────────────── Traceback (most recent call last) ─────────────────────┐
\src\XXXXX\__main__.py:47 in <module>                    │
│                                                                             │
│   44                                                                        │
│   45                                                                        │
│   46 if __name__ == "__main__":                                             │
│ > 47 │   main()                                                             │
│   48                                                                        │
│                                                                             │
\src\XXXX\__main__.py:43 in main                        │
│                                                                             │
│   40 │   package_name = Path(__file__).parent.name                          │
│   41 │   configure_project(package_name)                                    │
│   42 │   run = _find_run_command(package_name)                              │
│ > 43 │   run(*args, **kwargs)                                               │
│   44                                                                        │
│   45                                                                        │
│   46 if __name__ == "__main__":                                             │
│                                                                             │
\venv\lib\site-packages\click\core.py:1157 in __call__      │
│                                                                             │
\venv\lib\site-packages\click\core.py:1078 in main          │
│                                                                             │
\venv\lib\site-packages\click\core.py:1434 in invoke        │
│                                                                             │
\venv\lib\site-packages\click\core.py:783 in invoke         │
│                                                                             │
\venv\lib\site-packages\kedro\framework\cli\project.py:225  │
│ in run                                                                      │
│                                                                             │
│   222 │   with KedroSession.create(                                         │
│   223 │   │   env=env, conf_source=conf_source, extra_params=params         │
│   224 │   ) as session:                                                     │
│ > 225 │   │   session.run(                                                  │
│   226 │   │   │   tags=tuple_tags,                                          │
│   227 │   │   │   runner=runner_obj(is_async=is_async),                     │
│   228 │   │   │   node_names=tuple_node_names,                              │
│                                                                             │
\venv\lib\site-packages\kedro\framework\session\session.py: │
│ 392 in run                                                                  │
│                                                                             │
│   389 │   │   )                                                             │
│   390 │   │                                                                 │
│   391 │   │   try:                                                          │
│ > 392 │   │   │   run_result = runner.run(                                  │
│   393 │   │   │   │   filtered_pipeline, catalog, hook_manager, session_id  │
│   394 │   │   │   )                                                         │
│   395 │   │   │   self._run_called = True                                   │
│                                                                             │
\venv\lib\site-packages\kedro\runner\runner.py:117 in run   │
│                                                                             │
│   114 │   │   │   self._logger.info(                                        │
│   115 │   │   │   │   "Asynchronous mode is enabled for loading and saving  │
│   116 │   │   │   )                                                         │
│ > 117 │   │   self._run(pipeline, catalog, hook_or_null_manager, session_id │
│   118 │   │                                                                 │
│   119 │   │   self._logger.info("Pipeline execution completed successfully. │
│   120                                                                       │
│                                                                             │
\venv\lib\site-packages\kedro\runner\sequential_runner.py:7 │
│ 8 in _run                                                                   │
│                                                                             │
│   75 │   │   │   │   run_node(node, catalog, hook_manager, self._is_async,  │
│   76 │   │   │   │   done_nodes.add(node)                                   │
│   77 │   │   │   except Exception:                                          │
│ > 78 │   │   │   │   self._suggest_resume_scenario(pipeline, done_nodes, ca │
│   79 │   │   │   │   raise                                                  │
│   80 │   │   │                                                              │
│   81 │   │   │   # decrement load counts and release any data sets we've fi │
│                                                                             │
\venv\lib\site-packages\kedro\runner\runner.py:206 in       │
│ _suggest_resume_scenario                                                    │
│                                                                             │
│   203 │   │   │   start_p = resume_p.only_nodes_with_inputs(*resume_p.input │
│   204 │   │   │                                                             │
│   205 │   │   │   # find the nearest persistent ancestors of the nodes in s │
│ > 206 │   │   │   start_p_persistent_ancestors = _find_persistent_ancestors │
│   207 │   │   │   │   pipeline, start_p.nodes, catalog                      │
│   208 │   │   │   )                                                         │
│   209                                                                       │
│                                                                             │
\venv\lib\site-packages\kedro\runner\runner.py:249 in       │
│ _find_persistent_ancestors                                                  │
│                                                                             │
│   246 │   queue, visited = deque(children), set(children)                   │
│   247 │   while queue:                                                      │
│   248 │   │   current_node = queue.popleft()                                │
│ > 249 │   │   if _has_persistent_inputs(current_node, catalog):             │
│   250 │   │   │   ancestor_nodes_to_run.add(current_node)                   │
│   251 │   │   │   continue                                                  │
│   252 │   │   for parent in _enumerate_parents(pipeline, current_node):     │
│                                                                             │
\venv\lib\site-packages\kedro\runner\runner.py:292 in       │
│ _has_persistent_inputs                                                      │
│                                                                             │
│   289 │   for node_input in node.inputs:                                    │
│   290 │   │   # if node_input not in catalog._datasets:                     │
│   291 │   │   #     breakpoint()                                            │
│ > 292 │   │   if isinstance(catalog._datasets[node_input], MemoryDataset):  │
│   293 │   │   │   return False                                              │
│   294 │   return True                                                       │
│   295                                                                       │
└─────────────────────────────────────────────────────────────────────────────┘
KeyError: 'SomeMemoryDatasetNotDefined'
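
To illustrate the failure mode, here is a minimal, Kedro-free sketch of the lookup shown in the last traceback frame. Everything in it is a hypothetical stand-in (the `MemoryDataset`, `CSVDataset`, and `Node` classes, the plain `datasets` dict, and both helper functions); it is not Kedro's implementation and not necessarily how a fix would look. It only assumes that a dataset name missing from the mapping should be treated as an implicit in-memory dataset.

```python
class MemoryDataset:
    """Hypothetical stand-in for an in-memory dataset (not Kedro's class)."""


class CSVDataset:
    """Hypothetical stand-in for any dataset persisted to disk."""


class Node:
    """Hypothetical stand-in for a pipeline node with named inputs."""

    def __init__(self, name, inputs):
        self.name = name
        self.inputs = inputs


def has_persistent_inputs_strict(node, datasets):
    # Same indexing pattern as in the traceback: a name that is not
    # registered in the mapping raises KeyError.
    for node_input in node.inputs:
        if isinstance(datasets[node_input], MemoryDataset):
            return False
    return True


def has_persistent_inputs_lenient(node, datasets):
    # Assumption: an unregistered name is an implicit in-memory dataset,
    # so it counts as non-persistent instead of raising.
    for node_input in node.inputs:
        dataset = datasets.get(node_input)
        if dataset is None or isinstance(dataset, MemoryDataset):
            return False
    return True


# Only "raw_table" is registered; the second input is not in the mapping.
datasets = {"raw_table": CSVDataset()}
failing = Node("train", ["raw_table", "SomeMemoryDatasetNotDefined"])

try:
    has_persistent_inputs_strict(failing, datasets)
except KeyError as exc:
    print(f"strict lookup failed: KeyError {exc}")

print(has_persistent_inputs_lenient(failing, datasets))
```

With the strict variant, the resume suggestion itself fails and hides the original node error; the lenient variant returns `False` for the node, so the search would continue with its parents.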

Your Environment

datajoely commented 4 months ago

I think this is a great idea -

We have an open PR, #3719, which is close to merging. It makes sense to do this once that goes in (or maybe see if we can squeeze the check in there 🤔).

datajoely commented 4 months ago

Actually it may be solved by #3719

noklam commented 4 months ago

@MarcelBeining Is it possible to create an example with the spaceflights tutorial and provide the exact command to reproduce the scenario?

> This is then simply treated as a MemoryDataset (which is normal) but these MemoryDatasets are not in the catalog anymore (which is not normal), thus the error below happens.

I am not sure I follow. From which version does it start failing? My intuition is that this is related to the dataset factory; #3719 is more of an incremental improvement of the existing resume suggestion.

noklam commented 4 months ago

I am going ahead and closing this issue since there is insufficient information. Feel free to open a new ticket with a reproducible example if this is still an issue.