dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License
12.42k stars 1.7k forks source link

BUG/DOC: threads_per_worker does not limit all thread types #9588

Open larsoner opened 1 year ago

larsoner commented 1 year ago

Describe the issue:

Dask Client with threads_per_worker=1 does not limit all thread types, so it can be violated for example by sklearn FastICA via OpenMP.

Minimal Complete Verifiable Example:

```python import os from dask.distributed import Client import joblib import numpy as np from sklearn.decomposition import FastICA import threadpoolctl n_workers = 3 def fit_ica(seed): #threadpoolctl.threadpool_limits(limits=1, user_api='openmp') data = np.random.RandomState(seed).randn(10000, 10) ica = FastICA( n_components=10, random_state=seed, whiten='unit-variance', max_iter=1000) ica.fit(data) if __name__ == '__main__': client = Client(n_workers=n_workers, threads_per_worker=1, name='test') with joblib.parallel_backend('dask'): parallel = joblib.Parallel(n_jobs=n_workers) parallel(joblib.delayed(fit_ica)(seed) for seed in range(n_workers)) ```

On my macOS M1 machine (with 10 CPUs) I get:

$ time python -u rep_dask.py 
...
real    0m20.006s
user    1m53.512s
sys 0m15.559s

Which is driven by oversubscription of CPUs -- 30 threads get spawned because n_workers=3 and no limit is set on the OpenMP number of threads.

However, uncommenting the threadpoolctl.threadpool_limits(limits=1, user_api='openmp') line above within the fit_ica function, I get much better timings:

$ time python -u rep_dask.py 
...
real    0m4.766s
user    0m6.675s
sys 0m1.612s

Anything else we need to know?:

I understand that a solution to threads_per_worker) might be (endlessly) complicated -- although maybe some iteration overthreadpoolctl`-discovered interfaces could be used? -- so in lieu of that, maybe docs could be updated to give people hints. I'm thinking specifically:

  1. Document what threads_per_worker actually controls/sets and how, e.g. in LocalCluster.
  2. Add a link from the threads_per_worker docstring to some separate doc/FAQ that talk about OMP_NUM_THREADS, MKL_NUM_THREADS, threadpoolctl, etc. methods of controlling execution.

Apologies if these are already documented somewhere and I missed it!

Environment:

ncclementi commented 1 year ago

@larsoner Thank you for open this issue. Could you provide a minimal reproducible example that we can run and check? Is this problem still present in the most recent version of dask and distributed?

larsoner commented 1 year ago

Could you provide a minimal reproducible example that we can run and check?

Yes one is in the top comment, I just collapsed it with a <details> tag in my post

Is this problem still present in the most recent version of dask and distributed?

It looks like it is indeed fixed on 2022.10.0!

Maybe still worth leaving open as a DOC update to say what gets set by the threads_per_worker somewhere in case this crops up again somehow/somewhere? If not, feel free to close.

ncclementi commented 1 year ago

Yes one is in the top comment, I just collapsed it with a <details> tag in my post

Whoops, I missed this.

Is this problem still present in the most recent version of dask and distributed? It looks like it is indeed fixed on 2022.10.0! Maybe still worth leaving open as a DOC update to say what gets set by the threads_per_worker somewhere in case this crops up again somehow/somewhere? If not, feel free to close.

I see we have documented this in LocalCluster. See https://distributed.dask.org/en/stable/api.html?highlight=LocalCluster#distributed.LocalCluster where it says:

threads_per_worker: int
    Number of threads per each worker

Is this what you were looking for, or do you have anything else in mind?

larsoner commented 1 year ago

Is this what you were looking for, or do you have anything else in mind?

Just more detail about how/what it actually sets so that the scope of the thread control can be inferred. For example, if it's documented (somewhere) what this parameter controls, in the future if someone sees CPU oversubscription, they can better determine if:

  1. Stated to be covered by dask but isn't (e.g., if dask iterates over threadpoolctl-detected endpoints and sets them all, but something threadpoolctl-able is observed not to be limited)
  2. Not stated to be covered by dask, but maybe it should (e.g., if dask uses some potentially incomplete inspection/setting method that could be extended to support something else)
  3. Not stated to be covered by dask, and shouldn't be (e.g., some non-OpenMP/custom threading code in a some C++ code called from Python)
ncclementi commented 1 year ago

@jrbourbeau Do you know if we have more documentation about this? I was not able to find it but I might be missing something.

jrbourbeau commented 1 year ago

Thanks @larsoner. I think updating the threads_per_worker description to be

threads_per_worker: int
    The number of threads in each worker's ``ThreadPoolExecutor`` where tasks are executed.

would make things more clear.

Do you know if we have more documentation about this?

We have this best practice on avoiding oversubscribing threads https://docs.dask.org/en/stable/array-best-practices.html#avoid-oversubscribing-threads.

jrbourbeau commented 1 year ago

@ncclementi are you interested in updating the LocalCluster docstring?

larsoner commented 1 year ago

I can do it and maybe add some cross refs to the best practices in the places I looked for information

ncclementi commented 1 year ago

@larsoner go for it, feel free to ping me for review.