vllm-project / vllm

A high-throughput and memory-efficient inference and serving engine for LLMs
https://docs.vllm.ai
Apache License 2.0

When I set tensor_parallel_size=2, a runtime error occurred #3253

Open xjDUAN184 opened 6 months ago

xjDUAN184 commented 6 months ago

My device is an A800. When I do not set the tensor_parallel_size parameter, the vLLM + Qwen-14B API starts normally and automatically uses GPU 0 (screenshot).

When I set tensor_parallel_size=2, a RuntimeError occurs (screenshot).

The main messages are:

RuntimeError: ProcessGroupNCCL is only supported with GPUs, no GPUs found!

Warning: CUDA initialization: Unexpected error from cudaGetDeviceCount(). Did you run some cuda functions before calling NumCudaDevices() that might have already set an error? Error 103: integrity checks failed (function operator())

I verified the following options (screenshot).

My environment: CUDA 11.7, torch 2.0.1, Python 3.8.16, vLLM 0.2.1.
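A minimal way to confirm that both GPUs are actually visible to PyTorch and NCCL from the environment that launches the API (a diagnostic sketch, assuming the same torch 2.0.1 install as above; the file name is hypothetical):

# check_gpus.py -- diagnostic sketch, not part of the API code below
import os
import torch
import torch.distributed as dist

print("CUDA_VISIBLE_DEVICES =", os.environ.get("CUDA_VISIBLE_DEVICES"))
print("torch.cuda.is_available() =", torch.cuda.is_available())
print("torch.cuda.device_count() =", torch.cuda.device_count())
print("NCCL available =", dist.is_nccl_available())
for i in range(torch.cuda.device_count()):
    print(f"GPU {i}: {torch.cuda.get_device_name(i)}")

If device_count() already reports 0 or raises the same cudaGetDeviceCount() error here, the problem lies in the environment (driver, container flags, CUDA_VISIBLE_DEVICES) rather than in vLLM itself.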

How should this problem be solved? My API code:

import argparse
import asyncio
import json
import time
from http import HTTPStatus
from typing import AsyncGenerator, Dict, List, Optional, Tuple, Union
import torch
import re
import fastapi
import uvicorn
from fastapi import Request
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse, Response
from packaging import version
import time
from typing import Any  # newly added
import logging
from datetime import datetime, timedelta, timezone  # newly added
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.entrypoints.openai.protocol import (
    CompletionRequest, CompletionResponse, CompletionResponseChoice,
    CompletionResponseStreamChoice, CompletionStreamResponse,
    ChatCompletionRequest, ChatCompletionResponse,
    ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice,
    ChatCompletionStreamResponse, ChatMessage, DeltaMessage, ErrorResponse,
    LogProbs, ModelCard, ModelList, ModelPermission, UsageInfo)
from vllm.logger import init_logger, _setup_logger
from vllm.outputs import RequestOutput
from vllm.sampling_params import SamplingParams
from vllm.transformers_utils.tokenizer import get_tokenizer
from vllm.utils import random_uuid
from vllm.outputs import CompletionOutput
from collections import defaultdict
from difflib import SequenceMatcher

try:
    import fastchat
    from fastchat.conversation import Conversation, SeparatorStyle
    from fastchat.model.model_adapter import get_conversation_template
    _fastchat_available = True
except ImportError:
    _fastchat_available = False

TIMEOUT_KEEP_ALIVE = 5 # seconds

logger = init_logger(__name__)
served_model = None
app = fastapi.FastAPI()
engine = None

@app.get("/health") async def health() -> Response: """Health check.""" return Response(status_code=200)

def manual_tokenize(text):
    tokens = re.split(r'([0-9]+|[,。、?!“”‘’,.-!?:";()]|\s|[\u4e00-\u9fff])', text)
    tokens = [x for x in tokens if x != '']
    return tokens

def remove_repetition(text, n=2, rep=5):
    words = [*text]  # character split (immediately overridden below)
    words = manual_tokenize(text)
    # print(words)
    ngram_counts = defaultdict(int)
    outputs = []
    ngrams = [words[i:i+n] for i in range(len(words)-n+1)]
    ngrams_len = len(ngrams)
    for i in range(ngrams_len):
        ngram = ngrams[i]
        ngram_str = ' '.join(ngram)
        ngram_counts[ngram_str] += 1
        if ngram_counts[ngram_str] <= rep:
            if i < ngrams_len-1:
                outputs.extend(ngram[0])
            else:
                outputs.extend(ngram)

    return ''.join(outputs)
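
# Example (hypothetical input): remove_repetition drops an n-gram once it has
# been seen more than `rep` times, e.g.
#   remove_repetition("好的好的好的好的好的", n=2, rep=2)  ->  "好的好的"
# (the first two occurrences of the "好 的" bigram are kept, the rest trimmed).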

def remove_repetition_line(text, threshold=0.99):
    seen = set()
    output = []

    for line in text.split('\n'):
        # print(line)
        if (output == []) or (line == ''):
            seen.add(line)
            output.append(line)
        else:
            for seen_line in seen:
                similarity = SequenceMatcher(None, line, seen_line).ratio()
                # print((similarity, line))
                if similarity > threshold:
                    break
            else:
                seen.add(line)
                output.append(line)

    return '\n'.join(output)

def content_chuli(content):  # post-process the generated content
    if "\n\n" in content:
        content = content.replace("\n\n", "\n")
    if "assistant" in content:
        content = content.replace("assistant", "")
    content = remove_repetition_line(content, 0.95)
    content = remove_repetition(content, 4, 8)
    return content.encode("UTF-8").decode("UTF-8")

def create_error_response(status_code: HTTPStatus,
                          message: str) -> JSONResponse:
    return JSONResponse(ErrorResponse(message=message,
                                      type="invalid_request_error").dict(),
                        status_code=status_code.value)

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):  # pylint: disable=unused-argument
    return create_error_response(HTTPStatus.BAD_REQUEST, str(exc))

async def check_model(request) -> Optional[JSONResponse]:
    global MODEL_PATH

    # if request.model == served_model:
    if MODEL_PATH == served_model:
        return

    ret = create_error_response(
        HTTPStatus.NOT_FOUND,
        # f"The model `{request.model}` does not exist.",
        f"The model `{MODEL_PATH}` does not exist.",
    )
    return ret

async def get_gen_prompt(request) -> str:
    system_prompt = request.system_prompt
    query = request.prompt
    # Special-case handling for the paper-summary prompt template.
    if "在生成的回答中你需要覆盖以下8个问题:\n1. 研究的是什么" in query:
        cue1 = '请根据提供的论文,完成论文的摘要,摘要字数需要控制在500字。在生成的回答中你需要覆盖以下8个问题:\n1. 研究的是什么\n2. 为什么会发起这项研究\n3. 使用了什么研究方法\n4. 研究的实验结果是什么\n5. 研究的结论是什么\n6. 研究的意义是什么\n7. 研究的关键步骤是什么\n8. 获取文章标题,获取失败则生成标题;\n这段是需要生成技术报告的文本\n'
        cue2 = '\n输出格式要求如下,从8个方面给出回答:\n1. 研究问题:\n2. 研究原因:\n3. 研究方法:\n4. 研究结果:\n5. 研究结论:\n6. 研究意义:\n7. 研究脉络:\n8. 文章标题:'
        query = query.replace(cue1, '')
        query = query.replace(cue2, '')
        query = ' '.join(query.split())
        query = cue1 + query + cue2
    else:
        query = ' '.join(request.prompt.split())

    history = request.history
    max_window_size = request.max_windows_size

    if history is None:
        history = []

    im_start, im_end = "<|im_start|>", "<|im_end|>"
    im_start_tokens = [tokenizer.im_start_id]  # <|im_start|>
    im_end_tokens = [tokenizer.im_end_id]  # <|im_end|>
    nl_tokens = tokenizer.encode("\n")  # \n

    def _tokenize_str(role, content):
        return f"{role} \n{content}", tokenizer.encode(
            role, allowed_special=set()
        ) + nl_tokens + tokenizer.encode(content, allowed_special=set())

    system_text, system_tokens_part = _tokenize_str("system", system_prompt)
    system_tokens = im_start_tokens + system_tokens_part + im_end_tokens

    raw_text = ""
    context_tokens = []

    for turn_query, turn_response in reversed(history):
        query_text, query_tokens_part = _tokenize_str("user", turn_query)
        query_tokens = im_start_tokens + query_tokens_part + im_end_tokens
        response_text, response_tokens_part = _tokenize_str(
            "assistant", turn_response
        )
        response_tokens = im_start_tokens + response_tokens_part + im_end_tokens

        next_context_tokens = nl_tokens + query_tokens + nl_tokens + response_tokens
        prev_chat = (
            f"\n{im_start}{query_text}{im_end}\n{im_start}{response_text}{im_end}"
        )

        current_context_size = (
            len(system_tokens) + len(next_context_tokens) + len(context_tokens)
        )
        if current_context_size < max_window_size:
            context_tokens = next_context_tokens + context_tokens
            raw_text = prev_chat + raw_text
        else:
            break

    context_tokens = system_tokens + context_tokens
    raw_text = f"{im_start}{system_text}{im_end}" + raw_text
    context_tokens += (
        nl_tokens
        + im_start_tokens
        + _tokenize_str("user", query)[1]
        + im_end_tokens
        + nl_tokens
        + im_start_tokens
        + tokenizer.encode("assistant")
        + nl_tokens
    )
    raw_text += f" \n{im_start}user\n{query}{im_end} \n{im_start}assistant\n\n"
    print(f'raw_text:{raw_text}')
    logger.info(f'raw_text:{raw_text}')
    return raw_text
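
# For reference, the prompt built above follows Qwen's ChatML layout. For a
# single-turn request with no history, raw_text looks roughly like
# (illustrative only):
#   <|im_start|>system \n{system_prompt}<|im_end|> \n<|im_start|>user\n{query}<|im_end|> \n<|im_start|>assistant\n\n
# and context_tokens holds the same sequence in token-id form.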

async def check_length(
    request: Union[ChatCompletionRequest, CompletionRequest],
    prompt: Optional[str] = None,
    prompt_ids: Optional[List[int]] = None
) -> Tuple[List[int], Optional[JSONResponse]]:
    assert (not (prompt is None and prompt_ids is None)
            and not (prompt is not None and prompt_ids is not None)
            ), "Either prompt or prompt_ids should be provided."
    if prompt_ids is not None:
        input_ids = prompt_ids
    else:
        input_ids = tokenizer(prompt).input_ids
    token_num = len(input_ids)

    if request.max_new_tokens is None:
        request.max_new_tokens = max_model_len - token_num
    if token_num + request.max_new_tokens > max_model_len:
        return input_ids, create_error_response(
            HTTPStatus.BAD_REQUEST,
            f"This model's maximum context length is {max_model_len} tokens. "
            f"However, you requested {request.max_new_tokens + token_num} tokens "
            f"({token_num} in the messages, "
            f"{request.max_new_tokens} in the completion). "
            f"Please reduce the length of the messages or completion.",
        )
    else:
        return input_ids, None

@app.get("/v1/models") async def show_available_models(): """Show available models. Right now we only have one model.""" model_cards = [ ModelCard(id=served_model, root=served_model, permission=[ModelPermission()]) ] return ModelList(data=model_cards)

def create_logprobs(token_ids: List[int],
                    id_logprobs: List[Dict[int, float]],
                    initial_text_offset: int = 0) -> LogProbs:
    """Create OpenAI-style logprobs."""
    logprobs = LogProbs()
    last_token_len = 0
    for token_id, id_logprob in zip(token_ids, id_logprobs):
        token = tokenizer.convert_ids_to_tokens(token_id)
        logprobs.tokens.append(token)
        logprobs.token_logprobs.append(id_logprob[token_id])
        if len(logprobs.text_offset) == 0:
            logprobs.text_offset.append(initial_text_offset)
        else:
            logprobs.text_offset.append(logprobs.text_offset[-1] + last_token_len)
        last_token_len = len(token)

        logprobs.top_logprobs.append({
            tokenizer.convert_ids_to_tokens(i): p
            for i, p in id_logprob.items()
        })
    return logprobs

@app.post("/v1/chat/completions") async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request):

global MODEL_TYPE  # model name
global MODEL_PATH  # model path
model_name = MODEL_PATH
model_Type = MODEL_TYPE  # record the model name

logger.info(f"Received chat completion request: {request}")

error_check_ret = await check_model(request)
if error_check_ret is not None:
    return error_check_ret

if request.logit_bias is not None and len(request.logit_bias) > 0:
    # TODO: support logit_bias in vLLM engine.
    return create_error_response(HTTPStatus.BAD_REQUEST,
                                 "logit_bias is not currently supported")

prompt = await get_gen_prompt(request)

def create_stream_response_json(
        index: int,
        text: str,
        finish_reason: Optional[str] = None,
) -> str:
    choice_data = ChatCompletionResponseStreamChoice(
        index=index,
        delta=DeltaMessage(content=text),
        finish_reason=finish_reason,
    )
    response = ChatCompletionStreamResponse(
        # id=request_id,
        # created=created_time,
        # model=model_name,
        model = model_Type,  # return the model name
        choices=[choice_data],
    )
    response_json = response.json(ensure_ascii=False)

    return response_json

token_ids, error_check_ret = await check_length(request, prompt=prompt)
if error_check_ret is not None:
    return error_check_ret

request_id = f"cmpl-{random_uuid()}"
created_time = int(time.monotonic())
try:
    sampling_params = SamplingParams(
        n=request.n,
        presence_penalty=request.presence_penalty,
        frequency_penalty=request.frequency_penalty,
        temperature=request.temperature,
        top_p=request.top_p,
        stop=request.stop,
        stop_token_ids=request.stop_token_ids,
        max_tokens=request.max_new_tokens,
        best_of=request.best_of,
        top_k=request.top_k,
        ignore_eos=request.ignore_eos,
        use_beam_search=request.use_beam_search,
        skip_special_tokens=request.skip_special_tokens,
    )
except ValueError as e:
    return create_error_response(HTTPStatus.BAD_REQUEST, str(e))
result_generator = engine.generate(prompt, sampling_params, request_id,
                                   token_ids)

async def completion_stream_generator() -> AsyncGenerator[str, None]:
    # First chunk with role
    for i in range(request.n):
        choice_data = ChatCompletionResponseStreamChoice(
            index=i,
            delta=DeltaMessage(role="assistant"),
            finish_reason=None,
        )
        chunk = ChatCompletionStreamResponse(object="chat.completion.chunk",
                                             id=request_id,
                                             choices=[choice_data],
                                             model=model_Type)
        data = chunk.json(exclude_unset=True, ensure_ascii=False)
        yield f"data: {data}\n\n"

    previous_texts = [""] * request.n
    previous_num_tokens = [0] * request.n
    async for res in result_generator:
        res: RequestOutput
        for output in res.outputs:
            i = output.index
            delta_text = output.text[len(previous_texts[i]):]
            previous_texts[i] = output.text
            previous_num_tokens[i] = len(output.token_ids)
            response_json = create_stream_response_json(
                index=i,
                text=delta_text.encode("UTF-8").decode("UTF-8"),
            )
            yield f"data: {response_json}\n\n"
            if output.finish_reason is not None:
                response_json = create_stream_response_json(
                    index=i,
                    text="",
                    finish_reason=output.finish_reason,
                )
                yield f"data: {response_json}\n\n"
    yield "data: [DONE]\n\n"

if request.stream or request.use_stream_chat:
    return StreamingResponse(completion_stream_generator(),
                             media_type="text/event-stream")

final_res: RequestOutput = None
# choices = []  # new code
async for res in result_generator:
    if await raw_request.is_disconnected():
        # Abort the request if the client disconnects.
        await engine.abort(request_id)
        return create_error_response(HTTPStatus.BAD_REQUEST,
                                     "Client disconnected")
    # new code
    # for output in res.outputs:
    #     choice_data = ChatCompletionResponseChoice(
    #     index=output.index,
    #     message=ChatMessage(role="assistant", content=output.text),
    #     finish_reason=output.finish_reason,
    # )
    # choices.append(choice_data)
    final_res = res

assert final_res is not None
choices = []
for output in final_res.outputs:
    choice_data = ChatCompletionResponseChoice(
        index=output.index,
        message=ChatMessage(role="assistant", content=output.text),
        finish_reason=output.finish_reason,
    )
    choices.append(choice_data)

# choice = choices[0]
contents = [choice.message.content for choice in choices]
content_list= [content_chuli(content) for content in contents]
content2=None
# return {"response": choices}

return_content = []
for content in content_list:
    count = 0  # newly added (1)
    # Retry when the answer opens with an apology ("抱歉"/"sorry") or is much shorter than expected.
    if any(w in " ".join(manual_tokenize(content)[:20]) for w in ("抱歉", "sorry")) or (len(manual_tokenize(content)) <= len(manual_tokenize(prompt))*0.005):
        count += 1  # newly added (2)
        try:
            sampling_params = SamplingParams(
                n=1,
                presence_penalty=request.presence_penalty,
                frequency_penalty=request.frequency_penalty,
                temperature=1,
                top_p=request.top_p,
                stop=request.stop,
                stop_token_ids=request.stop_token_ids,
                max_tokens=request.max_new_tokens,
                best_of=request.best_of,
                top_k=50,
                ignore_eos=request.ignore_eos,
                use_beam_search=request.use_beam_search,
                skip_special_tokens=request.skip_special_tokens,
            )
        except ValueError as e:
            return create_error_response(HTTPStatus.BAD_REQUEST, str(e))

        pattern = r'(<\|im_start\|>system \n).*?(<\|im_end\|>)'  
        prompt = re.sub(pattern, r'\1You are a helpful assistant.\2', prompt)
        result_generator = engine.generate(prompt, sampling_params, request_id,
                                token_ids)
        final_res: RequestOutput = None
        async for res in result_generator:
            if await raw_request.is_disconnected():
                # Abort the request if the client disconnects.
                await engine.abort(request_id)
                return create_error_response(HTTPStatus.BAD_REQUEST,
                                            "Client disconnected")
            final_res = res
        assert final_res is not None
        choices = []
        for output in final_res.outputs:
            choice_data = ChatCompletionResponseChoice(
                index=output.index,
                message=ChatMessage(role="assistant", content=output.text),
                finish_reason=output.finish_reason,
            )
            choices.append(choice_data)
        choice = choices[0]
        content = choice.message.content
        content2 = content_chuli(content)
        content2 = content  # note: this overwrites the cleaned text with the raw second output
    # If the second call's result is not better than the first, keep the first one.
    if content2:
        if len(content2) < len(content):
            content = content  # keep the first result
        else:
            content = content2
    else:
        content = content  # newly added: keep the first result
    if request.history != "":
        History = request.history
    else:
        History = []
    print(f"content: {content}")
    logger.info(f"content: {content}")
    return_content.append({"response": content, "history": History, "count": count, "length": len(content)}) 

return return_content

if name == "main": parser = argparse.ArgumentParser( description="vLLM OpenAI-Compatible RESTful API server.") parser.add_argument("--host", type=str, default=None, help="host name") parser.add_argument("--port", type=int, default=8098, help="port number") parser.add_argument("--allow-credentials", action="store_true", help="allow credentials") parser.add_argument("--allowed-origins", type=json.loads, default=[""], help="allowed origins") parser.add_argument("--allowed-methods", type=json.loads, default=[""], help="allowed methods") parser.add_argument("--allowed-headers", type=json.loads, default=["*"], help="allowed headers") parser.add_argument("--served-model-name", type=str, default=None, help="The model name used in the API. If not " "specified, the model name will be the same as " "the huggingface name.") parser.add_argument("--model_type", type=str, default=None, help="The model name in huggingface.") parser.add_argument("--tensor_parallel_size", type=int, default=1, help="number of gpus to use") parser.add_argument("--gpu_memory_utilization", type=float, default=0.90) parser = AsyncEngineArgs.add_cli_args(parser) args = parser.parse_args()

logger.info(f"args: {args}")

global MODEL_TYPE
global MODEL_PATH
MODEL_TYPE = args.model_type
MODEL_PATH = args.model

if args.served_model_name is not None:
    served_model = args.served_model_name
else:
    served_model = args.model

engine_args = AsyncEngineArgs.from_cli_args(args)
engine = AsyncLLMEngine.from_engine_args(engine_args)
engine_model_config = asyncio.run(engine.get_model_config())
max_model_len = engine_model_config.max_model_len

# A separate tokenizer to map token IDs to strings.
tokenizer = get_tokenizer(engine_args.tokenizer,
                          tokenizer_mode=engine_args.tokenizer_mode,
                          trust_remote_code=engine_args.trust_remote_code)

"""uvicorn日志打印中加上当前时间"""
def converter(*args):
    cn_tz = timezone(timedelta(hours=8), name='Asia/Shanghai')
    dt = datetime.now(cn_tz) 
    return dt.timetuple() # 返回一个tuple
logging.Formatter.converter = converter
log_config = uvicorn.config.LOGGING_CONFIG
log_config["formatters"]["access"]["fmt"] = ' %(levelprefix)s %(asctime)s - %(client_addr)s - "%(request_line)s" - %(status_code)s '
log_config["formatters"]["default"]["fmt"] = "  %(levelprefix)s %(asctime)s %(message)s "
uvicorn.run(app,
            host=args.host,
            port=args.port,
            log_level="info",
            timeout_keep_alive=TIMEOUT_KEEP_ALIVE)
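For completeness, a call against the endpoint above could look roughly like the sketch below. The host, model name, and payload values are hypothetical; the field names prompt, system_prompt, history, max_windows_size, and max_new_tokens mirror the attributes the handler reads, and 8098 is the script's default port.

# client_example.py -- illustrative request against the server above
import requests

payload = {
    "model": "qwen-14b",               # hypothetical served model name
    "prompt": "请用一句话介绍你自己。",  # "introduce yourself in one sentence"
    "system_prompt": "You are a helpful assistant.",
    "history": [],
    "max_windows_size": 6144,          # hypothetical history token budget
    "max_new_tokens": 512,
    "stream": False,
}
resp = requests.post("http://127.0.0.1:8098/v1/chat/completions",
                     json=payload, timeout=600)
print(resp.json())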
xjDUAN184 commented 6 months ago

This is the output when tensor_parallel_size=2 is used (screenshot):

r000bin commented 6 months ago

I once had this problem on GCP with an older version of vLLM. It turned out to be a bug in Ray's GPU detection on GCP, which was fixed a few weeks ago. Can you try the newest version of vLLM? With vLLM 0.2.1 you're on quite an old release.
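One quick way to see what Ray itself detects (a minimal sketch, assuming Ray is installed in the same environment that vLLM uses; the file name is hypothetical):

# ray_gpu_check.py -- illustrative helper, not part of vLLM
import ray

ray.init()                       # start a local Ray instance
print(ray.cluster_resources())   # should include "GPU": 2.0 if both A800s are detected
ray.shutdown()

If Ray reports fewer GPUs than nvidia-smi shows, upgrading vLLM (and Ray along with it) is the most likely fix.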