ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

ray.put() slows down over time. #13612

Open vishal00100 opened 3 years ago

vishal00100 commented 3 years ago

ray.put slows down over time.

I have a simple setup with two actors. The first actor places raw and preprocessed images in shared memory, and the second actor runs predictions on the preprocessed images.

import ray
import time
import numpy as np
from tqdm import tqdm
from ray.util import ActorPool
from ray.util.metrics import Count, Gauge

CAMERA_COUNT = 3

@ray.remote
class Camera:
    def __init__(self):
        self.camera_acquired_frame_counter = Count(name="camera_acquired_count")
        self.camera_acquire_time = Gauge(name="camera_acquire", tag_keys=("frame_order",))
        self.preprocess_time = Gauge(name="camera_preprocess", tag_keys=("frame_order",))
        self.ray_frame_set_put_time = Gauge(name="camera_frame_ray_put")
        self.ray_preprocessed_frame_set_put_time = Gauge(name="camera_preprocessed_ray_put")

    def get(self):
        frame_set_acquire_start_time = time.time()
        camera_count = CAMERA_COUNT
        for cam_idx in range(camera_count):
            frame_acquire_start_time = time.time()
            time.sleep(0.01)
            self.camera_acquire_time.record(time.time() - frame_acquire_start_time, tags={"frame_order": str(cam_idx)})
        self.camera_acquired_frame_counter.record(1.0)
        self.camera_acquire_time.record(time.time() - frame_set_acquire_start_time, tags={"frame_order": "frame_set"})

        frame_set_preprocess_start_time = time.time()
        for cam_idx in range(camera_count):
            frame_preprocess_start_time = time.time()
            time.sleep(0.01)
            self.preprocess_time.record(time.time() - frame_preprocess_start_time, tags={"frame_order": str(cam_idx)})
        self.preprocess_time.record(time.time() - frame_set_preprocess_start_time, tags={"frame_order": "frame_set"})

        ray_frame_set_put_stime = time.time()
        frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        self.ray_frame_set_put_time.record(time.time() - ray_frame_set_put_stime)
        ray_preprocessed_frame_set_put_stime = time.time()
        preprocessed_frame_ref = ray.put(np.zeros((3, 512, 512, 1)))
        self.ray_preprocessed_frame_set_put_time.record(time.time() - ray_preprocessed_frame_set_put_stime)

        return frame_ref, preprocessed_frame_ref

@ray.remote
class Classifier:
    def __init__(self):
        self.classifier_infer_time = Gauge(name="classifier_infer")

    def predict(self, preprocessed_frame_ref):
        predict_stime = time.time()
        time.sleep(0.03)
        self.classifier_infer_time.record(time.time() - predict_stime)
        return np.zeros(1000)

if __name__ == '__main__':
    print(ray.init(_metrics_export_port=58391))
    print(ray.cluster_resources())
    camera = Camera.remote()
    classifier = Classifier.remote()
    for ridx in tqdm(range(1000000), desc="Main loop"):
        frame_ref, preprocessed_frame_ref = ray.get(camera.get.remote())
        prediction = ray.get(classifier.predict.remote(preprocessed_frame_ref))

I noticed that the ray.put() call in Camera#get

frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))

starts to slow down over time.

[screenshot]

This trend looks a bit concerning. Any ideas about what’s happening here?

rkooo567 commented 3 years ago

This looks bad. Can you check if this is reproducible when you purely keep calling ray.put?

vishal00100 commented 3 years ago

> This looks bad. Can you check if this is reproducible when you purely keep calling ray.put?

Sure. I'll let this code run overnight and post results.

import ray
import numpy as np
from tqdm import tqdm
import time
from ray.util.metrics import Count, Gauge

def run():
    put_count = Count(name="repro_put_count")
    put_gauge = Gauge(name="repro_put_gauge")
    for ridx in tqdm(range(1000000000), desc="Main loop"):
        stime = time.time()
        object_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        put_gauge.record((time.time() - stime) * 1000)
        put_count.record(1.)

if __name__ == '__main__':
    print(ray.init(_metrics_export_port=58391))
    print(ray.cluster_resources())
    run()

vishal00100 commented 3 years ago

The pure ray.put code seems fine.

[screenshot]

rkooo567 commented 3 years ago

Hmm, interesting. Can you then try ray.put in a worker? (Same code, but run ray.put inside an actor class and keep calling it.)

vishal00100 commented 3 years ago

Sure.

vishal00100 commented 3 years ago

import ray
import numpy as np
from tqdm import tqdm
import time
from ray.util.metrics import Count, Gauge

@ray.remote
class Actor1:

    def __init__(self):
        self.put_count = Count(name="repro_put_count")
        self.put_gauge = Gauge(name="repro_put_gauge")

    def do_stuff(self):
        stime = time.time()
        object_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        self.put_gauge.record((time.time() - stime) * 1000)
        self.put_count.record(1.)
        return object_ref

def run():
    actor1 = Actor1.remote()
    for ridx in tqdm(range(1000000000), desc="Main loop"):
        object_ref = ray.get(actor1.do_stuff.remote())

if __name__ == '__main__':
    print(ray.init(_metrics_export_port=58391))
    print(ray.cluster_resources())
    run()

I ran this code for ~20 hours and I see similar spikes here.

[image]

P.S.: There are a few periods with no metrics. That was because the Prometheus server was unable to scrape metrics during that time; the code above was running the entire time, however.

rkooo567 commented 3 years ago

Hmm interesting. We will take a look at what might be the cause soon!

rkooo567 commented 3 years ago

Hey @vishal00100, thanks again for trying all of this. I'd like to ask one last thing: is it possible to see if this happens in other versions? What version are you using?

vishal00100 commented 3 years ago

No problem, and thanks for the quick response. I'm using 1.1.0. Which version would you like me to try?

richardliaw commented 3 years ago

@vishal00100 can you try the nightly wheels https://docs.ray.io/en/master/installation.html?

rkooo567 commented 3 years ago

I'd like to know if this was a regression as well. (Sorry for the many requests, haha.)

Ideally, it'll be great if you can test them with

vishal00100 commented 3 years ago

No worries. I will post results as I gather them.

vishal00100 commented 3 years ago

I ran with the nightly wheel for a few hours.

>>> ray.__version__
'1.2.0.dev0'

[image]

It's been running for ~3 hours, but I think it's already quite clear that this is trending in the same direction as version 1.1.0. I'll switch to 1.0.0 and run it again.

vishal00100 commented 3 years ago

@rkooo567 Looks like 1.0.0 doesn't include support for custom metrics. Is there a workaround you can suggest? I can keep track of metrics in my code if necessary.

rkooo567 commented 3 years ago

Ah, yeah. It was introduced after that version. Maybe you could collect the metrics manually. Sorry for the inconvenience 😢

rkooo567 commented 3 years ago

@vishal00100 How's this going now? I'm thinking of taking a look at it in the next 2 weeks, once you've been able to verify this.

vishal00100 commented 3 years ago

My apologies. I wasn't able to spend time on this sooner.

I modified my code to keep track of timing metrics on older versions of Ray.

import ray
import numpy as np
from tqdm import tqdm
import time
try:
    from ray.util.metrics import Count, Gauge
except ImportError as e:
    process_start_time = time.time()
    print(f"Count and Gauge are not available in ray version {ray.__version__}.")
    import pandas as pd
    class Gauge:
        def __init__(self, name):
            self.name = name
            self.file_name = f'/tmp/{self.name}.pkl'
            pd.DataFrame([], columns=['name', 'time', 'value']).to_pickle(self.file_name)

        def record(self, value: float):
            # Re-read the DataFrame from disk on every call, so we don't pay
            # the overhead of keeping a large DataFrame in memory.
            df = pd.read_pickle(self.file_name)
            # Store elapsed seconds since process start alongside the value.
            df.at[len(df), ['name', 'time', 'value']] = [self.name, int(time.time() - process_start_time), value]
            df.to_pickle(self.file_name)

    class Count(Gauge):
        # Identical bookkeeping to Gauge; only the metric name differs.
        pass

@ray.remote
class Actor1:

    def __init__(self):
        self.put_count = Count(name="repro_put_count")
        self.put_gauge = Gauge(name="repro_put_gauge")

    def do_stuff(self):
        stime = time.time()
        object_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        self.put_gauge.record((time.time() - stime) * 1000)
        self.put_count.record(1.)
        return object_ref

def run():
    actor1 = Actor1.remote()
    for ridx in tqdm(range(1000000000), desc="Main loop"):
        object_ref = ray.get(actor1.do_stuff.remote())

if __name__ == '__main__':
    print(ray.init(_metrics_export_port=58391))
    print(ray.cluster_resources())
    run()

[image]

I'm seeing a similar pattern with 1.0.0. I'll run this code again with 0.8.6.

ericl commented 3 years ago

I tried running the above script (modified below) on an i3.8xl instance (latest DLAMI, pytorch_p36 Python env, Ray nightly) for 24 hours. I did not observe any increase in latency over time (things stayed bounded at ~100 ms max). The average rose slightly, from 23 ms to 27 ms, but that seemed pretty negligible to me.

Is it possible the problem is system-specific (for example, running low on memory, or some operating-system issue)? I'm downgrading the priority since the issue seems to occur only in specific environments.
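
For anyone who wants to check the memory theory, here is a minimal sketch (psutil is assumed to be installed; the array shape, print interval, and 100-iteration cadence are arbitrary) that logs the put latency next to the process RSS and system-wide memory usage, so a slowdown can be correlated with memory pressure:

import time

import numpy as np
import psutil  # assumed available: pip install psutil
import ray

ray.init()
proc = psutil.Process()

for i in range(1_000_000):
    stime = time.time()
    ray.put(np.zeros((3, 3000, 5000, 1)))
    latency_ms = (time.time() - stime) * 1000
    if i % 100 == 0:
        # Print latency alongside memory so the two can be correlated.
        print(f"iter={i} put_ms={latency_ms:.1f} "
              f"rss_mb={proc.memory_info().rss / 1e6:.0f} "
              f"sys_mem_pct={psutil.virtual_memory().percent:.0f}")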

Note: I also opted to print() the times instead of using the metrics system. It's possible the metrics themselves add overhead that increases the latency, which would not show up here since I used print()s only.

[image]

import ray
import numpy as np
from tqdm import tqdm
import time

process_start_time = time.time()
import pandas as pd
class Gauge:
    def __init__(self, name):
        self.name = name
        self.file_name = f'/tmp/{self.name}.pkl'
        pd.DataFrame([], columns=['name', 'time', 'value']).to_pickle(self.file_name)

    def record(self, value: float):
        # Re-read the DataFrame from disk on every call, so we don't pay
        # the overhead of keeping a large DataFrame in memory.
        df = pd.read_pickle(self.file_name)
        df.at[len(df), ['name', 'time', 'value']] = [self.name, int(time.time() - process_start_time), value]
        df.to_pickle(self.file_name)

class Count(Gauge):
    # Identical bookkeeping to Gauge; only the metric name differs.
    pass

@ray.remote
class Actor1:

    def __init__(self):
        self.put_count = Count(name="repro_put_count")
        self.put_gauge = Gauge(name="repro_put_gauge")
        self.i = 0

    def do_stuff(self):
        stime = time.time()
        object_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        self.i += 1
        print(self.i, (time.time() - stime) * 1000)
#        self.put_gauge.record((time.time() - stime) * 1000)
#        self.put_count.record(1.)
        return object_ref

def run():
    actor1 = Actor1.remote()
    for ridx in tqdm(range(1000000000), desc="Main loop"):
        object_ref = ray.get(actor1.do_stuff.remote())

if __name__ == '__main__':
    print(ray.init(_metrics_export_port=58391))
    print(ray.cluster_resources())
    run()

thegyro commented 3 years ago

@ericl Is there a reason why we see spikes in the latency of the ray.put() call? In the graph you posted above, an operation that typically takes < 25 ms seems to spike to 75, 100, and even 200 ms quite frequently. I want to better understand why that happens and how the p99 latency evolves over time.
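
For reference, one way to watch the tail evolve is to bucket the recorded latencies into fixed-size windows and compute a percentile per window. A minimal sketch, assuming the latencies (in ms) have already been collected into a flat list:

import numpy as np

def p99_per_window(latencies_ms, window=1000):
    """Split a flat list of latencies into fixed-size windows and return
    the 99th percentile of each, so tail drift over time becomes visible."""
    arr = np.asarray(latencies_ms)
    n_windows = len(arr) // window
    return [float(np.percentile(arr[w * window:(w + 1) * window], 99))
            for w in range(n_windows)]

If the per-window p99 rises steadily, the slowdown is systematic; if it stays flat, the spikes are isolated outliers.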

ericl commented 3 years ago

Just a guess, but it could be something like periodic Python GC, or something from the OS.
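
One way to check the GC theory (a sketch using only the standard gc module; gc.callbacks is available in Python 3.3+) would be to time each collection inside the actor and log any pause long enough to explain a spike:

import gc
import time

_gc_start = {}

def _gc_timer(phase, info):
    # gc.callbacks invokes this with phase "start"/"stop" around each
    # collection; log pauses long enough to explain a latency spike.
    if phase == "start":
        _gc_start["t"] = time.time()
    elif phase == "stop":
        pause_ms = (time.time() - _gc_start.get("t", time.time())) * 1000
        if pause_ms > 10:
            print(f"gc gen{info['generation']} pause: {pause_ms:.1f} ms")

gc.callbacks.append(_gc_timer)

If the spikes persist even with gc.disable() in the actor, GC can be ruled out.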
