ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
32.94k stars 5.58k forks source link

[Ray Cluster] Resource monitoring of Ray Tasks and Actors #46377

Open vipese-idoven opened 2 months ago

vipese-idoven commented 2 months ago

Description

Continuous resource monitoring of Ray Tasks and Actor with metrics extraction.

Use case

Currently, Ray does not support other OOM prevention than allocating more resource by Ray Tasks and reduce concurrency. However, this can still trigger OOM if the Task uses more resources than allocated.

Implementing continuous monitoring of resources used by Ray Task and Actors (ray.util.state falls short to do this) would allow to design routines to prevent OOM by killing Task / Actors before using more memory than specified in resources.

So far, I've been able to implement a self-monitoring Threaded Actor, also posted in the Ray Discussion Forum, but somehow this seems sub-optimal.

import time  # noqa: D100
from typing import Any
import numpy as np
import psutil
import ray
import os

@ray.remote(max_concurrency=2) # Required to run monitoring and a task
class SelfMonitoringActor():

    def __init__(self) -> None:
        self.finished = False
        self.period = 1

    def monitor(self):
        worker_pid = os.getpid()
        process = psutil.Process(worker_pid)

        print(f"Monitoring is taking place in: PID {os.getpid()}")
        while not self.finished:
            memory_used = process.memory_info().rss
            print(f"Task is using {memory_used / (1024 ** 2)} MB")
            time.sleep(self.period/2) 

    def memory_intensive_task(self) -> Any:  
        """Simulates a memory intensive tasks."""
        print(f"Memory intensive task is taking place in: PID {os.getpid()}")
        for i in range(100):
            large_array = np.ones((100+i, 100+1, 100+i))*0.5
            large_array.sum()
            print(
                f"Iteration: {i}. Memory used by array: {large_array.size * large_array.itemsize / (1024 ** 2)} MB"
            )
            time.sleep(self.period)
        return large_array.sum()

    def run(self) -> Any:
        task_out = self.memory_intensive_task()
        self.finished = True
        time.sleep(self.period) # Leave time to stop monitor running
        return task_out

def main():
    ray.init()
    actor = SelfMonitoringActor.remote()
    task_out, _ = ray.get(  # Run both tasks concurrently in same worker and PID
        [actor.run.remote(),
        actor.monitor.remote()]
    )
    print("Task ran successfully!")
    return task_out

if __name__ == "__main__":
    main()
Superskyyy commented 2 months ago

The way you monitor such tasks will introduce overhead due to the blocking nature of Python threads (until Python3.13 nogil) and cannot scale to more observability dimensions (depending on what telemetry you are collecting it can be up to 50% if not careful). Doing ad-hoc monitoring like this can be acceptable as a quick workaround for certain jobs but it is better to be done externally to the worker process through a global monitor.

vipese-idoven commented 2 months ago

I see... Thanks for the quick response @Superskyyy.

I tried monitoring externally using Ray State, but it does not provide a similar functionality, and passing PIDs from other nodes resulted into errors. Also, Ray's guidelines indicates that Ray generate some system metrics that can be used for monitoring, but Prometheus falls short as it is at cluster level (rather than Task or Actor level, which can help preventing killing tasks that are using more memory than initially allocated).

Do you have any suggestions as to how to do it?

Superskyyy commented 2 months ago

We do have an internal experimental implementation of such continuous monitoring system with feedback on ray. We are discussing needs and see if this can be integrated back to open-source gradually. Happy to loop you in when we propose RFC or REPs on it.

vipese-idoven commented 2 months ago

I'd really appreciate that – thank you @Superskyyy !