vllm-project / vllm

A high-throughput and memory-efficient inference and serving engine for LLMs
https://docs.vllm.ai
Apache License 2.0

[Bug]: `RuntimeError: b_q_weight is not on GPU` CPU Offloading #6952

Closed justinthelaw closed 1 month ago

justinthelaw commented 1 month ago

Your current environment

The output of `python collect_env.py`

Collecting environment information...
PyTorch version: N/A
Is debug build: N/A
CUDA used to build PyTorch: N/A
ROCM used to build PyTorch: N/A

OS: Ubuntu 22.04.4 LTS (x86_64)
GCC version: (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0
Clang version: Could not collect
CMake version: version 3.22.1
Libc version: glibc-2.35

Python version: 3.11.6 (main, Jul 17 2024, 12:33:27) [GCC 11.4.0] (64-bit runtime)
Python platform: Linux-6.5.0-45-generic-x86_64-with-glibc2.35
Is CUDA available: N/A
CUDA runtime version: Could not collect
CUDA_MODULE_LOADING set to: N/A
GPU models and configuration: GPU 0: NVIDIA GeForce RTX 4070 Laptop GPU
Nvidia driver version: 550.90.07
cuDNN version: Could not collect
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: N/A

CPU:
Architecture:                       x86_64
CPU op-mode(s):                     32-bit, 64-bit
Address sizes:                      39 bits physical, 48 bits virtual
Byte Order:                         Little Endian
CPU(s):                             32
On-line CPU(s) list:                0-31
Vendor ID:                          GenuineIntel
Model name:                         13th Gen Intel(R) Core(TM) i9-13900HX
CPU family:                         6
Model:                              183
Thread(s) per core:                 2
Core(s) per socket:                 24
Socket(s):                          1
Stepping:                           1
CPU max MHz:                        5400.0000
CPU min MHz:                        800.0000
BogoMIPS:                           4838.40
Flags:                              fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq dtes64 monitor ds_cpl vmx est tm2 ssse3 sdbg fma cx16 xtpr pdcm sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb ssbd ibrs ibpb stibp ibrs_enhanced tpr_shadow flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid rdseed adx smap clflushopt clwb intel_pt sha_ni xsaveopt xsavec xgetbv1 xsaves split_lock_detect avx_vnni dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp hwp_pkg_req hfi vnmi umip pku ospke waitpkg gfni vaes vpclmulqdq rdpid movdiri movdir64b fsrm md_clear serialize arch_lbr ibt flush_l1d arch_capabilities
Virtualization:                     VT-x
L1d cache:                          896 KiB (24 instances)
L1i cache:                          1.3 MiB (24 instances)
L2 cache:                           32 MiB (12 instances)
L3 cache:                           36 MiB (1 instance)
NUMA node(s):                       1
NUMA node0 CPU(s):                  0-31
Vulnerability Gather data sampling: Not affected
Vulnerability Itlb multihit:        Not affected
Vulnerability L1tf:                 Not affected
Vulnerability Mds:                  Not affected
Vulnerability Meltdown:             Not affected
Vulnerability Mmio stale data:      Not affected
Vulnerability Retbleed:             Not affected
Vulnerability Spec rstack overflow: Not affected
Vulnerability Spec store bypass:    Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:           Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2:           Mitigation; Enhanced / Automatic IBRS; IBPB conditional; RSB filling; PBRSB-eIBRS SW sequence; BHI BHI_DIS_S
Vulnerability Srbds:                Not affected
Vulnerability Tsx async abort:      Not affected

Versions of relevant libraries:
[pip3] No relevant packages
[conda] Could not collect
ROCM Version: Could not collect
Neuron SDK Version: N/A
vLLM Version: N/A
vLLM Build Flags:
CUDA Archs: Not Set; ROCm: Disabled; Neuron: Disabled
GPU Topology:
GPU0    CPU Affinity    NUMA Affinity   GPU NUMA ID
GPU0     X      0-31    0               N/A

Legend:

  X    = Self
  SYS  = Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI)
  NODE = Connection traversing PCIe as well as the interconnect between PCIe Host Bridges within a NUMA node
  PHB  = Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU)
  PXB  = Connection traversing multiple PCIe bridges (without traversing the PCIe Host Bridge)
  PIX  = Connection traversing at most a single PCIe bridge
  NV#  = Connection traversing a bonded set of # NVLinks

🐛 Describe the bug

I get the error and stack trace below whenever I set the `cpu_offload_gb` parameter that is passed to vLLM's `AsyncLLMEngine`. I have tried several offload values (2, 4, and 8), and none of them worked.

The custom backend wrapper code used to serve vLLM is attached at the bottom. The same code works, and vLLM is able to run inference, when `cpu_offload_gb` is removed.

I am using the following AWQ quantization of Phi-3-mini-128k-instruct: bsmit1659/Phi-3-mini-128k-instruct-0.2-awq
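
For anyone trying to reproduce this, here is a minimal sketch of the engine arguments involved, reduced from the wrapper code attached below. The helper names `build_engine_kwargs` and `make_engine` and the `force_plain_awq` flag are hypothetical and exist only for illustration; `cpu_offload_gb`, `quantization`, and `enforce_eager` are existing vLLM engine arguments. Requesting `quantization="awq"` explicitly is an untested assumption: it should keep vLLM from switching to the `awq_marlin` kernel where the repack fails, but whether CPU offloading then works has not been verified here.

```python
from typing import Any


def build_engine_kwargs(
    model: str,
    cpu_offload_gb: float = 0,
    force_plain_awq: bool = False,
) -> dict[str, Any]:
    """Collect keyword arguments for vllm.engine.arg_utils.AsyncEngineArgs.

    Hypothetical helper for illustration; not part of vLLM.
    """
    kwargs: dict[str, Any] = {
        "model": model,
        "dtype": "auto",
        "enforce_eager": True,
        "gpu_memory_utilization": 0.99,
    }
    if cpu_offload_gb > 0:
        # Space in GiB to offload to CPU memory.
        kwargs["cpu_offload_gb"] = cpu_offload_gb
    if force_plain_awq:
        # Assumption: naming "awq" explicitly avoids the awq_marlin path.
        kwargs["quantization"] = "awq"
    return kwargs


def make_engine(kwargs: dict[str, Any]):
    """Build the engine; vLLM is imported lazily so the helper above runs without a GPU."""
    from vllm.engine.arg_utils import AsyncEngineArgs
    from vllm.engine.async_llm_engine import AsyncLLMEngine

    return AsyncLLMEngine.from_engine_args(AsyncEngineArgs(**kwargs))


if __name__ == "__main__":
    # Failing configuration from this report (offload on, quantization auto-detected).
    print(build_engine_kwargs(".model/", cpu_offload_gb=4))
    # Variant to try: same offload, plain AWQ kernel requested explicitly.
    print(build_engine_kwargs(".model/", cpu_offload_gb=4, force_plain_awq=True))
```

Calling `make_engine(build_engine_kwargs(".model/", cpu_offload_gb=4))` corresponds to the failing setup; the second variant only changes the quantization argument, so any difference in behavior can be attributed to the kernel choice.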

Initializing Leapfrog
INFO 07-30 15:18:35 awq_marlin.py:77] The model is convertible to awq_marlin during runtime. Using awq_marlin kernel.
INFO 07-30 15:18:35 llm_engine.py:176] Initializing an LLM engine (v0.5.3) with config: model='.model/', speculative_config=None, tokenizer='.model/', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, rope_scaling=None, rope_theta=None, tokenizer_revision=None, trust_remote_code=True, dtype=torch.float16, max_seq_len=1024, download_dir=None, load_format=LoadFormat.AUTO, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=awq_marlin, enforce_eager=True, kv_cache_dtype=auto, quantization_param_path=None, device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='outlines'), observability_config=ObservabilityConfig(otlp_traces_endpoint=None), seed=0, served_model_name=.model/, use_v2_block_manager=False, enable_prefix_caching=False)
INFO 07-30 15:18:35 selector.py:170] Cannot use FlashAttention-2 backend due to sliding window.
INFO 07-30 15:18:35 selector.py:54] Using XFormers backend.
INFO 07-30 15:18:37 model_runner.py:680] Starting to load model .model/...
INFO 07-30 15:18:37 selector.py:170] Cannot use FlashAttention-2 backend due to sliding window.
INFO 07-30 15:18:37 selector.py:54] Using XFormers backend.
Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  5.91it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:00<00:00,  5.91it/s]

[rank0]: Traceback (most recent call last):
[rank0]:   File "<frozen runpy>", line 198, in _run_module_as_main
[rank0]:   File "<frozen runpy>", line 88, in _run_code
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/leapfrogai_sdk/cli.py", line 41, in <module>
[rank0]:     cli()  # pragma: no cover
[rank0]:     ^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/click/core.py", line 1157, in __call__
[rank0]:     return self.main(*args, **kwargs)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/click/core.py", line 1078, in main
[rank0]:     rv = self.invoke(ctx)
[rank0]:          ^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/click/core.py", line 1434, in invoke
[rank0]:     return ctx.invoke(self.callback, **ctx.params)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/click/core.py", line 783, in invoke
[rank0]:     return __callback(*args, **kwargs)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/leapfrogai_sdk/cli.py", line 37, in cli
[rank0]:     asyncio.run(serve(app(), host, port))
[rank0]:                       ^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/leapfrogai_sdk/llm.py", line 106, in __init__
[rank0]:     super().__init__(*args, **kwargs)
[rank0]:   File "/home/leapfrogai/packages/vllm/src/main.py", line 166, in __init__
[rank0]:     self.engine = AsyncLLMEngine.from_engine_args(self.engine_args)
[rank0]:                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/engine/async_llm_engine.py", line 466, in from_engine_args
[rank0]:     engine = cls(
[rank0]:              ^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/engine/async_llm_engine.py", line 380, in __init__
[rank0]:     self.engine = self._init_engine(*args, **kwargs)
[rank0]:                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/engine/async_llm_engine.py", line 547, in _init_engine
[rank0]:     return engine_class(*args, **kwargs)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/engine/llm_engine.py", line 251, in __init__
[rank0]:     self.model_executor = executor_class(
[rank0]:                           ^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/executor/executor_base.py", line 47, in __init__
[rank0]:     self._init_executor()
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/executor/gpu_executor.py", line 36, in _init_executor
[rank0]:     self.driver_worker.load_model()
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/worker/worker.py", line 139, in load_model
[rank0]:     self.model_runner.load_model()
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/worker/model_runner.py", line 682, in load_model
[rank0]:     self.model = get_model(model_config=self.model_config,
[rank0]:                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/model_executor/model_loader/__init__.py", line 21, in get_model
[rank0]:     return loader.load_model(model_config=model_config,
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/model_executor/model_loader/loader.py", line 294, in load_model
[rank0]:     quant_method.process_weights_after_loading(module)
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/model_executor/layers/quantization/awq_marlin.py", line 224, in process_weights_after_loading
[rank0]:     marlin_qweight = ops.awq_marlin_repack(
[rank0]:                      ^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/_custom_ops.py", line 34, in wrapper
[rank0]:     return fn(*args, **kwargs)
[rank0]:            ^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/vllm/_custom_ops.py", line 282, in awq_marlin_repack
[rank0]:     return torch.ops._C.awq_marlin_repack(b_q_weight, size_k, size_n, num_bits)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/home/leapfrogai/.venv/lib/python3.11/site-packages/torch/_ops.py", line 854, in __call__
[rank0]:     return self_._op(*args, **(kwargs or {}))
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]: RuntimeError: b_q_weight is not on GPU

Custom backend wrapper code:

import asyncio
import json
import logging
import os
import queue
import random
import sys
import threading
import time
from typing import Any, Dict, AsyncGenerator

from confz import EnvSource
from dotenv import load_dotenv
from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.outputs import RequestOutput
from vllm.utils import random_uuid

from config import AppConfig
from leapfrogai_sdk import (
    BackendConfig,
    ChatCompletionRequest,
    CompletionRequest,
)
from leapfrogai_sdk.llm import (
    GenerationConfig,
    LLM,
)

load_dotenv()

def clamp(n: float | int, smallest: float | int, largest: float | int):
    return max(smallest, min(n, largest))

class RandomAsyncIterator:
    """Manages multiple async iterables and allows iterating over them in a random order."""

    def __init__(self, async_iterables):
        # Convert each iterable into an async iterator
        self.async_iterators = [ai.__aiter__() for ai in async_iterables]

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next item from a randomly chosen iterator. If all iterators are exhausted, stop iteration."""
        if not self.async_iterators:  # Check if there are no iterators left
            raise StopAsyncIteration

        # Select a random iterator from the list
        random_index = random.randint(0, len(self.async_iterators) - 1)
        try:
            # Attempt to get the next item from the randomly selected iterator
            return await self.async_iterators[random_index].__anext__()
        except StopAsyncIteration:
            # If the selected iterator is exhausted, remove it from the list
            del self.async_iterators[random_index]

        # An exhausted iterator was just removed: end this pass so the caller can restart iteration
        raise StopAsyncIteration

    def is_empty(self):
        """Check if there are any iterators left."""
        return len(self.async_iterators) <= 0

    def add_iterator(self, async_iterable):
        """Add a new async iterable to the pool of iterators."""
        self.async_iterators.append(async_iterable.__aiter__())

    def remove_iterator(self, async_iterable):
        """Attempt to remove an async iterable from the pool if it exists."""
        try:
            self.async_iterators.remove(async_iterable.__aiter__())
        except ValueError:
            pass  # If the iterable is not found, ignore the error

def get_backend_configs():
    # Manually load env var as ConfZ does not handle complex types (list)
    stop_tokens: str | None = os.getenv("LAI_STOP_TOKENS")
    if stop_tokens:
        processed_stop_tokens = json.loads(stop_tokens)
    else:
        processed_stop_tokens = []
    # pop() with a default avoids a KeyError when LAI_STOP_TOKENS is not set
    os.environ.pop("LAI_STOP_TOKENS", None)

    env_source = EnvSource(
        allow_all=True,
        prefix="LAI_",
        remap={
            "model_source": "model.source",
            "max_context_length": "max_context_length",
            "stop_tokens": "stop_tokens",
            "prompt_format_chat_system": "prompt_format.chat.system",
            "prompt_format_chat_assistant": "prompt_format.chat.assistant",
            "prompt_format_chat_user": "prompt_format.chat.user",
            "prompt_format_defaults_top_p": "prompt_format.defaults.top_p",
            "prompt_format_defaults_top_k": "prompt_format.defaults.top_k",
        },
    )

    BackendConfig.CONFIG_SOURCES = env_source
    # Initialize an immutable config from env variables without stop_tokens list
    backend_configs: BackendConfig = BackendConfig()
    # Apply "processed_stop_tokens" without triggering Pydantic validation errors;
    # model_copy() returns a new object, so the result must be reassigned
    backend_configs = backend_configs.model_copy(
        update={"stop_tokens": processed_stop_tokens}
    )

    return backend_configs

def get_config_from_request(request: ChatCompletionRequest | CompletionRequest):
    return GenerationConfig(
        max_new_tokens=request.max_new_tokens,
        temperature=request.temperature,
        top_k=request.top_k,
        top_p=request.top_p,
        do_sample=request.do_sample,
        n=request.n,
        stop=list(request.stop),
        repetition_penalty=request.repetition_penalty,
        presence_penalty=request.presence_penalty,
        best_of=str(request.best_of),
        logit_bias=request.logit_bias,
        return_full_text=request.return_full_text,
        truncate=request.truncate,
        typical_p=request.typical_p,
        watermark=request.watermark,
        seed=request.seed,
    )

@LLM
class Model:
    """Implements an LLM model with concurrent output generation and management."""

    done_by_id: Dict[str, bool] = {}
    delta_queue_by_id: Dict[str, queue.Queue] = {}
    result_by_id: Dict[str, RequestOutput] = {}
    random_iterator: RandomAsyncIterator = RandomAsyncIterator([])

    def __init__(self):
        logging.getLogger().setLevel(logging.DEBUG)

        # Background thread for managing output iteration
        _thread = threading.Thread(target=asyncio.run, args=(self.iterate_outputs(),))
        _thread.start()

        self.backend_config = get_backend_configs()
        self.model = self.backend_config.model.source
        self.engine_args = AsyncEngineArgs(
            engine_use_ray=False,
            model=self.model,
            trust_remote_code=AppConfig().backend_options.trust_remote_code,
            max_seq_len_to_capture=self.backend_config.max_context_length,
            max_model_len=self.backend_config.max_context_length,
            dtype="auto",
            worker_use_ray=False,
            gpu_memory_utilization=0.99,
            cpu_offload_gb=4,
            tensor_parallel_size=AppConfig().backend_options.tensor_parallel_size,
            enforce_eager=True
        )
        self.engine = AsyncLLMEngine.from_engine_args(self.engine_args)
        print(self.engine_args)

    async def iterate_outputs(self):
        """Continuously processes outputs from the random iterator and manages state by request IDs."""

        t0_by_id: dict[str, float] = {}
        index_by_id: dict[str, int] = {}
        num_tokens_by_id: dict[str, int] = {}

        while True:
            if not self.random_iterator.is_empty():
                request_output: RequestOutput
                async for request_output in self.random_iterator:
                    request_id = request_output.request_id

                    if request_output.finished:
                        # Signal that the "generate" function can stop waiting for additional inputs
                        logging.info(
                            f"Generated {num_tokens_by_id[request_id]} tokens in {time.time() - t0_by_id[request_id]:.2f}s"
                        )
                        self.done_by_id[request_id] = True
                    else:
                        # Initialize dictionary entries
                        if t0_by_id.get(request_id) is None:
                            t0_by_id[request_id] = time.time()

                        if index_by_id.get(request_id) is None:
                            index_by_id[request_id] = 0

                        if num_tokens_by_id.get(request_id) is None:
                            num_tokens_by_id[request_id] = 0

                    if (
                        request_output.outputs[0].text
                        and "\ufffd" == request_output.outputs[0].text[-1]
                    ):
                        continue

                    # Update tracking information
                    text_delta = request_output.outputs[0].text[
                        index_by_id[request_id] :
                    ]
                    index_by_id[request_id] = len(request_output.outputs[0].text)
                    num_tokens_by_id[request_id] = len(
                        request_output.outputs[0].token_ids
                    )

                    # Add the result to the queue for this request
                    self.delta_queue_by_id[request_id].put(text_delta)
            time.sleep(0)

    async def create_response(
        self, request_id: str, prompt: str, config: GenerationConfig
    ):
        """Initiate a response generation for the given prompt and configuration, adding the result to the iterator
        pool."""

        sampling_params = SamplingParams(
            temperature=config.temperature,
            # Clamp top_p value to prevent float errors
            top_p=clamp(config.top_p, 0.0 + sys.float_info.epsilon, 1.0),
            # Restrict top_k to valid values, -1 disables top_k
            top_k=config.top_k if config.top_k >= 1 else -1,
            stop=self.backend_config.stop_tokens,
            max_tokens=config.max_new_tokens,
            skip_special_tokens=False,
        )
        logging.debug(sampling_params)
        logging.info(f"Begin generation for request {request_id}")
        # Generate texts from the prompts. The output is a list of RequestOutput objects
        # that contain the prompt, generated text, and other information.
        gen_iter = self.engine.generate(prompt, sampling_params, request_id)
        logging.info(f"Begin iteration for request {request_id}")
        self.random_iterator.add_iterator(gen_iter)

    async def generate_session(
        self, session: str, prompt: str, config: GenerationConfig
    ):
        """Manage a session's lifecycle for generating output, including setting up necessary state and iterators."""

        if self.delta_queue_by_id.get(session) is None:
            self.delta_queue_by_id[session] = queue.Queue()

        await self.create_response(session, prompt, config)

    def is_queue_empty(self, request_id) -> bool:
        """Check if the queue for a given request ID is empty or non-existent."""

        cur_request_queue = self.delta_queue_by_id.get(request_id)
        return cur_request_queue is None or cur_request_queue.empty()

    async def generate(
        self, prompt: str, config: GenerationConfig
    ) -> AsyncGenerator[str, Any]:
        """Initiate and manage the generation process for a given prompt, yielding generated text segments."""

        request_id = random_uuid()
        self.done_by_id[request_id] = False

        # Spawns a thread to request a response for the prompt
        _thread = threading.Thread(
            target=asyncio.run,
            args=(self.generate_session(request_id, prompt, config),),
        )
        _thread.start()

        logging.info(f"Begin reading the output for request {request_id}")

        while not self.done_by_id.get(request_id) or not self.is_queue_empty(
            request_id
        ):
            result = ""
            if not self.is_queue_empty(request_id):
                result = self.delta_queue_by_id.get(request_id).get()
            yield result

        logging.info(f"Finished request {request_id}")

    async def count_tokens(self, raw_text: str) -> int:
        tokens: list[int] | list[str] = (await self.engine.get_tokenizer()).tokenize(
            raw_text
        )
        return len(tokens)
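
As a side note on how the wrapper above schedules outputs, the following is a small standalone sketch of the `RandomAsyncIterator` selection logic, trimmed to the parts needed for a demo. The `numbers` generator is a hypothetical stand-in for the stream returned by `engine.generate()`, and `drain` and `run_demo` are illustration-only helpers. It shows that a single `async for` pass stops as soon as an exhausted iterator is picked, so an outer loop is needed to collect everything, which is what the `while True` loop in `iterate_outputs` does.

```python
import asyncio
import random


class RandomAsyncIterator:
    """Same selection logic as the wrapper's class, trimmed for the demo."""

    def __init__(self, async_iterables):
        self.async_iterators = [ai.__aiter__() for ai in async_iterables]

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.async_iterators:
            raise StopAsyncIteration
        index = random.randint(0, len(self.async_iterators) - 1)
        try:
            return await self.async_iterators[index].__anext__()
        except StopAsyncIteration:
            # Drop the exhausted iterator, then end this pass.
            del self.async_iterators[index]
        raise StopAsyncIteration

    def is_empty(self):
        return len(self.async_iterators) <= 0


async def numbers(tag, count):
    """Hypothetical stand-in for the stream returned by engine.generate()."""
    for i in range(count):
        yield f"{tag}{i}"


async def drain(pool):
    """Collect every item; the outer loop restarts after each early stop."""
    items = []
    while not pool.is_empty():
        async for item in pool:
            items.append(item)
    return items


def run_demo():
    pool = RandomAsyncIterator([numbers("a", 3), numbers("b", 2)])
    return asyncio.run(drain(pool))


if __name__ == "__main__":
    print(sorted(run_demo()))
```

The interleaving between the two streams is random, but every item is eventually delivered and the order within each stream is preserved.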

youkaichao commented 1 month ago

This might be related to https://github.com/vllm-project/vllm/issues/6765