kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

partial pipeline with parallel runner failing #988

Closed ckalas closed 2 years ago

ckalas commented 2 years ago

I have two pipelines running one after the other; the full run completes happily with ParallelRunner. When I try to run only the second pipeline, it crashes with ParallelRunner but not when run sequentially. The only inputs to the second pipeline are a dataframe stored on disk as Parquet and `parameters.yaml`; it outputs some `.pkl` files.

```
kedro run --runner=ParallelRunner                # works
kedro run --runner=ParallelRunner --pipeline ml  # doesn't work; ml runs after etl
kedro run --pipeline ml                          # works
```

I am really at a loss as to what could be causing this behaviour, any insights would be appreciated.

The only traceback I get is this:

```
Traceback (most recent call last):
  File "/usr/local/var/pyenv/versions/test-ml/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 266, in main
    cli_collection()
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 211, in main
    super().main(
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/kedro/framework/cli/project.py", line 408, in run
    session.run(
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/kedro/framework/session/session.py", line 414, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/kedro/runner/runner.py", line 106, in run
    self._run(pipeline, catalog, run_id)
  File "/usr/local/var/pyenv/versions/3.8.9/envs/test-ml/lib/python3.8/site-packages/kedro/runner/parallel_runner.py", line 354, in _run
    node = future.result()
  File "/usr/local/var/pyenv/versions/3.8.9/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/var/pyenv/versions/3.8.9/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
```

kedro 0.17.5, python 3.8.9

datajoely commented 2 years ago

Hi @ckalas - what sort of operations are you doing in the `ml` pipeline? There are some things that can break due to limitations with multiprocessing in Python, such as Spark.

ckalas commented 2 years ago

There are many algorithms running, some time series stuff, but could that be the problem? If the entire thing can run in parallel but the second pipeline alone cannot, I'm not sure what to make of it.

datajoely commented 2 years ago

Could you post which libraries? It's a bit difficult to work out from what you've posted so far, but something is causing `concurrent.futures.process.BrokenProcessPool` to be raised. Are you on Windows by any chance?
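For readers hitting the same error: `BrokenProcessPool` is raised by the parent process when a worker dies without ever reporting back, so the real cause (a crash inside a native library, an OOM kill, etc.) never appears in the parent's traceback. A minimal stdlib-only sketch of the error class, not Kedro-specific:

```python
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def crash(_):
    # Simulate a worker dying abruptly (no Python exception propagates back),
    # e.g. a segfault inside a native library such as an OpenMP runtime.
    os._exit(1)


def run_demo():
    try:
        with ProcessPoolExecutor(max_workers=2) as pool:
            list(pool.map(crash, range(2)))
    except BrokenProcessPool:
        # The parent only knows a worker vanished, not why.
        return "BrokenProcessPool"
    return "ok"


if __name__ == "__main__":
    print(run_demo())  # prints "BrokenProcessPool"
```

This is why the traceback above ends at `future.result()` inside `parallel_runner.py` with no hint about the node code that actually failed.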

ckalas commented 2 years ago

In the node where it seems to break when running the `ml` pipeline in isolation, xgboost seems to be the problem; why it isn't a problem during the full run is still weird. Running on macOS currently, but we will be deploying to Linux. Another thing I can see is that when I run the full pipeline in parallel and it completes, it doesn't clean up all the child processes properly:

```
/usr/local/var/pyenv/versions/3.8.9/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 5 leaked semaphore objects to clean up at shutdown
```

datajoely commented 2 years ago

Thanks for the info - I'll look into this for you

datajoely commented 2 years ago

There are some quite fiddly recommendations on this thread, perhaps this is helpful? https://github.com/conda/conda/issues/9589
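The linked thread is largely about native thread pools (OpenMP/MKL) fighting with Python multiprocessing. A commonly used mitigation, offered here as an assumption rather than something confirmed in that thread, is to cap native threads via environment variables before any numeric library is imported, for example at the very top of the project's `settings.py`:

```python
import os

# Standard environment knobs read by OpenMP, OpenBLAS and MKL at import time.
# Setting them before numpy/xgboost are imported caps each worker process to
# one native thread; the placement (project settings) is an assumption.
for var in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS"):
    os.environ[var] = "1"
```

These variables only take effect if set before the native runtimes initialise, which is why they belong at process startup rather than inside a node.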

datajoely commented 2 years ago

Are you specifying `n_jobs` in the XGBoost code too? https://stackoverflow.com/a/55886425/2010808
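To make the suggestion concrete (the node and helper below are hypothetical, not taken from the issue): since ParallelRunner already gives each node its own process, estimator-level parallelism can be pinned to a single thread before the kwargs ever reach XGBoost:

```python
def pin_single_threaded(params: dict) -> dict:
    """Return estimator kwargs with intra-process parallelism disabled,
    overriding any n_jobs coming from parameters.yaml."""
    merged = dict(params)
    merged["n_jobs"] = 1
    return merged


def train_model(features, target, params: dict):
    # Hypothetical Kedro node. The xgboost import is deferred so the module
    # stays importable in environments without xgboost installed.
    from xgboost import XGBRegressor

    model = XGBRegressor(**pin_single_threaded(params))
    model.fit(features, target)
    return model
```

The helper makes the pinning explicit and uniform across models, which matters here given the report below that it had been set for some estimators but not XGBoost.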

ckalas commented 2 years ago

Thanks - it seems setting `n_jobs=1` fixed this; I was doing it for the other models but not for XGBoost for some reason. I will close the issue, but I'm still curious why this only appeared when running the second pipeline on its own.

My guess would be something like: XGBoost couldn't fork any more after running the ETL pipeline for whatever reason, so it defaulted to a single job.
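The fork hypothesis is at least plausible in general terms: OpenMP runtimes (which XGBoost uses) are known to misbehave in fork()ed children, and whether the worker pool forks or spawns depends on platform and Python version. A quick stdlib check of what a given interpreter will do:

```python
import multiprocessing as mp

# "fork" is the default start method on Linux; macOS defaults to "spawn"
# from Python 3.8 onwards. Forked children inherit a copy of the parent's
# native-library state (thread pools, locks), which OpenMP-based libraries
# often cannot cope with; spawned children start clean but pay import cost.
print(mp.get_start_method())
```

Whether this is the actual mechanism in this issue is speculation, matching the guess above.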

datajoely commented 2 years ago

Okay, good. It should be noted that in some cases it can be more efficient to use XGBoost's parallelisation instead of Kedro's, and if the pipelines are independent you can always parallelise at the CLI level:

```
kedro run --pipeline a --parallel &
kedro run --pipeline b --runner=SequentialRunner --params="n_jobs:32"
```