ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Memory leak when using asyncio #45895

Open stbnps opened 3 weeks ago

stbnps commented 3 weeks ago

What happened + What you expected to happen

When running the script below, the memory utilization of the Supervisor actor increases slowly over time. This usually leads to OOM for more complex applications.

This memory increase does not happen when we use worker_task = asyncio.wrap_future(worker.run.remote().future()) instead of worker_task = worker.run.remote().
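For readers without a Ray cluster at hand, the wrap-once pattern behind that workaround can be sketched with the standard library alone. This is a minimal sketch, not Ray code: it assumes, as the Ray docs describe, that ObjectRef.future() returns a concurrent.futures.Future, and it uses a thread pool as a stand-in to produce such futures. The names slow_job, supervise and run_demo are hypothetical.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_job(value):
    # Stand-in for a remote call; returns its input after a short delay.
    time.sleep(0.05)
    return value


async def supervise(values, poll_timeout=0.001):
    with ThreadPoolExecutor(max_workers=len(values)) as pool:
        # Wrap each concurrent.futures.Future exactly once, outside the loop.
        tasks = [asyncio.wrap_future(pool.submit(slow_job, v)) for v in values]
        polls = 0
        while True:
            # The same asyncio futures are reused on every poll.
            done, pending = await asyncio.wait(tasks, timeout=poll_timeout)
            polls += 1
            if not pending:
                return sorted(f.result() for f in done), polls


def run_demo():
    return asyncio.run(supervise([1, 2]))
```

The polling loop only ever sees asyncio futures created before it started, so asyncio.wait has nothing to wrap on each iteration.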

Versions / Dependencies

Python 3.8.19, Ray 2.10.0, NumPy 1.24.4

Reproduction script

import asyncio
import numpy as np
import ray
from ray.util.queue import Queue

@ray.remote(num_cpus=1)
class Worker:
    def __init__(self, queue):
        self.queue = queue

    async def run(self):
        for i in range(1000):
            status = np.ones((1024*1024)) * i
            self.queue.put(status)
            await asyncio.sleep(0.1)

@ray.remote(num_cpus=1)
class Supervisor:
    def __init__(self, workers, queue):
        self.workers = workers
        self.queue = queue
        self.worker_tasks = []

    async def check_workers(self):
        should_finish = False
        while not should_finish:
            done, pending = await asyncio.wait(self.worker_tasks, timeout=0.001)
            if len(done) == len(self.workers):
                should_finish = True
                break

            if not self.queue.empty():
                status_update = await self.queue.get_async()
                print(status_update)

    async def run(self):
        for worker in self.workers:
            worker_task = worker.run.remote()
            # worker_task = asyncio.wrap_future(worker.run.remote().future())
            self.worker_tasks.append(worker_task)
        await self.check_workers()

async def main():
    status_queue = Queue()
    worker_a = Worker.remote(status_queue)
    worker_b = Worker.remote(status_queue)
    supervisor = Supervisor.remote([worker_a, worker_b], status_queue)
    await supervisor.run.remote()

asyncio.run(main())

Issue Severity

High: It blocks me from completing my task.

anyscalesam commented 3 weeks ago

@stbnps are you able to repro on a newer ray version?

stbnps commented 3 weeks ago

With Ray 2.24.0 and Python 3.11.9, I cannot even call done, pending = await asyncio.wait(self.worker_tasks, timeout=0.001). It throws the following error:

Traceback (most recent call last):
  File "/media/stbn/2TB/git_chep/ray-migration/tmp.py", line 50, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/media/stbn/2TB/git_chep/ray-migration/tmp.py", line 48, in main
    await supervisor.run.remote()
ray.exceptions.RayTaskError(AttributeError): ray::Supervisor.run() (pid=38702, ip=192.168.68.32, actor_id=08a78a513a863842cede879e01000000, repr=<tmp.Supervisor object at 0x7fed29e92710>)
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/media/stbn/2TB/git_chep/ray-migration/tmp.py", line 41, in run
    await self.check_workers()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/media/stbn/2TB/git_chep/ray-migration/tmp.py", line 27, in check_workers
    done, pending = await asyncio.wait(self.worker_tasks, timeout=0.001)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/tasks.py", line 428, in wait
    return await _wait(fs, timeout, return_when, loop)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/asyncio/tasks.py", line 532, in _wait
    f.add_done_callback(_on_completion)
    ^^^^^^^^^^^^^^^^^^^
AttributeError: 'ray._raylet.ObjectRef' object has no attribute 'add_done_callback'

However, in the docs I see the following example, which indicates that I'd be able to call asyncio.wait on an ObjectRef:

import ray
import asyncio

@ray.remote
def some_task():
    return 1

async def await_obj_ref():
    await some_task.remote()
    await asyncio.wait([some_task.remote()])

asyncio.run(await_obj_ref())

With Ray 2.24 and Python 3.11, using asyncio.wrap_future(worker.run.remote().future()) works, and I don't see any memory increase, which matches what I described in my original report.

With Ray 2.24 and Python 3.10, by contrast, worker_task = worker.run.remote() does not crash, but memory still increases. With asyncio.wrap_future, memory does not increase.

Are we now seeing 2 different issues? (The memory increase and the add_done_callback error)
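One way to probe that question without Ray is to look at how asyncio.wait treats an awaitable that is not an asyncio Future. On Python 3.10 and earlier, asyncio.wait passes each element through ensure_future; on 3.11 and later it no longer does, and calls add_done_callback on the element directly, which matches the traceback above. The sketch below uses a hypothetical FakeRef class as a stand-in for such an awaitable. Whether the per-call wrapping on older Python versions accounts for the memory growth is an assumption, not something confirmed in this thread.

```python
import asyncio
import sys


class FakeRef:
    """Hypothetical awaitable with __await__ but no add_done_callback."""

    def __await__(self):
        return self._resolve().__await__()

    async def _resolve(self):
        await asyncio.sleep(0.01)
        return 1


async def wait_on_raw_awaitable():
    # Pass the non-Future awaitable straight to asyncio.wait.
    try:
        await asyncio.wait([FakeRef()], timeout=1)
        return "wrapped"
    except AttributeError:
        return "AttributeError"


async def wait_on_wrapped_awaitable():
    # Wrap once in a Task first; this form is accepted on every version.
    task = asyncio.ensure_future(FakeRef())
    done, _ = await asyncio.wait([task], timeout=1)
    return [t.result() for t in done]


def run_demo():
    # Outcome expected for the raw case on the running interpreter.
    expected_raw = "AttributeError" if sys.version_info >= (3, 11) else "wrapped"
    raw = asyncio.run(wait_on_raw_awaitable())
    wrapped = asyncio.run(wait_on_wrapped_awaitable())
    return raw, expected_raw, wrapped
```

If the raw case behaves differently across interpreter versions while the wrapped case does not, that would be consistent with the crash being a Python-version effect that is separate from the memory growth.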