vllm-project / vllm

A high-throughput and memory-efficient inference and serving engine for LLMs
https://docs.vllm.ai
Apache License 2.0

[RFC]: Interface and Abstraction for Distributed Inference Environment #3587

Closed youkaichao closed 5 months ago

youkaichao commented 8 months ago

This RFC describes a proposal for interfaces and abstractions for distributed inference environments. I plan to solicit discussions for a week (until March 31st) before I begin to actually refactor the code.

Motivation

The current distributed inference environment in vllm is quite tangled, and we often see deadlocks and hangs (see https://github.com/vllm-project/vllm/issues/3455 , https://github.com/vllm-project/vllm/issues/2770 , https://github.com/vllm-project/vllm/issues/3559 , to name a few). The problem became prominent when we tried to upgrade to pytorch 2.2.0 (see https://github.com/vllm-project/vllm/pull/3442 , https://github.com/vllm-project/vllm/pull/3442 ), because pytorch 2.2.0 upgrades from nccl==2.18.1 to 2.19.3 (see https://pypi.org/pypi/torch/2.1.2/json and https://pypi.org/pypi/torch/2.2.0/json to compare the dependencies), and nccl==2.19.3 breaks vllm due to increased memory cost during cudagraph capture (from 10MB per graph to 100MB per graph, which adds up to several GB because we capture dozens of cudagraphs).

TL;DR: distributed inference in the current codebase is a headache. If it works, hooray; if not, don't be surprised.

Proposal

Abstraction

I think we should have three levels of abstraction:

  1. Launcher, responsible for launching processes (potentially across multiple nodes). Currently it is ray, but we can also have other choices like Python's native multiprocessing in single-node cases. See https://github.com/vllm-project/vllm/pull/3466 for an example.
  2. Coordinator, responsible for coordinating shared resources (e.g. filesystem usage) and broadcasting some messages. Currently we don't have this, and there are lots of hacks for ad-hoc implementation, e.g. use filelock to lock on filesystems ( https://github.com/vllm-project/vllm/pull/3578 ), use TCP to initialize communication in cupy ( https://github.com/vllm-project/vllm/pull/2811 ), use MPI to initialize communication in AMD's cupy version ( https://github.com/vllm-project/vllm/pull/3123 ).
  3. Communicator, responsible for cross-device communication of large tensor data (e.g. perform allreduce). Currently we support nccl, and AMD also has its own communication library. Note that this is vendor-specific, and vendors usually have their own way of cross-device communication.

The messiest one, and the missing one, is the Coordinator abstraction level. More on this later.

Interface

Between each pair of consecutive abstractions lies an interface.

Interface between Launcher and Coordinator

After Launcher launches processes, it needs to at least tell the processes the following information:

How does Launcher pass this information to each process? Basically we have two choices:

  1. through environment variables, the simplest way, but this rules out thread-level distributed inference because environment variables are shared among all threads in one process. (However, thread-level distributed inference seems rare. Do we need to consider this?)
  2. through serialization and deserialization (e.g. passing bytes in a shared object store), the most general way, at the cost of complexity and runtime efficiency to design and execute the serialization/deserialization
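
To make the two choices concrete, here is a minimal sketch of both in Python. The `LaunchInfo` class, its fields, and the environment variable names are assumptions made for illustration; the RFC does not fix them.

```python
import json
import os
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class LaunchInfo:
    """Hypothetical container for what a launcher tells each process."""

    launch_id: str
    rank: int
    world_size: int
    local_rank: int

    # Choice 1: environment variables. They are process-wide, so every
    # thread in the process would see the same rank.
    @classmethod
    def from_env(cls, env=os.environ):
        return cls(
            launch_id=env["LAUNCH_ID"],  # variable names are illustrative
            rank=int(env["RANK"]),
            world_size=int(env["WORLD_SIZE"]),
            local_rank=int(env["LOCAL_RANK"]),
        )

    # Choice 2: explicit serialization, e.g. bytes placed in a shared
    # object store by the launcher and decoded by each worker.
    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, payload: bytes):
        return cls(**json.loads(payload.decode("utf-8")))
```

With the second choice each worker (process or thread) can be handed its own payload, which is what makes it the more general option.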

Interface between Coordinator and Communicator

Device communicators (e.g. nccl) often need to initialize the communication by sharing some unique token (see nccl documentation). In addition, processes sometimes need to coordinate the resource in a node or across the cluster.

In light of the above considerations, Coordinator should at least have the following interfaces:

  1. is_master(): tell if the current process is a master process, i.e. convenient wrapper for boilerplate code rank == 0
  2. is_local_master(): tell if the current process is a local master process, i.e. convenient wrapper for boilerplate code local_rank == 0
  3. broadcast(bytes, src): broadcast some message (in the form of bytes) from rank src to all the processes. The semantics are standard, no need for more explanation.
  4. barrier(): block until all processes reach this point. Also a standard communication primitive.

Note: more often than not, we want to execute something in just one process per node (e.g. creating directories, downloading files to the node). Inspired by this thread, we can write code like this:

if is_local_master():
    do_something() # download file, create directory, etc.
barrier()

Furthermore, there are more complicated requirements like "only one process in each node does something, but this something is different across nodes", which essentially calls for local_barrier(), a function that blocks until all processes in the current node reach this point. It is debatable whether we want this (currently I don't see any requirements like this in vllm).
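
As a rough illustration of the interface, the four methods listed above could be written as an abstract base class. The `ThreadCoordinator` below is only a toy that uses one thread per "process" on a single node so the sketch runs anywhere; it is not the proposed reference implementation, and all names besides the four methods are assumptions.

```python
import threading
from abc import ABC, abstractmethod


class Coordinator(ABC):
    """Control-plane interface with the four methods listed above."""

    def __init__(self, rank: int, local_rank: int):
        self.rank = rank
        self.local_rank = local_rank

    def is_master(self) -> bool:
        return self.rank == 0

    def is_local_master(self) -> bool:
        return self.local_rank == 0

    @abstractmethod
    def broadcast(self, message: bytes, src: int) -> bytes:
        """Return the bytes that rank `src` passed in."""

    @abstractmethod
    def barrier(self) -> None:
        """Block until every rank has called barrier()."""


class ThreadCoordinator(Coordinator):
    """Toy single-node version: one thread stands in for each process."""

    def __init__(self, rank, shared_barrier, mailbox):
        super().__init__(rank=rank, local_rank=rank)
        self._barrier = shared_barrier
        self._mailbox = mailbox  # dict shared by all ranks

    def broadcast(self, message, src):
        if self.rank == src:
            self._mailbox["msg"] = message
        self._barrier.wait()  # the message is now visible to every rank
        received = self._mailbox["msg"]
        self._barrier.wait()  # every rank has read; the mailbox can be reused
        return received

    def barrier(self):
        self._barrier.wait()


def run_demo(world_size: int = 4):
    shared_barrier = threading.Barrier(world_size)
    mailbox, created, results = {}, [], [None] * world_size

    def worker(rank):
        coor = ThreadCoordinator(rank, shared_barrier, mailbox)
        if coor.is_local_master():
            created.append("cache_dir")  # stands in for mkdir / download
        coor.barrier()  # nobody continues until the local master is done
        token = coor.broadcast(b"token" if coor.is_master() else b"", src=0)
        results[rank] = (len(created), token)

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

`run_demo()` follows the `is_local_master()` / `barrier()` pattern from the note: every rank observes the side effect of the local master only after the barrier.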

Communicator interface

The following functionality of communicator is suggested (mostly taken from the nccl design):

  1. the master process gets a unique token to identify the communication group
  2. the master process broadcasts the unique token to all ranks
  3. each process initializes communication using the unique token, its rank, and world_size
  4. an in-place allreduce function: allreduce(char* input, size_t count, size_t dtype, size_t op). More functionality would be better (e.g. out-of-place allreduce, broadcast/reduce/scatter etc.), but in-place allreduce is all we need currently.

The intended usage would be something like this:

# inside each process
coor = Coordinator()  # initialize Coordinator, done by vllm
comm = Communicator(coor)  # hardware vendor can use `coor` to initialize their communicator
data = torch.ones((1024, 1024), device=f"xpu:{coor.local_rank}")  # a 1024x1024 tensor on this process's device
comm.allreduce(data)  # hardware vendor can access the raw data via pytorch's Tensor.data_ptr mechanism (https://pytorch.org/docs/stable/generated/torch.Tensor.data_ptr.html)
# please implement Communicator.__del__ to destroy communicator, so that programs can exit gracefully
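
The initialization handshake (steps 1 to 3 above) might look roughly like the following sketch. Everything here is a stand-in: `SequentialCoordinator` simulates two ranks running one after another in a single process, a random UUID plays the role of the vendor's unique token, and no real vendor library is called.

```python
import uuid


class SequentialCoordinator:
    """Toy coordinator: ranks run one after another in one process and
    share a dict, so rank `src` must call broadcast() first."""

    def __init__(self, rank, world_size, bus):
        self.rank, self.world_size, self.bus = rank, world_size, bus

    def is_master(self):
        return self.rank == 0

    def broadcast(self, message, src):
        if self.rank == src:
            self.bus["msg"] = message
        return self.bus["msg"]


class Communicator:
    """Hypothetical vendor communicator showing only the handshake."""

    def __init__(self, coor):
        # Steps 1-2: the master creates a unique token and broadcasts it
        # over the control plane.
        token = uuid.uuid4().bytes if coor.is_master() else b""
        self.group_token = coor.broadcast(token, src=0)
        # Step 3: a real backend would hand token, rank and world_size to
        # the vendor library here to create its communication handle.
        self.rank, self.world_size = coor.rank, coor.world_size
        self.is_open = True

    def close(self):
        # A real backend would destroy its communication handle here.
        self.is_open = False

    def __del__(self):
        self.close()  # lets programs exit gracefully


bus = {}
comms = [Communicator(SequentialCoordinator(r, 2, bus)) for r in range(2)]
```

After construction, both communicators hold the same `group_token`, which is the property a vendor backend needs before it can run allreduce across the group.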

A reference implementation of Coordinator

A reference implementation of Coordinator can be torch.distributed, with the gloo backend designed to communicate CPU tensors.

Other options include MPI and a custom-implemented TCP store. However, since we live in the torch framework, torch.distributed is a natural choice that adds no new dependency.

Note: torch.distributed can also be used as a fully functional communicator for GPU devices. However, torch.distributed.all_reduce is way more complicated than just an allreduce operation. It might initialize the autograd engine, might keep track of gradients, might dispatch to different device kernels. Even if we are in torch.inference_mode, its c10 engine might perform some additional operations that break features like cudagraph. Therefore, I prefer to call vendor-provided communication libraries directly to bypass the problem. After all, we just want an allreduce operation on dense tensors, without any hustle and bustle.

Benefits

After we have the above abstraction and interface, we can have the following benefits:

Things not to be considered

We don't aim for a full-fledged distributed execution environment. And since inference tasks are almost stateless, we don't need to consider elasticity and fault tolerance. As opposed to training, we don't need to save checkpoints, we don't need to resume from a previous failure ...

rkooo567 commented 8 months ago

Thanks for the RFC! I have several questions.

  1. Should we consider the requirements of more advanced communication patterns, e.g., https://arxiv.org/abs/2401.09670 (prefill disaggregation)? The major requirement would be establishing a communicator between two groups of workers, meaning there could be more than one communicator group (for allreduce among tp workers, and send/recv among different groups of workers).
  2. When we initialize the engine, we initialize state for all workers by calling APIs such as init_model and init_communicator on all workers. Is this going to be a part of the coordinator APIs? Or the launcher?
  3. It is a little confusing to me that the coordinator is passed into the communicator, because the coordinator has a "broadcast" API which is typically implemented by the communicator API. Is the broadcast implementation agnostic to the communicator in general?

youkaichao commented 8 months ago

Should we consider the requirements from more advanced communication pattern

It is possible, and it won't change the interface in this RFC. The burden would go to DistributedWorker (described below), on how it creates communicators using coordinators.

Is init_model, init_communicator going to be a part of coordinator APIs? or launcher?

I think it should go to the API of a distributed worker, an abstraction that comes with the launcher. Each vLLM engine instance will have one launcher, which launches a set of DistributedWorker instances, and then the launcher calls init_model on each DistributedWorker.

A default implementation of DistributedWorker can be:

class DistributedWorker:
    def __init__(self, args):
        self.coor = Coordinator()  # initialize Coordinator, done by vllm
        self.comm = Communicator(self.coor) # hardware vendor can use `coor` to initialize their communicator

    def init_model(self, args):
        pass

    def run_model(self, args):
        pass

It is a little confusing to me the coordinator is passed into communicator because coordinator has "broadcast" API which is typically implemented by the communicator API.

The emphasis of this RFC is to disentangle control-plane communication (Coordinator) and data-plane communication (Communicator). Both of them can have a broadcast operation, but Coordinator.broadcast (required) is designed to broadcast tiny control messages (e.g. 128 bytes), while Communicator.broadcast (optional) is designed to broadcast large chunks of data (e.g. 100MB tensors).

Communicators typically need control-plane communication to set up their state before communicating large chunks of data, e.g. in the design of nccl, before we can perform allreduce on tensors, we need to broadcast a unique id using CPUs so that communicators know they are in a group.

WoosukKwon commented 8 months ago

This is awesome!

rkooo567 commented 8 months ago

The emphasis of this RFC is to disentangle control-plane communication (Coordinator) and data-plane communication (Communicator). Both of them can have a broadcast operation, but Coordinator.broadcast (required) is designed to broadcast tiny control messages (e.g. 128 bytes), while Communicator.broadcast (optional) is designed to broadcast large chunks of data (e.g. 100MB tensors).

In this case, do we decouple the communicator implementation from the coordinator's broadcast implementation? For example, let's say we have a new communicator implementation called Gccl, GcclCommunicator(). In this case, does the Coordinator's broadcast implementation still use the default one, or Gccl under the hood?

I am worried it is a leaky abstraction if the coordinator has to use the communicator's broadcast implementation under the hood. In that case, it may make more sense for the coordinator to accept the communicator as an input?

class DistributedWorker:
    def __init__(self, args):
        self.comm = Communicator()  # created first, without a coordinator
        self.coor = Coordinator(self.comm)  # the coordinator wraps the communicator

class Coordinator:
    def __init__(self, comm):
        self.comm = comm

    def broadcast(self):
        self.comm.broadcast(...)

youkaichao commented 8 months ago

do we decouple communicator implementation from coordinator broadcast implementation?

Yes, they are decoupled. The reference implementation of coordinator uses pytorch's gloo backend, which is a CPU backend and is always available. If we have a new communicator GcclCommunicator, Coordinator's broadcast implementation still uses the default one, not the GcclCommunicator one.

The dependency chain is: coordinator has its own broadcast mechanism, either via MPI or TCPStore or anything else, to broadcast tiny messages. Communicator depends on coordinator's broadcast operation to finish initialization, and then Communicator can do all_reduce on large tensor data.

rkooo567 commented 8 months ago

Gotcha, that makes sense! Thanks for the clarification!

WoosukKwon commented 8 months ago

QQ: How does the custom all reduce backend fit into this abstraction?

youkaichao commented 8 months ago

QQ: How does the custom all reduce backend fit into this abstraction?

I'm not familiar with the custom allreduce kernel. At first glance, I feel it can be another implementation of Communicator, besides the nccl one.

youkaichao commented 7 months ago

Progress tracker:

youkaichao commented 7 months ago

Summary of all the distributed inference cases we need to support: a cartesian product of single/multi vllm engine, single/multi node.

Precondition: each vllm engine requires at least one GPU. GPUs cannot be shared across vllm engines.

Here are all the possible combinations:

Single vllm Engine, Single Node

Users can use CUDA_VISIBLE_DEVICES to control the set of GPUs used for the vLLM instance. e.g. CUDA_VISIBLE_DEVICES=0,1,2,3 python -m vllm.entrypoints.openai.api_server args

Single vllm Engine, Multiple Nodes

Currently we only support ray-managed clusters. Only one node needs to execute this:

python -m vllm.entrypoints.openai.api_server args

Then ray will use all nodes in the cluster.

In the future we may need to support torchrun/mpi style launcher, as described in https://github.com/vllm-project/vllm/issues/3902#issuecomment-2045565458 .

# multi node, on node 0
CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nnodes 2 --nproc-per-node=n --rdzv_backend=c10d --rdzv_endpoint=${node_0_ip}:${port} python -m vllm.entrypoints.openai.api_server $args
# multi node, on node 1
CUDA_VISIBLE_DEVICES=1,2,3,4 torchrun --nnodes 2 --nproc-per-node=n --rdzv_backend=c10d --rdzv_endpoint=${node_0_ip}:${port} python -m vllm.entrypoints.openai.api_server $args

Multiple vllm Engines, Single Node

Users can use CUDA_VISIBLE_DEVICES to control the set of GPUs used for each vLLM instance. e.g.

first engine:

CUDA_VISIBLE_DEVICES=0,1,2,3 python -m vllm.entrypoints.openai.api_server args

second engine:

CUDA_VISIBLE_DEVICES=4,5,6,7 python -m vllm.entrypoints.openai.api_server args

Multiple vllm Engines, Multiple Nodes

In this case, each vllm engine should have its own nodes. No overlap is allowed. We don't need to do anything in this case.

Conclusion

Ideally, we should respect CUDA_VISIBLE_DEVICES set by users, in all cases.
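
One way to picture what "respecting CUDA_VISIBLE_DEVICES" means for a worker is the mapping from its local rank to a physical device. The helper names below are hypothetical and only illustrate the mapping; they are not part of vllm.

```python
import os


def visible_devices(env=os.environ):
    """Return the entries of CUDA_VISIBLE_DEVICES, or None if it is unset."""
    raw = env.get("CUDA_VISIBLE_DEVICES")
    if raw is None:
        return None
    return [d.strip() for d in raw.split(",") if d.strip()]


def physical_device_for(local_rank: int, env=os.environ) -> str:
    """Map a worker's local rank to the physical device it should use."""
    devices = visible_devices(env)
    if devices is None:
        # No restriction set: logical index equals physical index.
        return str(local_rank)
    if local_rank >= len(devices):
        raise ValueError(
            f"local_rank {local_rank} needs more than {len(devices)} visible devices"
        )
    return devices[local_rank]
```

For the second engine in the single-node example above (CUDA_VISIBLE_DEVICES=4,5,6,7), local rank 1 maps to physical GPU 5, while inside the process it is still addressed by its logical index 1.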

Jeffwan commented 7 months ago

Communicator, responsible for cross-device communication of large tensor data (e.g. perform allreduce). Currently we support nccl, and AMD also has its own communication library. Note that this is vendor-specific, and vendors usually have their own way of cross-device communication. The most messy one, and the missing one, is the Coordinator abstraction level. More on this later.

I think some companies are building their own XXCCL solution to

  1. optimize the cross-node performance, discover network topology and optimize communication etc based on their own data center settings.
  2. abstract the heterogeneous devices and provide a hardware abstraction layer (HAL) (term is from https://arxiv.org/abs/2202.07848 paper).

While this is separate from the vllm engine itself, I am thinking maybe it's worth extending into this area, or using an existing library to simplify the work if there is one.

Jeffwan commented 7 months ago

through environment variables, the simplest way, but will disable the usage of thread-level distributed inference because environment variables are shared within threads in one process. (However, thread-level distributed inference seems rare. Do we need to consider this?)

I think the only use case here is the single-node, multi-GPU case. In this case, could different threads just set their device with torch.cuda.set_device(device_id)? They share the environment variables, but in a non-distributed environment, rank information should not be injected externally. Instead, the launcher just needs to manage the workers, and they should have a 1:1 mapping with CUDA_VISIBLE_DEVICES.

Jeffwan commented 7 months ago

launch_id, used to distinguish current launch with possibly concurrent launch (e.g. when 4 users want to set up 4 inference engines in the same node, each with 2 GPUs). Note: the launch_id can be used as a "random seed" to draw values for master_port, instead of keeping only one default master_port value and having to kill all processes after the last run crashes. A reference implementation would be hashing the launch_id to a port number, and increasing the port number to find the first free port. This is a strategy taken by Jupyter Notebook/Lab Server .

I didn't get this part.

  1. In order to distinguish different engines (4 engines with 2 GPUs each on one node), each new instance should always pick a random port. Is master_port enough?

  2. If we do want some kind of id, why not pick a random port number and derive launch_id from it, rather than the other way around?

youkaichao commented 7 months ago

launch_id can even go beyond distributed training, e.g. when we plan to put verbose logging in some folder, as partly done in https://github.com/vllm-project/vllm/pull/4079 .
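
For reference, the port-selection strategy quoted above (hash launch_id to a port, then increase the port number until a free one is found) could be sketched as follows. The function names and the port range are assumptions for illustration. A stable hash is used instead of Python's built-in `hash()`, which is randomized per process for strings.

```python
import hashlib
import socket


def port_from_launch_id(launch_id: str, low: int = 20000, high: int = 60000) -> int:
    """Deterministically map a launch_id to a starting port in [low, high)."""
    digest = hashlib.sha256(launch_id.encode("utf-8")).digest()
    return low + int.from_bytes(digest[:4], "big") % (high - low)


def is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Check whether we can bind to the port right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False
        return True


def find_master_port(launch_id: str, max_tries: int = 100,
                     low: int = 20000, high: int = 60000) -> int:
    """Start at the hashed port and probe upward for the first free one."""
    start = port_from_launch_id(launch_id, low, high)
    for offset in range(max_tries):
        port = low + (start - low + offset) % (high - low)  # wrap within range
        if is_free(port):
            return port
    raise RuntimeError(f"no free port found for launch_id {launch_id!r}")
```

The probe result depends on which ports are busy at that moment, so in this sketch only one process (e.g. the launcher) should call `find_master_port` and then pass the chosen port to the others.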

Jeffwan commented 7 months ago

might perform some additional operations that fails functionalities like cudagraph.

I am wondering whether the motivation is strong enough. It seems the two pain points are: 1. torch.distributed.all_reduce is not that clean, and it doesn't work with cudagraph; 2. heterogeneous devices are hard to maintain.

  1. for the 1st one, I feel the refactor cost is a little bit high for one feature with a side effect (taking more memory).
  2. for the 2nd one, should we expect different vendors to support their communication backends at the torch.distributed level?

From the architecture level, splitting collective communication usage into a control plane and a data plane totally makes sense. I am just wondering whether it's possible to reuse one process group, which is cleaner, while still addressing the issues you listed.

youkaichao commented 7 months ago

First of all, thank you for your interest and feedback!

whether it's possible to reuse one process group, which is cleaner

A process group is just a way to organize processes; it does not create new processes. For tensor parallel of size k, we can have k processes, and they can form multiple process groups. That said, creating a process group is quite cheap. I don't get why you think we should stick to one group.

should we expect different vendors to support their communication backends at the torch.distributed level

That could be the case, but the progress can be really slow, e.g. we would have to wait at least several months for a pytorch release. Meanwhile, writing wrappers in vllm is much faster to implement, e.g. we can do it in days.

Jeffwan commented 7 months ago

@youkaichao

A process group is just a way to organize processes; it does not create new processes. For tensor parallel of size k, we can have k processes, and they can form multiple process groups. That said, creating a process group is quite cheap. I don't get why you think we should stick to one group.

Yeah, the communication thread is cheap. I was thinking about debuggability, ports, etc. (we have a limited number of ports that can be exposed in our env). But anyway, considering that just two groups are needed here, I think my concern was unnecessary. I am buying into the idea.

youkaichao commented 5 months ago

Finished in https://github.com/vllm-project/vllm/pull/5293