kedro-org / kedro

Kedro is a toolbox for production-ready data science. It uses software engineering best practices to help you create data engineering and data science pipelines that are reproducible, maintainable, and modular.
https://kedro.org
Apache License 2.0

[KED-3024] Leaked semaphore when using Dask within a Kedro pipeline #1100

Closed alessio-ca closed 9 months ago

alessio-ca commented 2 years ago

## Description

I am using Dask to speed up some tasks in my Kedro pipeline; Dask is used within a node. When using the Dask processes scheduler, I always get the following warning:

/Users/alessiocaciagli/.pyenv/versions/3.8.9/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 8 leaked semaphore objects to clean up at shutdown

## Context

The warning does not appear if the Dask threads scheduler is used instead, nor if the node is executed as a standalone Python module. It only appears when the node runs as part of a Kedro pipeline.

## Steps to Reproduce

  1. Define the following mock node:

    import dask.dataframe as dd
    import numpy as np
    import pandas as pd


    def test_dask(df: pd.DataFrame) -> pd.Series:
        # The input DataFrame is only a placeholder; a new one is built here.
        df = pd.DataFrame({"x": np.arange(1000000), "y": np.arange(1000000) / 2})
        ddf = dd.from_pandas(df, npartitions=128)

        def myadd(row, a, b=1):
            return row.sum() + a + b

        # Row-wise apply, computed with the multiprocessing-based scheduler.
        return ddf.apply(myadd, axis=1, args=(2,), b=1.5, meta=("x", "f8")).compute(
            scheduler="processes"
        )

  2. Include it as a node in a pipeline (the input DataFrame is a placeholder and can be empty).
  3. Run the node (e.g. `kedro run --node=test_dask`).

## Expected Result
No warning should be emitted.

## Actual Result

/Users/alessiocaciagli/.pyenv/versions/3.8.9/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 8 leaked semaphore objects to clean up at shutdown



## Your Environment

* Kedro version: 0.17.5
* Dask version: 2021.11.2
* Python version: 3.8.9 (default, May 6 2021, 12:48:01)
* Operating system: macOS 11.6.1 (20G224)
datajoely commented 2 years ago

Hi @Cortysus, Kedro supports Dask through its own dataset. Could you try saving to Parquet via pandas and transcoding to Dask using this dataset? That way, it's not the node's responsibility to convert the DataFrame.

We also have this video from the folks at Coiled (the commercial Dask project) using Kedro: https://www.linkedin.com/posts/gustafrcavanaugh_python-ml-dask-activity-6845770788231106560-hx0v/

alessio-ca commented 2 years ago

Hi, thanks for the reply!

I'm not sure I fully followed, but I've tried the following. In the nodes definition, I have:

import dask.dataframe as dd
import numpy as np
import pandas as pd

def test_dask_create(df: pd.DataFrame) -> dd.DataFrame:
    df = pd.DataFrame({"x": np.arange(1000000), "y": np.arange(1000000) / 2})
    ddf = dd.from_pandas(df, npartitions=128)
    return ddf

def test_dask_execute(ddf: dd.DataFrame) -> pd.Series:
    def myadd(row, a, b=1):
        return row.sum() + a + b

    return ddf.apply(myadd, axis=1, args=(2,), b=1.5, meta=("x", "f8")).compute(
        scheduler="processes"
    )

In my pipeline definition, I have:

from kedro.pipeline import Pipeline, node
from .nodes import (
    test_dask_create,
    test_dask_execute,
)

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=test_dask_create,
                inputs="placeholder_df",
                outputs="ddf",
                name="create_node",
            ),
            node(
                func=test_dask_execute,
                inputs="ddf",
                outputs="out_df",
                name="execute_node",
            ),
        ]
    )

This way, saving and loading should be delegated. However, I still get the same warning.

datajoely commented 2 years ago

Hi, this isn't maintained by the Kedro team, but I found this repo, which looks like it implements things the right way: https://github.com/blaizeberry4/lastfm-recs-sandbox/blob/3f997ae2301d6108c72df8dda325c101398b77c9/src/etl/pipelines/eda.py

alessio-ca commented 2 years ago


Thanks for the reply. That implementation does work, but it uses the threads scheduler (the default for compute() on Dask DataFrames), which, as I also reported, does not trigger the warning.

Given all the evidence, it looks to me like the warning stems from a conflict between Kedro's multiprocessing pool (perhaps triggered by NumExpr?) and Dask's, but I'm a bit lost as to where to look for the root of the problem :)

datajoely commented 2 years ago

Ah, have you tried `kedro run --runner=ThreadRunner`? I think that for remote execution environments like Spark and Dask you can't use ParallelRunner (multiprocessing) for that reason.

alessio-ca commented 2 years ago

The warning persists even with ThreadRunner. After a bit more digging, I can say that the warning is not due to Dask per se, but to multiprocessing being used inside a node (which makes sense, since Dask also uses multiprocessing internally when the scheduler is processes). I can reproduce exactly the same warning by replacing the Dask call with a mock multiprocessing call, e.g. by having this node, test_mp, in a Pipeline:

import multiprocessing as mp
from typing import List

def _my_func(x: int) -> int:
    return x ** x

def test_mp(integer_list: List[int]) -> List[int]:
    pool = mp.Pool(mp.cpu_count())
    return pool.map(_my_func, integer_list)
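For comparison, here is a minimal sketch of the same node with the pool shut down explicitly. It rests on an assumption that this thread does not confirm: that leaving the `Pool` unclosed could contribute to the warning. `mp_node_with_cleanup` is a hypothetical name, and this variant has not been verified to silence the warning inside a Kedro run.

```python
import multiprocessing as mp
from typing import List


def _my_func(x: int) -> int:
    return x ** x


def mp_node_with_cleanup(integer_list: List[int]) -> List[int]:
    """Hypothetical variant of ``test_mp`` that always shuts its pool down."""
    # The ``with`` block terminates the pool on exit, even if map() raises.
    with mp.Pool(mp.cpu_count()) as pool:
        result = pool.map(_my_func, integer_list)
        # Stop accepting new work and wait for the worker processes to exit.
        pool.close()
        pool.join()
    return result
```

Called as `mp_node_with_cleanup([1, 2, 3, 4])`, it returns `[1, 4, 27, 256]`, the same values `test_mp` produces.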
datajoely commented 2 years ago

If you're using ThreadRunner, we shouldn't be triggering any multiprocessing from the Kedro side, so I'm not sure what's going on here. Can you post a screenshot of the terminal output?

https://kedro.readthedocs.io/en/stable/_modules/kedro/runner/sequential_runner.html#SequentialRunner https://kedro.readthedocs.io/en/stable/_modules/kedro/runner/thread_runner.html#ThreadRunner

alessio-ca commented 2 years ago

Sure. I've made a mock repository in case it helps; the layout of the pipeline and nodes should be clearer there: https://github.com/Cortysus/test_kedro_mp.git

This is the output of running kedro run --runner=ThreadRunner:

(.venv) alessiocaciagli@Alessios-MBP-2 test_kedro_mp % kedro run --runner=ThreadRunner
2021-12-10 13:19:58,101 - kedro.framework.session.store - INFO - `read()` not implemented for `BaseSessionStore`. Assuming empty store.
2021-12-10 13:19:58,153 - root - INFO - ** Kedro project test_kedro_mp
2021-12-10 13:19:58,165 - kedro.pipeline.node - INFO - Running node: test_mp(None) -> [my_test]
[1, 4, 27, 256]
2021-12-10 13:19:59,147 - kedro.io.data_catalog - INFO - Saving data to `my_test` (MemoryDataSet)...
2021-12-10 13:19:59,148 - kedro.runner.thread_runner - INFO - Completed node: test_mp(None) -> [my_test]
2021-12-10 13:19:59,148 - kedro.runner.thread_runner - INFO - Completed 1 out of 1 tasks
2021-12-10 13:19:59,148 - kedro.runner.thread_runner - INFO - Pipeline execution completed successfully.
2021-12-10 13:19:59,149 - kedro.io.data_catalog - INFO - Loading data from `my_test` (MemoryDataSet)...
2021-12-10 13:19:59,149 - kedro.framework.session.store - INFO - `save()` not implemented for `BaseSessionStore`. Skipping the step.
(.venv) alessiocaciagli@Alessios-MBP-2 test_kedro_mp % /Users/alessiocaciagli/.pyenv/versions/3.8.9/lib/python3.8/multiprocessing/resource_tracker.py:222: UserWarning: resource_tracker: There appear to be 8 leaked semaphore objects to clean up at shutdown
  warnings.warn(

By contrast, running python test_kedro_noproject.py (which runs exactly the same pipeline, but outside the Kedro project) gives:

(.venv) alessiocaciagli@Alessios-MBP-2 test_kedro_mp % python test_kedro_noproject.py 
[1, 4, 27, 256]
(.venv) alessiocaciagli@Alessios-MBP-2 test_kedro_mp % 

and no warning is returned!

datajoely commented 2 years ago

Thanks for posting this. I'll share it with the team to investigate.

oentaryorj commented 1 year ago

Hi, is there a follow-up on this? I got a similar warning when running Dask in one Kedro node (with either SequentialRunner or ThreadRunner), and my pipeline simply halts afterwards...

merelcht commented 1 year ago

Hi @oentaryorj, unfortunately the core team doesn't have capacity to work on this at the moment. I will bump it up in priority, but in the meantime we would be very grateful for any contributions to fix this problem.

noklam commented 11 months ago

This PR, https://github.com/kedro-org/kedro/pull/1614, could potentially solve the problem. In the past, Kedro always imported the multiprocessing module.
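One way to check whether a package imports `multiprocessing` at import time is to import it in a fresh interpreter and inspect `sys.modules`. The helper below is a hypothetical, standard-library-only sketch. Passing `"kedro"` would test an installed Kedro version, but whether that import is what triggers the warning is not established in this thread.

```python
import subprocess
import sys

# Code run in a fresh interpreter: report whether importing the target
# module added ``multiprocessing`` to ``sys.modules``.
_PROBE = """
import importlib, sys
before = "multiprocessing" in sys.modules
importlib.import_module(sys.argv[1])
print(not before and "multiprocessing" in sys.modules)
"""


def imports_multiprocessing(module_name: str) -> bool:
    """Return True if importing ``module_name`` pulls in ``multiprocessing``."""
    # A subprocess keeps modules already loaded in this interpreter from
    # affecting the result. check=True raises if the import itself fails.
    completed = subprocess.run(
        [sys.executable, "-c", _PROBE, module_name],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip() == "True"
```

For example, `imports_multiprocessing("multiprocessing.pool")` returns `True`, while `imports_multiprocessing("json")` returns `False`.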

astrojuanlu commented 9 months ago

#1614 was closed and we haven't seen more reports of this issue, so I'm closing it. Feel free to leave a comment if you are affected by this.