ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] num_cpus is not respected by numpy when worker process is reused #35937

Open polikutinevgeny opened 1 year ago

polikutinevgeny commented 1 year ago

What happened + What you expected to happen

In production we noticed that a task (inverting a matrix) would sometimes use only one thread instead of the 10 requested via num_cpus. Further investigation showed that although num_cpus=n does set the OMP_NUM_THREADS environment variable to n, numpy does not respect the change if the worker process is reused. Explicitly setting OMP_NUM_THREADS=n through runtime_env, on the other hand, is respected by numpy.

Versions / Dependencies

Ray: 2.4.0
Python: 3.10.10
Linux: 6.3.4-arch1-1
numpy: 1.24.2
threadpoolctl: 3.1.0

Reproduction script

```python
import os
# Disable log deduplication so every worker's output is shown.
os.environ["RAY_DEDUP_LOGS"] = "0"

import time
import ray

ray.init(num_cpus=2)

def print_info():
    # numpy is imported so its native thread pools are loaded and visible
    # to threadpoolctl.
    import numpy  # noqa: F401
    import os
    from threadpoolctl import threadpool_info
    for pool_info in threadpool_info():
        print("pool_info['num_threads']: ", pool_info["num_threads"])
    print("OMP_NUM_THREADS:", os.environ["OMP_NUM_THREADS"])

@ray.remote(num_cpus=1)
def f():
    print_info()
    time.sleep(1)  # Block worker

# Phase 1: one CPU per task.
refs = [f.remote() for _ in range(4)]
ray.get(refs)

print("Now using more cpus:", flush=True)

# Phase 2: same function, but requesting two CPUs per task.
refs = [f.options(num_cpus=2).remote() for _ in range(4)]
ray.get(refs)

print("Now with explicit env:", flush=True)

# Phase 3: set OMP_NUM_THREADS explicitly through the runtime environment.
refs = [f.options(runtime_env={"env_vars": {"OMP_NUM_THREADS": "2"}}).remote() for _ in range(4)]
ray.get(refs)
```

Issue Severity

Low: It annoys or frustrates me.

rkooo567 commented 1 year ago

cc @rickyyx I believe it is a known issue?

Also, IIUC, we should not reuse the worker if the required resources are different (i.e., num_cpus=1 and num_cpus=2 should use different workers). It could be a bug in this case. cc @cadedaniel can you confirm if this is how it works now?

polikutinevgeny commented 1 year ago

I have checked the output, and it seems that in the first case (changing num_cpus) workers are reused, while in the second case (changing OMP_NUM_THREADS) new workers are created.

Output

```
2023-05-31 19:18:39,786 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
(f pid=305058) pool_info['num_threads']: 1
(f pid=305058) OMP_NUM_THREADS: 1
(f pid=305059) pool_info['num_threads']: 1
(f pid=305059) OMP_NUM_THREADS: 1
(f pid=305058) pool_info['num_threads']: 1
(f pid=305058) OMP_NUM_THREADS: 1
(f pid=305059) pool_info['num_threads']: 1
(f pid=305059) OMP_NUM_THREADS: 1
Now using more cpus:
(f pid=305058) pool_info['num_threads']: 1
(f pid=305058) OMP_NUM_THREADS: 2
(f pid=305058) pool_info['num_threads']: 1
(f pid=305058) OMP_NUM_THREADS: 2
(f pid=305058) pool_info['num_threads']: 1
(f pid=305058) OMP_NUM_THREADS: 2
(f pid=305058) pool_info['num_threads']: 1
(f pid=305058) OMP_NUM_THREADS: 2
Now with explicit env:
(f pid=305276) pool_info['num_threads']: 2
(f pid=305276) OMP_NUM_THREADS: 2
(f pid=305275) pool_info['num_threads']: 2
(f pid=305275) OMP_NUM_THREADS: 2
(f pid=305275) pool_info['num_threads']: 2
(f pid=305275) OMP_NUM_THREADS: 2
(f pid=305276) pool_info['num_threads']: 2
(f pid=305276) OMP_NUM_THREADS: 2
```

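
The reuse can be read off the PIDs in the output above. As a rough sketch (the helper name `pids_by_phase` and the "initial" phase label are made up for this example and are not part of Ray), the following groups the `pid=` values by phase so that reused workers show up as a set intersection. The sample text is a shortened excerpt of the output above.

```python
import re
from collections import defaultdict

# Phase markers printed by the reproduction script. Lines before the first
# marker are assigned to an "initial" phase (a label chosen for this sketch).
PHASE_MARKERS = ("Now using more cpus:", "Now with explicit env:")
PID_PATTERN = re.compile(r"\(f pid=(\d+)\)")


def pids_by_phase(log_text):
    """Return {phase: set of worker PIDs seen in that phase}."""
    phases = defaultdict(set)
    current = "initial"
    for line in log_text.splitlines():
        stripped = line.strip()
        if stripped in PHASE_MARKERS:
            current = stripped
            continue
        match = PID_PATTERN.search(stripped)
        if match:
            phases[current].add(int(match.group(1)))
    return dict(phases)


# Shortened excerpt of the output posted in this thread.
sample = """\
(f pid=305058) OMP_NUM_THREADS: 1
(f pid=305059) OMP_NUM_THREADS: 1
Now using more cpus:
(f pid=305058) OMP_NUM_THREADS: 2
Now with explicit env:
(f pid=305276) OMP_NUM_THREADS: 2
(f pid=305275) OMP_NUM_THREADS: 2
"""

result = pids_by_phase(sample)
# PIDs that appear both before and after the num_cpus change were reused.
reused = result["initial"] & result["Now using more cpus:"]
print("reused workers:", reused)
```

A non-empty intersection between two phases means at least one worker process served tasks from both.
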
rickyyx commented 1 year ago

So if a worker is reused with a different num_cpus, it's expected that this has no effect on the second task's numpy thread usage, since I believe numpy configures its OMP threads at import time.

But I think the worker shouldn't be reused in this case.
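
The read-once behavior suggested here can be illustrated without numpy. The sketch below uses a hypothetical `FakeThreadPool` class as a stand-in for a native library that reads `OMP_NUM_THREADS` a single time when it is initialized. It is not how numpy or OpenMP is implemented, only a model of why a later change to the environment variable goes unnoticed in a reused process.

```python
import os


class FakeThreadPool:
    """Hypothetical stand-in for a native library with a thread pool.

    It reads OMP_NUM_THREADS once, when it is created, and never again.
    """

    def __init__(self):
        self.num_threads = int(os.environ.get("OMP_NUM_THREADS", "1"))


# First task on a fresh worker: the variable is set before the library loads.
os.environ["OMP_NUM_THREADS"] = "1"
pool = FakeThreadPool()

# Second task on the same (reused) worker: the variable is updated, but the
# already-initialized pool keeps the value it read earlier.
os.environ["OMP_NUM_THREADS"] = "2"
reused_worker_threads = pool.num_threads

# A new worker process would initialize the library again and see the update.
fresh_worker_threads = FakeThreadPool().num_threads

print(reused_worker_threads, fresh_worker_threads)  # prints: 1 2
```

This mirrors the posted output: the reused worker reports `OMP_NUM_THREADS: 2` but still one pool thread, while the newly started workers report two.
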

rkooo567 commented 1 year ago

I think we shouldn't reuse the worker. This seems like a bug to me.

rynewang commented 1 year ago

The root cause is this: when deciding whether a worker can be reused, we check the Ray config worker_resource_limits_enabled. If it's false (the default), we ignore the CPU requirements. The code is at

https://github.com/ray-project/ray/blob/267b14ecbf6f550c1bc54e7fc8d35e579177236d/src/ray/common/task/task_spec.cc#L582

We can set this config to true to get a new worker, but I'm not sure whether the default should be true.

@rickyyx @rkooo567
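
To make the described behavior concrete, here is a simplified Python model of the reuse check. It is not a port of the linked C++ code: `TaskRequest`, `worker_matching_key`, and the use of `runtime_env` as the other matching criterion are assumptions made for illustration (the latter is inferred from the output earlier in this thread, where changing the runtime env produced new workers). Only the `worker_resource_limits_enabled` flag and its default of false come from the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskRequest:
    """Hypothetical, simplified view of what a task asks for."""

    num_cpus: int
    runtime_env: tuple = ()  # e.g. (("OMP_NUM_THREADS", "2"),)


def worker_matching_key(task, worker_resource_limits_enabled=False):
    """Key used in this model to decide whether an idle worker fits a task.

    With the flag off, CPU requirements are left out of the key, so tasks
    that differ only in num_cpus map to the same worker.
    """
    if worker_resource_limits_enabled:
        return (task.runtime_env, task.num_cpus)
    return (task.runtime_env,)


one_cpu = TaskRequest(num_cpus=1)
two_cpus = TaskRequest(num_cpus=2)
explicit_env = TaskRequest(num_cpus=1, runtime_env=(("OMP_NUM_THREADS", "2"),))

# Default (flag off): num_cpus is ignored, so the worker is reused.
reused_by_default = worker_matching_key(one_cpu) == worker_matching_key(two_cpus)

# Flag on: differing num_cpus yields different keys, so a new worker is needed.
reused_with_flag = (
    worker_matching_key(one_cpu, True) == worker_matching_key(two_cpus, True)
)

# A different runtime_env does not match, with or without the flag.
reused_across_env = worker_matching_key(one_cpu) == worker_matching_key(explicit_env)

print(reused_by_default, reused_with_flag, reused_across_env)
```

In this model, flipping the flag is the only thing that separates the `num_cpus=1` and `num_cpus=2` tasks, which matches the observation that only the explicit-env case started new workers.
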

rickyyx commented 1 year ago

> If it's false (by default) we ignore the CPU requirements

So right now, with different resource requirements, we will reuse workers by default? This doesn't sound right.

rynewang commented 1 year ago

Indeed, we need to change this. We can set it to true by default, but that's a visible behavior change. Plus, I don't know when this knob is useful at all...
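
Until the default changes, one possible workaround is for the task to apply the thread limit itself instead of relying on the environment variable being read when the library loads. The following is a sketch under a few assumptions: `threadpoolctl` (already used in the reproduction script) is installed, the helper names `limit_native_threads` and `run_with_thread_limit` are made up here, and the Ray decorator appears only in a comment so the snippet runs without a cluster. Whether the limit actually resizes a given BLAS or OpenMP pool depends on the library that numpy is linked against.

```python
import os
from contextlib import nullcontext

try:
    from threadpoolctl import threadpool_limits
except ImportError:  # treated as an optional dependency in this sketch
    threadpool_limits = None


def limit_native_threads(default=1):
    """Return a context manager that caps native thread pools.

    The cap is read from OMP_NUM_THREADS on every call. Per the report in
    this thread, Ray updates that variable to match num_cpus even when the
    worker process is reused.
    """
    num_threads = int(os.environ.get("OMP_NUM_THREADS", default))
    if threadpool_limits is None:
        # Without threadpoolctl there is nothing to adjust; do nothing.
        return nullcontext()
    return threadpool_limits(limits=num_threads)


def run_with_thread_limit(fn, *args):
    """Call fn(*args) with native thread pools capped for the duration.

    Intended to be called from inside a task body, for example a function
    decorated with @ray.remote(num_cpus=10), as in
    run_with_thread_limit(numpy.linalg.inv, matrix).
    """
    with limit_native_threads():
        return fn(*args)


# Stand-in call so the sketch runs without numpy or Ray.
total = run_with_thread_limit(sum, [1, 2, 3])
print(total)
```

The previous limits are restored when the `with` block exits, so the next task on the same worker can apply its own value.
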