Open mihajenko opened 1 year ago
cc @sihanwang41
To provide custom autoscaling behavior, a conservative refactor of Serve autoscaling would require:

- an AutoscalingPolicy interface, made configurable via AutoscalingConfig, and
- extending the DeploymentState instance with a set of user-configurable (and built-in) metrics, which would be collected, aggregated, and used by the AutoscalingPolicy implementation instance to produce autoscaling decisions.

The implementation sketch below would reuse the formulation of Serve custom metrics, while keeping internal APIs largely intact.
from abc import abstractmethod
from typing import Any, Dict, Iterable, Optional, Set

from pydantic import BaseModel

from ray import serve
from ray.util import metrics


class AutoscalingPolicy:
    @abstractmethod
    def get_decision_num_replicas(
        self,
        curr_target_num_replicas: int,
        builtin_metrics: Dict[str, Iterable[Any]],
        user_metrics: Optional[Dict[str, Iterable[Any]]] = None,
    ) -> int:
        """Make a decision to scale replicas. Override.

        Arguments:
            curr_target_num_replicas: The number of replicas that the
                deployment is currently trying to scale to.
            builtin_metrics: The built-in metric values that were collected.
            user_metrics: The user metric values that were collected.

        Returns:
            int: The new number of replicas to scale to.
        """
        return curr_target_num_replicas


class AutoscalingConfig(BaseModel):
    # ... extended with a set of user metrics to collect and an autoscaling policy
    metrics: Optional[Set[str]] = None
    policy: Optional[AutoscalingPolicy] = None

    class Config:
        # The policy field holds an arbitrary (non-pydantic) object.
        arbitrary_types_allowed = True


class MyAutoscalingPolicy(AutoscalingPolicy):
    def get_decision_num_replicas(
        self,
        curr_target_num_replicas: int,
        builtin_metrics: Dict[str, Iterable[Any]],
        user_metrics: Optional[Dict[str, Iterable[Any]]] = None,
    ) -> int:
        collected_metrics = list(user_metrics["my_counter"]) if user_metrics else []
        if not collected_metrics:
            return curr_target_num_replicas
        avg = sum(collected_metrics) / len(collected_metrics)
        return max(curr_target_num_replicas, int(avg))


autoscaling_config = AutoscalingConfig(
    metrics={
        "my_counter",  # custom metric (see below)
        "serve_deployment_processing_latency_ms",  # built-in metric
    },
    policy=MyAutoscalingPolicy(),
)


@serve.deployment(autoscaling_config=autoscaling_config)
class MyDeployment:
    def __init__(self):
        self.num_requests = 0
        self.my_counter = metrics.Counter(
            "my_counter",
            description="The number of odd-numbered requests to this deployment.",
            tag_keys=("deployment",),
        )
        self.my_counter.set_default_tags({"deployment": "MyDeployment"})

    def __call__(self):
        self.num_requests += 1
        if self.num_requests % 2 == 1:
            self.my_counter.inc()
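As a further illustration, a policy could also react to the built-in latency metric listed in the config above. This is only a sketch against the proposed interface; the LatencyAwarePolicy name and the 200 ms / 50 ms thresholds are arbitrary assumptions, not part of the proposal.

from typing import Any, Dict, Iterable, Optional


# Sketch only: reuses the AutoscalingPolicy interface proposed above; the latency
# thresholds below are purely illustrative.
class LatencyAwarePolicy(AutoscalingPolicy):
    def get_decision_num_replicas(
        self,
        curr_target_num_replicas: int,
        builtin_metrics: Dict[str, Iterable[Any]],
        user_metrics: Optional[Dict[str, Iterable[Any]]] = None,
    ) -> int:
        latencies = list(
            builtin_metrics.get("serve_deployment_processing_latency_ms", [])
        )
        if not latencies:
            return curr_target_num_replicas
        avg_latency_ms = sum(latencies) / len(latencies)
        if avg_latency_ms > 200:
            # Over the assumed latency budget: add a replica.
            return curr_target_num_replicas + 1
        if avg_latency_ms < 50 and curr_target_num_replicas > 1:
            # Well under budget: remove a replica.
            return curr_target_num_replicas - 1
        return curr_target_num_replicas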
Hi @mihajenko, I really like the proposed interface for the autoscaling, would you like to directly contribute to the ray serve for this feature :) ?
@sihanwang41 Can I pick up this issue and contribute? At my workplace we are in need of custom autoscaling metrics :)
Description
The Serve autoscaler, configurable via @serve.deployment(autoscaling_config=...), is extended to produce autoscaling decisions using custom metric sets.

Current state
Serve currently focuses on basic autoscaling based on HTTP/handle request metrics. Autoscaling decisions are made with a fixed (but configurable) algorithm over a limited set of internal metrics: the number of HTTP requests and the handle queue size.
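For reference, today's configuration surface looks roughly like the sketch below; the deployment name and field values are illustrative, not taken from this issue.

from ray import serve


# Current behavior: the autoscaler only tracks request load and tries to keep the
# number of ongoing requests per replica near the configured target.
@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 10,
        "target_num_ongoing_requests_per_replica": 5,
    }
)
class ExistingDeployment:
    def __call__(self, request):
        return "ok"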
Use case
Our use case is a deployment DAG with a source node and multiple terminal nodes that do not use an HTTP/handle interface. Such nodes could be persistent consumers (e.g. from a Redis or RabbitMQ queue) or producers (e.g. writers to a database).
As such, these nodes do not follow a closed circuit in which an HTTP request results in an HTTP response. They actively pull their data, push it through the deployment graph, and write results using clients at the terminal nodes.
Still, these source/terminal nodes require some kind of autoscaling. Their load is determined not by HTTP requests but by the metrics exposed by the external systems their clients connect to, or by other Ray system metrics (e.g. CPU load).
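To make this concrete, a queue-consuming source node could expose something like a queue-depth gauge for the proposed policy to scale on. This is a hedged sketch: the QueueConsumer class, the my_queue_depth metric name, and the _poll_external_queue helper are hypothetical placeholders for a real queue client.

from ray import serve
from ray.util import metrics


@serve.deployment  # no HTTP traffic; would rely on the proposed custom-metric autoscaling
class QueueConsumer:
    def __init__(self):
        # Hypothetical gauge reporting the depth of an external queue (e.g. Redis/RabbitMQ).
        self.queue_depth = metrics.Gauge(
            "my_queue_depth",
            description="Number of pending messages in the external queue.",
            tag_keys=("deployment",),
        )
        self.queue_depth.set_default_tags({"deployment": "QueueConsumer"})

    async def _poll_external_queue(self) -> int:
        # Placeholder for a real queue-client call returning the pending-message count.
        return 0

    async def run(self):
        # Polling loop: report queue depth so an autoscaling policy could react to it,
        # then consume work and push it through the deployment graph.
        while True:
            depth = await self._poll_external_queue()
            self.queue_depth.set(depth)
            # ... consume a batch and forward it to downstream deployments ...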