kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

[KED-3075] Using CachedDataSet and ParallelRunner together fails #673

Closed Minyus closed 2 years ago

Minyus commented 3 years ago

Description

Using CachedDataSet and ParallelRunner together fails.

Context

CachedDataSet and ParallelRunner are often used to make the pipeline run faster, but using both with Kedro 0.17.0 fails.

Steps to Reproduce

  1. Run kedro new --starter=pandas-iris and generate the Kedro project.
  2. Add the following to conf/base/catalog.yml.
example_model:
  type: CachedDataSet
  dataset: 
    type: pickle.PickleDataSet
    filepath: /tmp/model.pkl
  3. Run kedro run -p

Expected Result

Complete without error.

Actual Result

2021-01-18 05:33:15,557 - kedro.pipeline.node - ERROR - Node `predict: predict([example_model,example_test_x]) -> [example_predictions]` failed with error: 
unsupported operand type(s) for *: 'float' and 'object'
2021-01-18 05:33:15,566 - kedro.runner.parallel_runner - WARNING - There are 2 nodes that have not run.
You can resume the pipeline run by adding the following argument to your previous command:
  --from-nodes "predict,report"
2021-01-18 05:33:15,587 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/concurrent/futures/process.py", line 239, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/parallel_runner.py", line 145, in _run_node_synchronization
    return run_node(node, catalog, is_async, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 212, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 297, in _run_node_sequential
    outputs = _call_node_run(node, catalog, inputs, is_async, run_id=run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 265, in _call_node_run
    raise exc
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 255, in _call_node_run
    outputs = node.run(inputs)
  File "/opt/conda/lib/python3.7/site-packages/kedro/pipeline/node.py", line 466, in run
    raise exc
  File "/opt/conda/lib/python3.7/site-packages/kedro/pipeline/node.py", line 459, in run
    outputs = self._run_with_dict(inputs, self._inputs)
  File "/opt/conda/lib/python3.7/site-packages/kedro/pipeline/node.py", line 520, in _run_with_dict
    return self._decorated_func(**kwargs)
  File "/data/kedro_proj/pi/src/pi/pipelines/data_science/nodes.py", line 88, in predict
    result = _sigmoid(np.dot(X, model))
  File "<__array_function__ internals>", line 6, in dot
TypeError: unsupported operand type(s) for *: 'float' and 'object'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/conda/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/opt/conda/lib/python3.7/site-packages/kedro/framework/cli/cli.py", line 696, in main
    cli_collection(**cli_context)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/conda/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/data/kedro_proj/pi/src/pi/cli.py", line 236, in run
    pipeline_name=pipeline,
  File "/opt/conda/lib/python3.7/site-packages/kedro/framework/session/session.py", line 414, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/runner.py", line 100, in run
    self._run(pipeline, catalog, run_id)
  File "/opt/conda/lib/python3.7/site-packages/kedro/runner/parallel_runner.py", line 350, in _run
    node = future.result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
TypeError: unsupported operand type(s) for *: 'float' and 'object'

Minyus commented 3 years ago

Related to https://github.com/quantumblacklabs/kedro/issues/420 @deepyaman

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

Minyus commented 3 years ago

Can anyone help with this issue?

Minyus commented 3 years ago

Hi @deepyaman @yetudada, could you please help with this issue?

lorenabalan commented 3 years ago

@Minyus is this still an issue with the latest Kedro version?

SwePalm commented 3 years ago

@lorenabalan I was able to reproduce this issue with Kedro 0.17.4, Python 3.8.10, and Ubuntu 20.04 LTS.

idanov commented 2 years ago

Hi @Minyus, thank you for reporting the issue! Unfortunately, I don't think we'll add support for using CachedDataSet with the ParallelRunner. Making MemoryDataSet and CachedDataSet work with multiprocessing would make those datasets overly complicated, and we do not plan to do so in the near future. However, if you fancy giving it a try, you can create a custom CachedDataSet leveraging SharedMemory, which was newly introduced in Python 3.8. Kedro still supports versions prior to 3.8, so unfortunately we cannot use this class, which would make implementing multiprocessing-friendly CachedDataSet and MemoryDataSet classes much less complicated.
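
As a rough illustration of the SharedMemory idea mentioned above, the following minimal sketch uses only the Python 3.8+ standard library to place a pickled object in a shared block and read it back by name. The helper names put_object and get_object are hypothetical and are not part of Kedro; a real custom dataset would still need to handle ownership, cleanup, and concurrent access.

```python
import pickle
from multiprocessing import shared_memory


def put_object(obj):
    """Pickle obj into a new shared memory block and return (block, size).

    Hypothetical helper, not a Kedro API. The caller owns the block and
    must eventually call close() and unlink() on it.
    """
    payload = pickle.dumps(obj)
    block = shared_memory.SharedMemory(create=True, size=len(payload))
    block.buf[:len(payload)] = payload
    return block, len(payload)


def get_object(name, size):
    """Attach to an existing block by name and unpickle its first size bytes.

    The size is passed explicitly because the operating system may round
    the block up to a larger size than was requested.
    """
    block = shared_memory.SharedMemory(name=name)
    try:
        return pickle.loads(bytes(block.buf[:size]))
    finally:
        block.close()


if __name__ == "__main__":
    block, size = put_object({"weights": [0.1, 0.2, 0.3]})
    try:
        # A worker process would only need (block.name, size) to read the data.
        print(get_object(block.name, size))
    finally:
        block.close()
        block.unlink()
```

Only the block name and payload size need to cross the process boundary, which is what would let a cache of this kind be read from worker processes, unlike a plain in-process attribute.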

In the meantime, we should mark this dataset as unusable with the ParallelRunner, the same way we do for MemoryDataSet here: https://github.com/kedro-org/kedro/blob/main/kedro/runner/parallel_runner.py#L195-L238
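
The kind of up-front check described above could look roughly like the sketch below. It is not the code behind the linked lines: the classes, the single_process_only marker, and validate_for_parallel_run are all hypothetical names chosen for illustration, assuming datasets can be flagged with a simple class attribute.

```python
class InMemoryDataSet:
    """Hypothetical stand-in for a dataset that keeps data in process memory."""

    single_process_only = True  # hypothetical marker, not a Kedro attribute


class FileDataSet:
    """Hypothetical stand-in for a dataset persisted to disk."""

    single_process_only = False


def validate_for_parallel_run(datasets):
    """Raise ValueError if any dataset is flagged as single-process only.

    datasets maps catalog entry names to dataset instances.
    """
    offending = sorted(
        name
        for name, dataset in datasets.items()
        if getattr(dataset, "single_process_only", False)
    )
    if offending:
        raise ValueError(
            "These datasets cannot be used with a multiprocess runner: "
            + ", ".join(offending)
        )


if __name__ == "__main__":
    validate_for_parallel_run({"example_test_x": FileDataSet()})  # passes
    try:
        validate_for_parallel_run({"example_model": InMemoryDataSet()})
    except ValueError as exc:
        print(exc)
```

Failing before any node runs gives a clear message naming the catalog entries involved, rather than an unrelated TypeError surfacing inside a node.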

merelcht commented 2 years ago

Closing this issue. We'll make it clear that CachedDataSet cannot be used with the ParallelRunner.