dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

utils_tests `cleanup` fixture does not play well with `threading` #6775

Open DamianBarabonkovQC opened 2 years ago

DamianBarabonkovQC commented 2 years ago

What happened:

I use the distributed.utils_test fixture cleanup in my pytest tests. Some of my code uses the dask threaded scheduler. However, the cleanup fixture includes a check_thread_leak check, which raises an error.
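For context, a thread-leak check of this kind records the threads that are alive before a test and fails if extra ones remain afterwards. A minimal sketch of the idea (not distributed's actual implementation, which also waits briefly and whitelists known-benign threads):

import threading
from contextlib import contextmanager

@contextmanager
def naive_thread_leak_check():
    # Record the threads alive before the test body runs.
    before = set(threading.enumerate())
    yield
    # Any thread still alive afterwards that was not there before
    # counts as a leak.
    leaked = set(threading.enumerate()) - before
    assert not leaked, f"Leaked threads: {[t.name for t in leaked]}"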

When I took a deeper look, I found leftover threads such as ['ThreadPoolExecutor-1_0', 'ThreadPoolExecutor-2_0', 'ThreadPoolExecutor-2_1', 'ThreadPoolExecutor-2_2', 'ThreadPoolExecutor-2_3'] that were started by dask's thread pool executor.

In the thread pool executor, a shutdown routine is registered with atexit:

#.../lib/python3.8/site-packages/dask/threaded.py(70)get()
#-> atexit.register(default_pool.shutdown)

which only runs when the interpreter shuts down. That happens after the cleanup fixture checks for leaked threads, which I assume is the root of this bug.
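This interaction is easy to reproduce without dask at all: a ThreadPoolExecutor keeps its worker threads alive after the submitted work finishes, so any leak check that runs before interpreter shutdown still sees them. A minimal stdlib-only illustration:

import threading
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)
pool.submit(lambda: None).result()  # the work itself is done...

# ...but the worker thread lingers until shutdown() or interpreter exit,
# which is exactly the window in which a leak check flags it.
print([t.name for t in threading.enumerate() if "ThreadPoolExecutor" in t.name])

pool.shutdown(wait=True)  # an explicit shutdown removes the threads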

What you expected to happen:

I expect the cleanup fixture to finish without throwing an error.

Minimal Complete Verifiable Example:

# cleanup_throws.py
import pytest

import dask
from dask.system import cpu_count
from distributed import Client

# Importing `cleanup` registers the fixture with pytest; it is the fixture
# whose thread-leak check raises. `cluster` and `loop` are used below.
from distributed.utils_test import cleanup  # noqa: F401
from distributed.utils_test import cluster, loop

CLUSTER_SESSION_NWORKERS = cpu_count()

@pytest.fixture
def cluster2(loop):
    with cluster(nworkers=CLUSTER_SESSION_NWORKERS) as (scheduler, workers):
        yield (scheduler, workers)

@pytest.fixture
def client(loop, cluster2):
    scheduler, workers = cluster2
    with Client(scheduler["address"], loop=loop) as _client:
        yield _client

@pytest.mark.parametrize(
    "scheduler",
    ["threads"],
)
def test_something(client, scheduler):
    ddf = dask.datasets.timeseries()

    ddf = ddf[ddf.y > 0]
    ddf = ddf.groupby("name").x.std()

    # Runs on the single-machine threaded scheduler, not on the cluster.
    ddf.compute(scheduler=scheduler)

# Run with `pytest cleanup_throws.py`
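A possible workaround is a fixture that shuts the pool down before the leak check runs. This is only a sketch: it pokes at the private dask.threaded.default_pool global that the traceback above points to, so it may break across dask versions, and the teardown ordering relative to cleanup would need checking.

import pytest
import dask.threaded

@pytest.fixture
def shutdown_default_pool():
    yield
    # default_pool is private dask state: the executor whose shutdown is
    # otherwise deferred to atexit (see the traceback above).
    pool = getattr(dask.threaded, "default_pool", None)
    if pool is not None:
        pool.shutdown(wait=True)
        dask.threaded.default_pool = None  # let later computes create a fresh pool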

Anything else we need to know?:

Environment:

pavithraes commented 2 years ago

@DamianBarabonkovQC Thanks for reporting!

I'm looking at this with @ncclementi.

Even though we're trying to use the threaded scheduler, the error itself seems to be coming from distributed/utils_test.py.

The cleanup fixture we're importing works if we just use the distributed scheduler. We think the issue arises when we override the scheduler to be the single-machine "threaded" scheduler in ddf.compute(scheduler="threads"), and we're not sure if we should even expect it to work.

I think it might make sense to move it to the distributed tracker.

@hendrikmakait What do you think about this?

hendrikmakait commented 2 years ago

Hi @DamianBarabonkovQC, I'm not sure what you are trying to achieve with this code. From what I can see, you manually set up a distributed cluster through your fixtures, but then you don't use that cluster to run the computation; you use the single-machine threaded scheduler instead. Could you please elaborate on that?

If you replaced ddf.compute(scheduler="threads") with client.compute(ddf), you would run the computation on the distributed cluster instead and the test would not fail.
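Concretely, the test body would then look like this (a sketch, keeping the fixtures above unchanged):

def test_something(client):
    ddf = dask.datasets.timeseries()

    ddf = ddf[ddf.y > 0]
    ddf = ddf.groupby("name").x.std()

    # Runs on the distributed cluster set up by the fixtures, so no
    # single-machine thread pool is created and nothing leaks.
    client.compute(ddf).result()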

Apart from that, I would not consider distributed's test fixtures public API, so use them at your own risk. If you still want to use the existing fixtures, I'd recommend taking a look at the @gen_cluster decorator, which helps you set up a cluster for your tests and could replace the fixtures you added. For example, https://github.com/dask/distributed/blob/659ab89b524763c1230219a7a949df54ddb99b6d/distributed/tests/test_client.py#L1434-L1441 illustrates a test that executes some numpy code.
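For illustration, a gen_cluster-based version of the test above might look roughly like this (a sketch; gen_cluster starts an in-process scheduler and workers and passes them to the test coroutine):

import dask
from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_something(c, s, a, b):
    # c is a Client connected to the in-process scheduler s with workers a and b.
    ddf = dask.datasets.timeseries()
    ddf = ddf[ddf.y > 0]
    result = await c.compute(ddf.groupby("name").x.std())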