ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[core] Number of CPUs in ray.available_resources() does not match Dashboard's Machine View #13100

Open blueplastic opened 3 years ago

blueplastic commented 3 years ago

What is the problem?

Ray version: 1.2.0.dev0
Python version: 3.7.8

On an 8-core machine, if I initialize Ray with num_cpus=16 and then run ray.available_resources(), I see 16 CPU cores. However, in the Dashboard's Machine View tab, I only see 8 Python PIDs/cores.

I was confused about a couple of things here:

1. Why does Ray allow being initialized with a higher number of CPUs than there are physically? I was expecting it to throw an error for any number > 8. Is there ever a benefit in declaring a higher num_cpus than there are physical cores?
2. The mismatch between ray.available_resources() showing 16 CPU cores and the Dashboard showing 8 cores was surprising. I was expecting to see 16 Python PIDs in the Dashboard and assumed Ray was going to let me oversubscribe. Furthermore, if I decorate a function with @ray.remote(num_cpus=16), it runs successfully, even though I technically only have 8 physical cores.

Reproduction (REQUIRED)

import ray

ray.init(dashboard_host="0.0.0.0", num_cpus=16)
ray.available_resources()  # Shows 16 CPU cores

@ray.remote(num_cpus=16)
class A:
    def f(self):
        return 0

obj = A.remote()

ray.get(obj.f.remote())  # This runs successfully and returns 0

The Dashboard shows 8 cores, which doesn't match the results above:

[Screenshot: Dashboard Machine View (2020-12-28) showing 8 cores]
rkooo567 commented 3 years ago

Why does Ray allow being initialized with a higher number of CPUs than there are physically? I was expecting it to throw an error for any number > 8. Is there ever a benefit in declaring a higher num_cpus than there are physical cores?

Ray's resources (CPU, memory) are just bookkeeping values used for scheduling. I believe we don't place any constraint on these values unless they crash Ray (e.g., too low a memory constraint can crash Ray).
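
As a rough illustration of the bookkeeping idea (the custom resource name and counts below are made up for this example, not anything Ray defines):

import ray

# Resource values passed to ray.init() are logical bookkeeping numbers for the
# scheduler, not a measurement of the hardware. Oversubscribing CPUs and
# declaring a custom resource are both accepted without error.
ray.init(num_cpus=16, resources={"custom_slot": 4})

print(ray.available_resources())
# Prints something like {'CPU': 16.0, 'custom_slot': 4.0, ...}
# even on an 8-core machine.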

The mismatch between ray.available_resources() showing 16 CPU cores and the Dashboard showing 8 cores was surprising. I was expecting to see 16 Python PIDs in the Dashboard and assumed Ray was going to let me oversubscribe. Furthermore, if I decorated a function with @ray.remote(num_cpus=16), it runs successfully, even though I technically only have 8 physical cores here.

@mfitton I think we should show 16 workers / 16 cores since num_cpus=16. Can you help isolate whether the issue is in the dashboard or in Ray core?

mfitton commented 3 years ago

Hm, yeah, I can take a look. The dashboard gets its information about the number of available cores from psutil, not from Ray's available_resources. That said, I'm not exactly sure of the behavior when Ray has 16 workers and 8 cores. The dashboard might only display workers for which there is a currently executing PID. Let me dig in a bit more and get back with more information.
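
To make the two data sources concrete, a quick comparison along these lines (just a sketch, run on the same 8-core machine as above) shows where the numbers diverge:

import psutil
import ray

ray.init(num_cpus=16)

# What Ray's scheduler thinks it has: the logical value passed to ray.init().
print("Ray CPU resource:", ray.available_resources().get("CPU"))  # 16.0

# What psutil reports about the machine; this is the source the dashboard uses.
print("psutil cpu_count:", psutil.cpu_count())  # 8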

mfitton commented 3 years ago

This function from datacenter.py explains why only 8 workers would be shown: we only add a worker's CoreWorkerStats to the API payload if it has a corresponding physical worker process.

I'll look into behavior of number of cores next.

Function for reference:

async def get_node_workers(cls, node_id):
    workers = []
    node_ip = DataSource.node_id_to_ip[node_id]
    node_logs = DataSource.ip_and_pid_to_logs.get(node_ip, {})
    node_errs = DataSource.ip_and_pid_to_errors.get(node_ip, {})
    node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
    node_stats = DataSource.node_stats.get(node_id, {})
    # Merge coreWorkerStats (node stats) into workers (node physical stats).
    # First, index the core worker stats by PID.
    pid_to_worker_stats = {}
    pid_to_language = {}
    pid_to_job_id = {}
    for core_worker_stats in node_stats.get("coreWorkersStats", []):
        pid = core_worker_stats["pid"]
        pid_to_worker_stats.setdefault(pid, []).append(core_worker_stats)
        pid_to_language[pid] = core_worker_stats["language"]
        pid_to_job_id[pid] = core_worker_stats["jobId"]
    # Only workers present in node_physical_stats (i.e. worker processes
    # actually observed on the node) are added to the payload, which is why
    # the dashboard shows at most one entry per physical worker process.
    for worker in node_physical_stats.get("workers", []):
        worker = dict(worker)
        pid = worker["pid"]
        worker["logCount"] = len(node_logs.get(str(pid), []))
        worker["errorCount"] = len(node_errs.get(str(pid), []))
        worker["coreWorkerStats"] = pid_to_worker_stats.get(pid, [])
        worker["language"] = pid_to_language.get(
            pid, dashboard_consts.DEFAULT_LANGUAGE)
        worker["jobId"] = pid_to_job_id.get(
            pid, dashboard_consts.DEFAULT_JOB_ID)

        await GlobalSignals.worker_info_fetched.send(node_id, worker)

        workers.append(worker)
    return workers
mfitton commented 3 years ago

Yep, the per-node CPU counts in the dashboard are also as reported by psutil, not based on the resources Ray has in its scheduler. That explains why 8 CPUs are reported even though Ray was initialized with 16: there are 8 CPUs on the node.

This is recorded in the reporter_agent: https://github.com/ray-project/ray/blob/master/dashboard/modules/reporter/reporter_agent.py#L66

Stefan-1313 commented 3 years ago

@blueplastic Perhaps you have logical cores. Try psutil.cpu_count(logical=False) and psutil.cpu_count(logical=True) and see if that explains the difference.
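
For example (the numbers below assume a machine with 4 physical cores and hyperthreading enabled; yours may differ):

import psutil

# Physical cores only.
print(psutil.cpu_count(logical=False))  # e.g. 4

# Logical cores, i.e. including hyperthreads; this is the default for
# psutil.cpu_count() with no arguments.
print(psutil.cpu_count(logical=True))   # e.g. 8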

scottsun94 commented 1 year ago

Why does Ray allow being initialized with a higher number of CPUs than there are physically? I was expecting it to throw an error for any number > 8. Is there ever a benefit in declaring a higher num_cpus than there are physical cores?

@blueplastic Does @rkooo567 's reply answer this question? Having this logical/bookkeeping resource concept is helpful in other cases as well: for example, with heterogeneous clusters you can specify num_cpus=0 for GPU nodes so that no CPU-only tasks will be scheduled there.
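
A rough sketch of that pattern (the task bodies and resource numbers here are made up for illustration):

import ray

ray.init()

# CPU-only work: asks the scheduler for 1 logical CPU, so it will never be
# placed on a GPU node whose CPU resource is set to 0.
@ray.remote(num_cpus=1)
def cpu_task():
    return "runs on a CPU node"

# GPU work that declares num_cpus=0: it only consumes a GPU resource, so it
# can still be scheduled on a GPU node that advertises no CPU resource.
@ray.remote(num_gpus=1, num_cpus=0)
def gpu_task():
    return "runs on a GPU node"

On the GPU nodes themselves, the raylet would be started with something like ray start --num-cpus=0 --num-gpus=..., so the scheduler simply has no CPU resource to hand out there.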

The mismatch between ray.available_resources() showing 16 CPU cores and the Dashboard showing 8 cores was surprising. I was expecting to see 16 Python PIDs in the Dashboard and assumed Ray was going to let me oversubscribe. Furthermore, if I decorated a function with @ray.remote(num_cpus=16), it runs successfully, even though I technically only have 8 physical cores here.

@blueplastic Does it make sense to you?