ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.89k stars, 5.76k forks

[Serve][High] How to properly configure the load on replicas? #44872

Open psydok opened 6 months ago

psydok commented 6 months ago

What happened + What you expected to happen

I have two gRPC deployments: the first preprocesses audio (very fast, roughly 10-100 ms; I have not measured it exactly) and then sends a request to the second, which runs inference (300-700 ms). I have deployed 12 replicas. The Grafana graphs show a QPS of 5-6.

But I still get relatively many timeouts compared to deploying 5 replicas separately and load-balancing them with nginx. Also, at the 95th percentile, requests often take up to 5 seconds to complete for some reason.

I don't understand at all how I should configure the load. How do I tell the replicas to accept requests in round-robin fashion, other than by scaling out?

Currently I have 4 servers connected, each with 2-3 GPUs of different sizes (8-11 GB).

This problem wipes out all the benefits of scaling, because the replicas end up being slow 10% of the time. Many timeouts occur at once even though the load on the service is not growing. I've tried many different hyperparameter settings. Nothing works properly.

The same problem occurs with a second application that we are trying to deploy via Ray. A YOLO model converted to ONNX does not exceed 18 RPS, starting with 3 replicas, and the 95th-percentile response time is always 2 times higher under load.
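A rough capacity check using Little's law (average requests in flight = arrival rate x service time) can put the numbers above in perspective. The sketch below uses only the figures reported in this issue (5-6 QPS, 300-700 ms inference); the function names are illustrative and not part of any Ray API. It only speaks to averages, so it cannot explain bursts or tail latency on its own.

```python
import math


def mean_in_flight(qps: float, service_time_s: float) -> float:
    """Little's law: average number of requests being served at once."""
    return qps * service_time_s


def replicas_needed(qps: float, service_time_s: float,
                    target_ongoing_requests: float) -> int:
    """Replicas required to keep the average per-replica load at the target."""
    return math.ceil(mean_in_flight(qps, service_time_s) / target_ongoing_requests)


# Best and worst case from the figures reported in the issue.
best = replicas_needed(qps=5, service_time_s=0.3, target_ongoing_requests=1)
worst = replicas_needed(qps=6, service_time_s=0.7, target_ongoing_requests=1)
print(f"replicas needed on average: {best} to {worst}")
```

If these estimates hold, 12 replicas should have ample average capacity, which would point toward queueing inside replicas or uneven routing rather than a plain shortage of replicas. That is an inference from averages, not a diagnosis.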

Versions / Dependencies

ray==2.10.0 python==3.11.8

Reproduction script


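import logging
import os
import subprocess as sp
import time

import numpy as np
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle

# Model, DataPreprocessor, AudioData, ZeroSignalError and stt_pb2 are
# project-specific and not shown here.
_logger = logging.getLogger("ray.serve")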

GPU_IDX = int
Mb = int

def get_free_gpu_memory() -> list[Mb]:
    command = "nvidia-smi --query-gpu=memory.free --format=csv"
    memory_free_info = sp.check_output(command.split()).decode("ascii").split("\n")[:-1][1:]
    memory_free_values = [int(x.split()[0]) for x in memory_free_info]
    return memory_free_values

def get_gpu_id(gpu_memory_gb: float) -> GPU_IDX:
    gpu_idx = ray.get_gpu_ids()[-1]
    time.sleep(time.perf_counter_ns() % 60)
    free_memory_mb = get_free_gpu_memory()
    gpu_memory_mb = gpu_memory_gb * 1000
    if gpu_memory_mb < free_memory_mb[gpu_idx]:
        os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_idx)
        return gpu_idx

    for idx, memory_mb in enumerate(free_memory_mb):
        if gpu_memory_mb < memory_mb:
            os.environ["CUDA_VISIBLE_DEVICES"] = str(idx)
            return idx

    raise RuntimeError("There is not enough free gpu memory on node.")

@serve.deployment(
    ray_actor_options={"num_cpus": 1, "num_gpus": 0.21, "resources": {"GPUMemory": 3.3}},
    max_ongoing_requests=3,
    autoscaling_config={
        "min_replicas": 12,
        "max_replicas": 16,
        "upscale_delay_s": 3,
        "target_ongoing_requests": 1,
        "downscale_delay_s": 5 * 60 * 60,
        "downscaling_factor": 0.3,
        "metrics_interval_s": 3,
        "look_back_period_s": 10,
    },
    max_replicas_per_node=4,
)
class SynchronousSTT:
    def __init__(self):
        devices = [get_gpu_id(3.3)]
        try:
            self._model = Model(
                num_workers=2,
                devices=devices,
            )
        except Exception as exp:
            _logger.warning(
                f"Error message: {str(exp)}\n" "An error was received when initializing the model."
            )
            raise

    async def __call__(self, samples: list[np.ndarray]) -> stt_pb2.RecognizeResponse:
        # 300-700ms
        result = self._model(samples[0])
        return stt_pb2.RecognizeResponse(
            result=stt_pb2.RecognizeResponse.Result(words=result.words, text=result.text),
        )

@serve.deployment
class SpeechToTextService:
    def __init__(self, recognition: DeploymentHandle):
        self._recognition = recognition.options(use_new_handle_api=True)
        self._data_preprocessor = DataPreprocessor(
            16000, 1
        )

    async def Recognize(self, request: stt_pb2.RecognizeRequest) -> stt_pb2.RecognizeResponse:
        raw_audio = request.audio.audio_content
        if not raw_audio:
            raise ZeroSignalError()
        preprocessed_samples = self._data_preprocessor(
            AudioData(
                raw_audio=raw_audio,
                sample_rate_hertz=int(request.recognition_config.sample_rate_hertz),
                channels=int(request.recognition_config.channels),
            )
        )
        return await self._recognition.remote(preprocessed_samples)

app = SpeechToTextService.options(
    ray_actor_options={"num_cpus": 0.2},
    max_replicas_per_node=1,
    max_ongoing_requests=100,
    autoscaling_config={
        "target_ongoing_requests": 10,
        "min_replicas": 3,
        "max_replicas": 5,
        "upscale_delay_s": 3,
        "downscale_delay_s": 60,
        "downscaling_factor": 0.3,
        "metrics_interval_s": 2,
        "look_back_period_s": 10,
    },
).bind(SynchronousSTT.bind())
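One thing that may be worth checking in the script above: SynchronousSTT.__call__ is declared async def but calls self._model(...) synchronously. A blocking call inside a coroutine holds the event loop, so a replica that has accepted up to max_ongoing_requests=3 requests would still work through them one at a time, and the later ones wait. The sketch below reproduces that effect with plain asyncio and no Ray at all; blocking_inference is a hypothetical stand-in for the model call, and whether the real model can safely run in a worker thread (thread safety, GIL release during GPU work) is an assumption that would need to be verified.

```python
import asyncio
import time


def blocking_inference(duration_s: float = 0.05) -> str:
    """Hypothetical stand-in for a synchronous model call."""
    time.sleep(duration_s)
    return "ok"


async def handle_blocking() -> str:
    # The blocking call runs directly on the event loop, so concurrent
    # requests are served strictly one after another.
    return blocking_inference()


async def handle_offloaded() -> str:
    # The blocking call runs in a worker thread; the event loop stays free.
    return await asyncio.to_thread(blocking_inference)


async def timed(handler, n: int = 3) -> float:
    """Run n concurrent requests and return the total wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


blocking_t = asyncio.run(timed(handle_blocking))
offloaded_t = asyncio.run(timed(handle_offloaded))
print(f"blocking: {blocking_t:.3f}s, offloaded: {offloaded_t:.3f}s")
```

With three concurrent requests, the blocking variant takes about three times the single-call duration, while the offloaded variant overlaps them. If the model cannot be shared across threads, lowering max_ongoing_requests to 1 is another option to consider, so that waiting requests are not assigned to a busy replica.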

Issue Severity

High: It blocks me from completing my task.

anyscalesam commented 6 months ago

Can you take a look at this, @edoakes? Also, @psydok, it might be faster to get an answer on #serve in the Ray Slack; can you post there and we can continue the discussion here?

That would be quicker to resolve for you. At first skim this reads like a Serve config issue, where tuning could help improve your QPS, but I defer to Ed.

psydok commented 1 month ago

We tried different replica load configurations, and nothing helps. The service still runs faster if separate instances are started and combined behind nginx with round-robin balancing. Can you please tell me how soon we can expect to be able to configure round-robin in Ray Serve?

https://github.com/ray-project/ray/issues/41555