ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Memory leakage with ray.put() #47273

Open BhautikDonga opened 3 weeks ago

BhautikDonga commented 3 weeks ago

What happened + What you expected to happen

I am seeing a memory leak when putting objects (NumPy arrays) into the object store via ray.put().

Versions / Dependencies

I am using ray==2.34.0 and Python 3.10 on Ubuntu 20.04.

Reproduction script

import re
import ray
import time
import asyncio  # needed for asyncio.sleep() in PacketDecoder.run
import numpy as np
from ray.util.queue import Queue as rQueue

ray.init(address="auto" if ray.is_initialized() else None)

class Frame:
    def __init__(self, src_id, seq_id=None, ref=None):
        self.src_id = src_id
        self.seq_id = seq_id
        self.ref = ref

@ray.remote
class PacketDecoder:
    def __init__(self, actor_name="packet_decoder") -> None:
        self.actor_name = actor_name
        self.actor_id = int(re.findall("[0-9]+$", actor_name)[-1])
        self.frame_count = 0

    async def run(self, frame_holder):
        print(f"{self.actor_name} Packet Decoder started.")

        while True:
            try:
                if frame_holder.qsize() > 5:
                    print("Decoder: Frame queue is full .......")
                    await asyncio.sleep(1)
                    continue

                # generate frame
                # bgr_frame = np.random.rand(1920, 1080, 3)
                bgr_frame = np.random.randint(low=0, high=255, size=(1920, 1080, 3), dtype=np.uint8)
                frame_obj = Frame(src_id=self.actor_id, seq_id=self.frame_count, ref=ray.put(bgr_frame))

                # some blocking task
                time.sleep(0.04)

                # Push data to holders
                await frame_holder.put_async(frame_obj, timeout=1)

                self.frame_count += 1
            except Exception as ex:
                print(f"Error: {self.actor_name}: {ex}")

@ray.remote
class ODIActor:
    def __init__(self, actor_name="odi"):
        self.actor_name = actor_name

    async def run(self, frame_holder, src_id, timeout=2):
        while True:
            try:
                try:
                    frame_obj = await frame_holder.get_async(block=True, timeout=timeout)
                except:
                    print(f"[{src_id}]: No frame for last {timeout} seconds.")
                    continue

                ray.internal.free(frame_obj.ref)

            except Exception as ex:
                print(f"[{src_id}]: Error: {ex}.")

def main():

    # initialise camera actors
    cam_prefix = "cam_"
    decode_actors = {}
    for cam in range(7):
        decoder_name = f"decoder_{cam}"
        decode_actors[f"{cam_prefix}{cam}"] = PacketDecoder.options(name=decoder_name, namespace="test", 
                                                    num_cpus=0.1).remote(decoder_name)

    # initialise odi actor
    odi = ODIActor.options(name="odi", namespace="test", num_cpus=0.2).remote("odi")

    # create frame holders for all camera actors
    frame_holders = {
        cam_name : rQueue(actor_options={"num_cpus":0.01, "namespace":"test", "name":f"{cam_name}_fh"})
        for cam_name in decode_actors
    }

    # start ODI actors first
    for f_name in frame_holders:
        odi.run.remote(frame_holders[f_name], f_name)

    # start the camera actors
    decoder_refs = []
    for d_name in decode_actors:
        decoder_refs.append(decode_actors[d_name].run.remote(frame_holders[d_name]))

    ray.get(decoder_refs)

    print("------------------------- END -------------------------")

if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        print(f"Error: {ex}")

Issue Severity

High: It blocks me from completing my task.

BhautikDonga commented 3 weeks ago

Please find the related graphs and additional information in this discussion: https://discuss.ray.io/t/memory-leakage-with-ray-put/15501

982945902 commented 2 weeks ago

Is it because your producers are producing faster than your consumers are consuming?

BhautikDonga commented 2 weeks ago

> Is it because your producers are producing faster than your consumers are consuming?

I have tested with a check on the queue size (see the updated reproduction script above), and the memory leak is still there.
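As a rough, back-of-envelope check, the reproduction script's own numbers bound how much frame data should be alive at once. This sketch assumes (not measured) that each decoder produces at most one frame per `time.sleep(0.04)`, that each queue holds at most 6 frames because a frame is only put when `qsize() <= 5`, and that each source has at most one extra frame in flight on the decoder side and one on the ODI side. The constant names are hypothetical.

```python
import math

# Back-of-envelope estimate for the reproduction script above.
# All rates and counts are upper-bound assumptions, not measurements.
FRAME_SHAPE = (1920, 1080, 3)   # dtype=np.uint8, so 1 byte per element
NUM_SOURCES = 7                 # decoders / queues created in main()
MAX_FPS_PER_DECODER = 25        # time.sleep(0.04) per frame -> at most 25 frames/s
MAX_QUEUED = 6                  # a frame is put only when qsize() <= 5
IN_FLIGHT = 2                   # assumed: one being built, one held by the consumer


def frame_bytes(shape):
    """Size in bytes of one uint8 frame with the given shape."""
    return math.prod(shape)


def ingest_rate_bytes_per_s():
    """Upper bound on bytes/s passed to ray.put() across all decoders."""
    return frame_bytes(FRAME_SHAPE) * MAX_FPS_PER_DECODER * NUM_SOURCES


def live_frame_bound_bytes():
    """Upper bound on frame bytes alive at once, if every frame is freed after use."""
    return frame_bytes(FRAME_SHAPE) * (MAX_QUEUED + IN_FLIGHT) * NUM_SOURCES


print(f"one frame:   {frame_bytes(FRAME_SHAPE):,} bytes")
print(f"ingest rate: {ingest_rate_bytes_per_s() / 1e9:.2f} GB/s (upper bound)")
print(f"live bound:  {live_frame_bound_bytes() / 1e6:.0f} MB (upper bound)")
```

Under these assumptions, the producers can push roughly 1 GB/s into the object store, but the queue-size check caps live frame data at roughly 350 MB, so memory that keeps climbing well past that level is hard to attribute to backlog alone.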

jjyao commented 1 week ago

Hi @BhautikDonga,

While we investigate why there is a memory leak, I'd strongly recommend using the Ray Data library (given that you are doing data processing), which handles things like back-pressure for you and can avoid these memory leaks.
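Back-pressure means the consumer's pace throttles the producer, instead of the producer polling `qsize()` and sleeping. The idea can be illustrated without Ray using a bounded `asyncio.Queue`, whose `put()` waits while the queue is full. This is a sketch of the concept only, not how Ray Data implements it; `produce`, `consume`, `MAXSIZE`, and `NUM_FRAMES` are hypothetical. (`ray.util.queue.Queue` also accepts a `maxsize` argument, which may allow a similar pattern, but this sketch does not use Ray.)

```python
import asyncio

MAXSIZE = 5        # hypothetical bound on queued frames
NUM_FRAMES = 20    # hypothetical number of frames to produce


async def produce(queue, num_frames):
    """Producer: put() suspends while the queue is full (back-pressure)."""
    peak = 0
    for seq_id in range(num_frames):
        await queue.put(seq_id)          # waits here if MAXSIZE items are queued
        peak = max(peak, queue.qsize())  # track the largest backlog observed
    await queue.put(None)                # sentinel: no more frames
    return peak


async def consume(queue, delay):
    """Consumer: deliberately slower than the producer."""
    received = []
    while True:
        item = await queue.get()
        if item is None:
            return received
        received.append(item)
        await asyncio.sleep(delay)       # simulate per-frame processing time


async def main():
    queue = asyncio.Queue(maxsize=MAXSIZE)
    peak, received = await asyncio.gather(
        produce(queue, NUM_FRAMES),
        consume(queue, delay=0.001),
    )
    return received, peak


received, peak = asyncio.run(main())
print(f"received {len(received)} frames, peak queue size {peak}")
```

Compared with checking `qsize()` and sleeping for a fixed second, the producer here resumes as soon as a slot frees up, and the backlog never exceeds `maxsize` no matter how slow the consumer is.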

BhautikDonga commented 6 days ago

@jjyao Thanks for looking into this issue. I will go through the Ray Data library and see how I can move to it. It would be great if you could suggest or share some docs on how I can easily move to the Ray Data library.

One more thing we found in our analysis is that this memory leak is somehow related to https://github.com/ray-project/ray/issues/47274.

Please let me know if I can provide any other information.