ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Core] Support TPU pod Autoscaling #38595

Closed allenwang28 closed 1 year ago

allenwang28 commented 1 year ago

Description

https://github.com/ray-project/ray/commit/47edb32eb78cb0b70e3702d0beb1fd75efff92d3 adds in support for TPU pods within the Ray Cluster launcher.

However, for the "full" Ray experience to come to TPU users, I see a few more problems that are probably related:

1) Proper autoscaling, e.g. making a request to the autoscaler and the autoscaler knowing to provision a particular TPU pod.
2) Ray task scheduling: all hosts in a TPU pod expect to run the same workload (see "Using JAX in multi-host and multi-process environments" for more information), otherwise the workload will not execute properly.

(CC @richardliaw and @architkulkarni)

Without either:

1) While we can schedule multiple TPU pods, we can't meaningfully scale up or down, which is obviously a problem.
2) Unless users are aware of this and know how to work around it, they will likely see Ray functions/actors hang without explanation, with a message along the lines of "Waiting for all TPU hosts to join the cluster" (see the sketch below).
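For context, here is a minimal sketch (not from the original report) of what every host in a pod slice is expected to run; on a multi-host slice, initializing the TPU backend generally waits for all hosts in the slice, which is where the hang shows up if Ray only places work on a subset of them:

import jax

# Every host in the slice must run this same program; the device count below
# spans all hosts, and backend initialization waits until all of them join.
print("global devices:", jax.device_count())
print("local devices:", jax.local_device_count())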

I can think of a few solutions outlined below:

Option 1: Raylets across multiple hosts

This problem stems from the fact that a single raylet cannot span past a single host. If, hypothetically, a raylet could represent multiple hosts then we could do something like num_tpus=4096 for all of the chips in a TPU pod:

@ray.remote(num_tpus=4096)
def my_function():
  print("This would run on all hosts in a v4-8192")

my_function.remote()

This is easy to conceptualize as a user, and autoscaling would "just work", assuming all of the other changes needed for "a raylet can span multiple hosts" were made.

But I'm not sure how feasible this is - to me it seems impossible without a drastic overhaul of Ray itself.

Option 2: The "experimental" approach!

One insight is that we simply need to tell Ray to target a particular group of workers. If, at ray start time, we can label that all of the hosts that comprise a TPU pod are in fact of the same TPU pod, then we can provide the right ray options to target that pod.

This example below does work:

# f:tpu.yaml
# Each worker registers a custom resource named after its TPU pod slice
# (the VM hostname with the "-w-<worker index>" suffix stripped).
worker_start_ray_commands:
  - ray stop
  - "export HOSTNAME_VAL=\"$(hostnamectl | grep 'Static hostname' | awk '{print $3}' | sed 's/-w-.*//')\"; ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 --resources=\"{\\\"$HOSTNAME_VAL-tpu\\\": 1}\""
# f:my_app.py
# Note: This is the user/application code
import ray
import ray.autoscaler.sdk

@ray.remote
def my_remote_function():
    ...

def run_on_pod(ray_remote, requested_tpu_hosts: int):
    # Find the custom "<pod name>-tpu" resources registered at ray start time.
    cluster_resources = ray.available_resources()
    available_tpu_resources = dict(filter(
        lambda item: item[0].endswith("-tpu"),
        cluster_resources.items()))
    # Keep only pods whose host count matches the request.
    matching_resources = dict(filter(
        lambda item: item[1] == requested_tpu_hosts,
        available_tpu_resources.items()))

    if not matching_resources:
        ray.autoscaler.sdk.request_resources(...)
    else:
        tpu_id = list(matching_resources.keys())[0]
        return [
            ray_remote.options(resources={tpu_id: 1}).remote()
            for _ in range(requested_tpu_hosts)
        ]

handles = run_on_pod(my_remote_function, requested_tpu_hosts=4)

In plain English, we:

1) At ray start time, label each worker with a custom resource named after its TPU pod slice.
2) At call time, look for a pod slice whose host count matches the requested number of hosts.
3) Either schedule one task per host against that pod's custom resource, or ask the autoscaler for more resources.

But I think the pitfalls here are obvious: it's hard to read, hard to maintain, and IMHO not a feasible long-term answer. Plus, it isn't clear how the resource request to the autoscaler should be made.

Option 3: Placement groups [recommended]

From my understanding, the best way to approach this would be to use placement groups. My current line of thinking (which I hope to refine through conversations in this thread) is as follows:

1) Introduce a way to specify in ray start that multiple hosts should be scheduled together if requested. In Option 2 we put the VM instance name in as a custom resource, but IMO that is a bit of an abuse of custom resources. Maybe we can instead introduce a concept of "raylet metadata" that provides this hint to the raylet.
2) Introduce a placement group strategy akin to node affinity in the K8s world. Assuming at ray start time we've successfully hinted to Ray which VMs have an affinity, a user can then do something like:

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@ray.remote
def my_function():
  ...

# E.g. a v3-32:
num_tpu_hosts = 4
# "TPU_AFFINITY" is the proposed (not yet existing) strategy.
pg = placement_group([{"TPU": 4} for _ in range(num_tpu_hosts)], strategy="TPU_AFFINITY")

my_function.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)).remote()

I like this approach because:

1) It isn't as magical as "change everything about Ray" (like Option 1).
2) It is an incremental feature on top of placement groups, which AFAICT aim to solve problems like this (for contrast, a sketch of what today's placement group strategies offer follows this list).
3) It is more manageable from a user perspective than Option 2.
4) It should allow us to insert the TPU-specific logic within Ray and hide the complexities from the user. For instance, in the autoscaling logic we can use the placement group to discover how many TPUs are requested and cross-check that against the TPU chip counts of the available node configurations.
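For contrast (item 2 above), here is a rough sketch of the closest approximation with today's placement group strategies: STRICT_SPREAD forces one bundle per node, but nothing constrains those nodes to be hosts of the same TPU pod slice, which is exactly the gap a TPU_AFFINITY-style strategy would close. The shard_fn below is purely illustrative:

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# One bundle per host, forced onto distinct nodes. Nothing ties the chosen
# nodes to a single TPU pod slice / ICI domain.
pg = placement_group([{"TPU": 4} for _ in range(4)], strategy="STRICT_SPREAD")
ray.get(pg.ready())

@ray.remote(resources={"TPU": 4})
def shard_fn():
    ...

# Pin one task to each bundle (i.e. each host) in the group.
handles = [
    shard_fn.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg, placement_group_bundle_index=i
        )
    ).remote()
    for i in range(4)
]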

Use case

As a user, imagine I have the following functions that I want to run on TPU pods:

# I expect func_a to run on a v4-16
@ray.remote(...)
def func_a():
  ...

# I expect func_b to run on a v3-32
@ray.remote(...)
def func_b():
  ...

I would expect to be able to call both func_a and func_b as many times as I want, without having to:

1) Think about which machines the functions get placed on.
2) Think too much about how the autoscaling should trigger.
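Under the Option 3 proposal, that could look roughly like the sketch below (TPU_AFFINITY is the proposed, not yet existing, strategy; the host counts follow the pod slice examples in this thread):

from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# v4-16: 2 hosts x 4 chips; v3-32: 4 hosts x 4 chips.
pg_a = placement_group([{"TPU": 4}] * 2, strategy="TPU_AFFINITY")
pg_b = placement_group([{"TPU": 4}] * 4, strategy="TPU_AFFINITY")

func_a.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg_a)).remote()
func_b.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg_b)).remote()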

architkulkarni commented 1 year ago

Option 3 sounds the best to me as well; it sounds like the standard solution for this type of workload. cc'ing more people who might have relevant knowledge, since I'm not the most familiar with placement groups and how they're currently used for Ray Train: @jjyao @matthewdeng

allenwang28 commented 1 year ago

Great points @architkulkarni!

Per your last point:

Is the mental model of the user more like "I want to run this on a v4-16 and I don't want to think about the exact number of hosts, chips, etc", or more like "I want to run this on 64 hosts and I don't want to look up the TPU type"?

My mental model of this is actually "I want to run on a v4-16 but I don't have a way to tell Ray which specific machines to run on because there is a relationship between particular raylets."

Can you walk through the example ...

Regarding the example, sorry - I picked an unfortunate TPU configuration lol, let's use a v4-16 as an example:

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

@ray.remote
def my_function():
  ...

# E.g. a v4-16:
num_tpu_hosts = 2
pg = placement_group([{"TPU": 4} for _ in range(num_tpu_hosts)], strategy="TPU_AFFINITY")

my_function.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)).remote()

so a v4-16 has two hosts (num_tpu_hosts = 2) and each host has 4 TPU chips ({"TPU": 4}).
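As a quick sanity check of that topology, here is a sketch of what JAX would report on a v4-16, assuming each v4 chip is exposed as a single JAX device:

import jax

# On each of the two hosts of a v4-16:
assert jax.local_device_count() == 4  # chips attached to this host
assert jax.device_count() == 8        # chips across the whole slice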

For example it might make sense to push down some of the logic about [{"TPU": 4} for _ in range(num_tpu_hosts)], strategy="TPU_AFFINITY" into the Ray code itself, so the user-facing API would just be something like f.options( scheduling_strategy=TPUSchedulingStrategy(ray.tpu.V4_16)).remote(). But maybe this is too TPU-specific to include in Ray, and instead it should be done at the level of ML libraries on top of Ray.

My hunch is that I agree with you - pushing pg = placement_group([{"TPU": 4} for _ in range(num_tpu_hosts)], strategy="TPU_AFFINITY") is probably too TPU specific for Ray Core and a better fit for one of the higher level AIR libraries (at least for now).

Do you have a proposal for the API for this part? ray start sounds like the right place to do it.

Yes, will follow up in the next comment.

allenwang28 commented 1 year ago

Do you have a proposal for the API for this part? ray start sounds like the right place to do it.

Yeah, so I think we should probably be very generic rather than TPU-specific, and I have some ideas for that:

At a high level, Ray currently offers users a way to provide 1) the resource type and 2) the number of resources. The limitation we're touching on is that we now need to introduce a new schema for users to tell Ray 3) how those resources are linked.

For instance:

As a starting point (and drawing inspiration from resource representations in Ray), a V0 of an extensible API we can iterate on would be to introduce a new (optional) logical field in the GCS that represents these relationships, perhaps like this (note: I'm using Python dataclasses for illustration, but it probably could/should be represented by protobufs):

import dataclasses

# frozen=True makes GroupingType hashable so instances can be used as dict keys.
@dataclasses.dataclass(frozen=True)
class GroupingType:
    name: str
    priority: int

@dataclasses.dataclass
class Grouping:
    id: str
    type: GroupingType

# Predefined types
TPU_ICI = GroupingType('TPU_ICI', 1)
ZONE_NETWORK = GroupingType('ZONE_NETWORK', 2)
REGION_NETWORK = GroupingType('REGION_NETWORK', 3)

where, for example, the TPU VMs within a TPU pod could be represented by:

[Grouping(id=tpu_pod_name, type=TPU_ICI),
 Grouping(id='us-central2-b', type=ZONE_NETWORK),
 Grouping(id='us-central2', type=REGION_NETWORK)]

With this interface:

Then imagine we have 3 TPU pod slices, each with two workers; two of the pod slices are in the same data center and one is in a different data center. The GCS could ultimately have access to a view like this:

{
    TPU_ICI: { 
        "TPU-POD-1": ["10.139.0.1", "10.139.0.2"],
        "TPU-POD-2": ["10.140.0.1", "10.140.0.2"],
        "TPU-POD-3": ["10.141.0.1", "10.141.0.2"],
    },
    ZONE_NETWORK: {
        "us-central2-b": ["10.139.0.1", "10.139.0.2", "10.140.0.1", "10.140.0.2"],
        "us-west4-a": ["10.141.0.1", "10.141.0.2"],
    },
    REGION_NETWORK: {
        "us-central2": ["10.139.0.1", "10.139.0.2", "10.140.0.1", "10.140.0.2"],
        "us-west4": ["10.141.0.1", "10.141.0.2"],
    },
}
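As an illustration of how a scheduler could consume this view, here is a hypothetical helper (pick_group is not an existing API, and I'm assuming the mapping above is bound to a variable named view):

from typing import Dict, List, Optional

def pick_group(view: Dict, level: GroupingType, num_hosts: int) -> Optional[List[str]]:
    """Return the worker IPs of a group at `level` with at least `num_hosts` members."""
    for group_id, workers in view.get(level, {}).items():
        if len(workers) >= num_hosts:
            return workers[:num_hosts]
    return None

# E.g. find 2 hosts connected by TPU ICI:
workers = pick_group(view, TPU_ICI, num_hosts=2)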

This level of view could help us define a new placement group strategy that leverages this grouping information, e.g. possibly:

PlacementGroup([{"TPU": 4}, {"TPU": 4}], strategy=GroupingStrategy(level=GroupingStrategies.TPU_ICI))

which would tell Ray that I want to run on 2 nodes each with 4 TPU chips that are linked by ICI.

(I'm not sure if this makes sense from the placement group strategy as I have no idea how it works!)

allenwang28 commented 1 year ago

Met with @richardliaw at Ray Summit regarding multi-host serving; summarizing what we discussed here. Richard also asked me to open this as a separate issue and to tag @jjyao and @scv119 for feedback and/or to help identify any gaps!

Problem Statement

Our primary use case is multi-host serving, e.g. when a replica in our deployment is a TPU pod slice. The pod slice consists of multiple Ray worker nodes backed by TPU VMs that are provisioned as an atomic unit.

The primary use case is production-grade serving of large models that do not fit within a single-host TPU VM (estimated >65B parameters, from here).

Note that while this primarily targets TPU pod slices, it would also affect other accelerator types (like Gaudi, or GPU pods connected by a high-bandwidth network) that require a notion of grouping Ray workers together in a deployment.

Visualization

To demonstrate this concept, here is a snippet of code of what a sample deployment could look like using today's Ray APIs:

import ray
from ray import serve

@serve.deployment(num_replicas=1, route_prefix="/")
class APIIngress:
    def __init__(self, handle):
        self._handle = handle

    async def __call__(self, request):
        return await self._handle.generate.remote(request)

@serve.deployment(autoscaling_config={
    "min_replicas": 1,
    "max_replicas": 256,
})
class LLMServer:
    # E.g. represents a v5e-16, and we would want up to 256 replicas of this unit
    def __init__(self):
        # v5e-16 is composed of 4-chip VMs...
        num_hosts = 4
        self._shards = [LLMShard.remote(...) for _ in range(num_hosts)]

    async def generate(self, request):
        response_shards = ray.get([s.generate.remote(request) for s in self._shards])
        # join and postprocess results, e.g. beamsearch etc.
        processed_result = ...
        return processed_result

@ray.remote(resources={"TPU": 4})
class LLMShard:
    # E.g. represents a 4-chip v5e VM host
    def __init__(self):
        import jax
        self._model_shard = ...  # load checkpoint

    def generate(self, request):
        return self._model_shard.generate(request)

As an image: [diagram: ray-multihost-serving-deployment.drawio]

Issues

The autoscaling code snippet above will not work because of two problems:

1) How does Ray know "how" to autoscale?
2) How do we ensure that the shards reach the right Ray workers, i.e. hosts within the same TPU pod slice?

And a minor usability concern (3): if our LLMShard had multiple methods beyond generate, we might have to duplicate the corresponding wrapper definitions in LLMServer for each of them.

Proposal

A better version would be something like this (same as above, skipping the APIIngress):

Serve API

@serve.deployment(tpu="tpu-v5e-16")  # or tpu=("v5e", "4x4") using the accelerator config definition
class LLMServer:
    def __init__(self):
        # e.g. per-shard init logic
        self._model_shard = ...  # checkpoint load

    def generate(self, request):
        return self._model_shard.generate(request)

    @serve.group_handler
    async def group_generate_handler(self, request):
        results = await self.generate(request)
        # join and postprocess results, e.g. beamsearch etc.
        return results

E.g. we introduce:

1) A tpu deployment option with which we can specify the exact topology we want a deployment replica to represent.
2) A group_handler decorator, similar to @serve.batch, but which instead allows us to define our joining and postprocessing strategy within the same deployment. This will help mitigate issue (3).

Ray Core - Placement Groups

The Serve API could lower the tpu="tpu-v5e-16" portion to a placement group. We would need to modify placement groups to accept something like this:

bundles = [{"TPU_V5E": 4}] * 4
placement_group(bundles, grouped=True)

E.g. the bundles will consist of 4 Ray worker nodes each with 4 v5e chips, and the placement group specifies that they should be "grouped."

This way, the autoscaler can deduce from this placement group spec that this should correspond to a TPU pod of 4 v5e-4s, resolving problem (1).
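As a sketch of that deduction (the lookup table and helper below are illustrative only, not an existing Ray API; the accelerator type strings follow GCP's v5litepod naming):

POD_SLICE_TYPES = {
    # (resource name, chips per host, number of hosts) -> accelerator type
    ("TPU_V5E", 4, 4): "v5litepod-16",
    ("TPU_V5E", 4, 8): "v5litepod-32",
}

def resolve_pod_slice(bundles):
    """Given grouped bundles like [{"TPU_V5E": 4}] * 4, pick a pod slice type to provision."""
    (resource, chips_per_host), = bundles[0].items()
    return POD_SLICE_TYPES[(resource, chips_per_host, len(bundles))]

assert resolve_pod_slice([{"TPU_V5E": 4}] * 4) == "v5litepod-16"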

This leaves issue (2): how do we ensure that the shards reach the right Ray workers? While the placement group hints to Ray Core that special consideration is needed in how raylets schedule this task or actor, Ray still needs to know which TPU VM hosts belong to the same TPU pod slice. That relationship has to be established at the resource provisioning level.

Ray Cluster Launcher

For both the VM cluster launcher and KubeRay, we would need to modify the ray start command so we can specify the grouping relationship. For instance, that could look like this:

ray start --resources='{"TPU": 4}' --group_id=$TPU_POD_SLICE_ID

Fortunately, TPU VM pod slices are created with a single unique ID naming the pod slice, and we should also be able to specify a unique identifier in the ray start command in KubeRay/GKE.
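For illustration, a hypothetical way each worker could derive that ID, reusing the hostname convention from Option 2 (the --group_id flag itself is proposed, not an existing ray start option):

import re
import socket

def tpu_pod_slice_id() -> str:
    # TPU VM hostnames look like "<pod-slice-name>-w-<worker-index>".
    return re.sub(r"-w-.*$", "", socket.gethostname())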

allenwang28 commented 1 year ago

Closing in favor of https://github.com/ray-project/ray/issues/39781 because this thread has a lot of redundant information