Closed: savannahfung closed this issue 1 month ago.
There could be an issue with your processing script; I've managed to have hundreds of concurrent requests in flight without seeing issues like this. Would you be able to share your processing script?
@savannahfung @WoosukKwon @hmellor I'm experiencing a similar issue as described above, including with "vllm.entrypoints.api_server". Initially, it handles concurrent requests effectively, but eventually, it starts to hang. My assumption is that this might be related to the GPU KV cache steadily increasing to 99.4%, leading to crashes and hang-ups, thereby leaving other requests in a "pending" state. It's highly probable that this issue is related to #2731.
@hmellor I am testing the performance of different LLMs, and the script works for smaller numbers of concurrent requests. For openchat/openchat_3.5 it hangs at around 10 concurrent requests, but with larger models (e.g. Nexusflow/NexusRaven-13B, mistralai/Mixtral-8x7B-Instruct-v0.1) it hangs earlier, at around 2-3 concurrent requests.
def create(self, messages):
    res = self.client.chat.completions.create(
        model=self.model,
        messages=messages,
        stream=True
    )
    return res

def generate(self, messages):
    start_time = time.time()
    stream = self.create(messages)
    res_content = ""
    for chunk in stream:
        res = chunk
        if res.choices[0].finish_reason is not None:
            end_time = time.time()
            break
        if res.choices[0].delta.role is not None:
            created_time = time.time()
            res_role = res.choices[0].delta.role
        if res.choices[0].delta.content is not None:
            res_content += res.choices[0].delta.content
    if res.choices[0].finish_reason == "stop":
        res_status = "success"
    elif res.choices[0].finish_reason == "length":
        print("Warning: response reached max tokens.")
        res_status = "max_tokens"
    else:
        print("Error: response finished unexpectedly.")
        res_status = "error"
    response = {
        "status": res_status,
        "object": res
    }
    total_duration = end_time - start_time
    ttft = created_time - start_time
    tpot = total_duration / res.usage["completion_tokens"] if res.usage["completion_tokens"] > 0 else 0
    throughput = 1 / (ttft + tpot)
    res_message = {
        "role": res_role,
        "content": res_content
    }
    return {
        "ttft": ttft,
        "tpot": tpot,
        "throughput": throughput,
        "latency": total_duration,
        "message": res_message,
        "response": response
    }
def evaluate(self, input_file, output_file, reference_file, user_id=None, avg_res_time=None, results=None):
    chat_history = []
    with open(input_file, "r") as inf, open(output_file, "w") as outf, open(reference_file, "r") as ref:
        if user_id is not None:
            outf.write("User ID: " + str(user_id) + "\n")
        outf.write("Model: " + self.llm.model + "\n")
        outf.write("Input file: " + input_file + "\n\n")
        metrics = {
            "bleu": 0,
            "rouge1": 0,
            "rougeL": 0,
            "ttft": 0,
            "tpot": 0,
            "throughput": 0,
            "latency": 0,
        }
        total_lines = sum(1 for line in inf)
        inf.seek(0)
        desc = f"{'User ' + str(user_id) + ': ' if user_id is not None else ''}Processing {input_file.split('/')[-1]}"
        for line_num, (input_line, ref_line) in enumerate(zip(tqdm(inf, total=total_lines, desc=desc, unit="lines"), ref)):
            message = json.loads(input_line)
            chat_history.append(message)
            if message["role"] == "system":
                outf.write("> " + message["content"] + "\n\n")
                continue
            if avg_res_time is not None:
                res_time = np.random.weibull(1) / avg_res_time
                time.sleep(res_time)
            res = self.llm.generate(chat_history)
            chat_history.append(res["message"])
            smoothing = SmoothingFunction().method1
            bleu = sentence_bleu([ref_line.split()], res["message"]["content"].split(), smoothing_function=smoothing)
            metrics["bleu"] += bleu
            scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
            rouge = scorer.score(ref_line, res["message"]["content"])
            metrics["rouge1"] += rouge["rouge1"].fmeasure
            metrics["rougeL"] += rouge["rougeL"].fmeasure
            metrics["ttft"] += res["ttft"]
            metrics["tpot"] += res["tpot"]
            metrics["throughput"] += res["throughput"]
            metrics["latency"] += res["latency"]
            outf.write(">>> " + message["content"] + "\n\n")
            outf.write(res["message"]["content"] + "\n\n")
            outf.write("BLEU: " + str(bleu) + "\n")
            outf.write("Rouge-1: " + str(rouge["rouge1"].fmeasure) + "\n")
            outf.write("Rouge-L: " + str(rouge["rougeL"].fmeasure) + "\n")
            outf.write("TTFT: " + str(res["ttft"]) + "\n")
            outf.write("TPOT: " + str(res["tpot"]) + "\n")
            outf.write("Throughput: " + str(res["throughput"]) + "\n")
            outf.write("Latency: " + str(res["latency"]) + "\n\n")
            if res["response"]["status"] != "success":
                outf.write("Stop request.\n")
                outf.write("Status: " + res["response"]["status"] + "\n")
                outf.write("Object: " + str(res["response"]["object"]) + "\n")
                break
        metrics["bleu"] /= (total_lines - 1)
        metrics["rouge1"] /= (total_lines - 1)
        metrics["rougeL"] /= (total_lines - 1)
        metrics["ttft"] /= (total_lines - 1)
        metrics["tpot"] /= (total_lines - 1)
        metrics["throughput"] /= (total_lines - 1)
        metrics["latency"] /= (total_lines - 1)
        outf.write("Average BLEU: " + str(metrics["bleu"]) + "\n")
        outf.write("Average Rouge-1: " + str(metrics["rouge1"]) + "\n")
        outf.write("Average Rouge-L: " + str(metrics["rougeL"]) + "\n")
        outf.write("Average TTFT: " + str(metrics["ttft"]) + "\n")
        outf.write("Average TPOT: " + str(metrics["tpot"]) + "\n")
        outf.write("Average Throughput: " + str(metrics["throughput"]) + "\n")
        outf.write("Average Latency: " + str(metrics["latency"]) + "\n")
    if results is not None:
        results.append(metrics)
    return metrics
def simulate_user(self, num_users, avg_res_time):
    metrics = {
        "bleu": 0,
        "rouge1": 0,
        "rougeL": 0,
        "ttft": 0,
        "tpot": 0,
        "throughput": 0,
        "latency": 0,
    }
    total_files = len(self.files)
    metrics_file = self.simulation_metrics_base + self.model_name + ".txt"
    with open(metrics_file, "w") as mf:
        mf.write("Model: " + self.llm.model + "\n\n")
        mf.write("Number of users: " + str(num_users) + "\n")
        mf.write("Average response time: " + str(avg_res_time) + "\n\n")
        for file_num, file in tqdm(enumerate(self.files, start=1), total=total_files, desc="Processing files",
                                   unit="files"):
            input_file = self.input_base + file + ".txt"
            reference_file = self.reference_base + file + ".txt"
            output_file = self.simulation_output_base + file + "/"
            os.makedirs(output_file, exist_ok=True)
            fmetrics = {
                "bleu": 0,
                "rouge1": 0,
                "rougeL": 0,
                "ttft": 0,
                "tpot": 0,
                "throughput": 0,
                "latency": 0,
            }
            threads = []
            results = []
            for user_id in range(num_users):
                usr_output_file = output_file + str(user_id) + ".txt"
                thread = threading.Thread(target=self.evaluate,
                                          args=(input_file, usr_output_file, reference_file, user_id, avg_res_time, results))
                thread.start()
                threads.append(thread)
            for thread in threads:
                thread.join()
            for res in results:
                fmetrics["bleu"] += res["bleu"]
                fmetrics["rouge1"] += res["rouge1"]
                fmetrics["rougeL"] += res["rougeL"]
                fmetrics["ttft"] += res["ttft"]
                fmetrics["tpot"] += res["tpot"]
                fmetrics["throughput"] += res["throughput"]
                fmetrics["latency"] += res["latency"]
            fmetrics["bleu"] /= num_users
            fmetrics["rouge1"] /= num_users
            fmetrics["rougeL"] /= num_users
            fmetrics["ttft"] /= num_users
            fmetrics["tpot"] /= num_users
            fmetrics["throughput"] /= num_users
            fmetrics["latency"] /= num_users
            metrics["bleu"] += fmetrics["bleu"]
            metrics["rouge1"] += fmetrics["rouge1"]
            metrics["rougeL"] += fmetrics["rougeL"]
            metrics["ttft"] += fmetrics["ttft"]
            metrics["tpot"] += fmetrics["tpot"]
            metrics["throughput"] += fmetrics["throughput"]
            metrics["latency"] += fmetrics["latency"]
            mf.write("File: " + file + "\n")
            mf.write("BLEU: " + str(fmetrics["bleu"]) + "\n")
            mf.write("Rouge-1: " + str(fmetrics["rouge1"]) + "\n")
            mf.write("Rouge-L: " + str(fmetrics["rougeL"]) + "\n")
            mf.write("TTFT: " + str(fmetrics["ttft"]) + "\n")
            mf.write("TPOT: " + str(fmetrics["tpot"]) + "\n")
            mf.write("Throughput: " + str(fmetrics["throughput"]) + "\n")
            mf.write("Latency: " + str(fmetrics["latency"]) + "\n\n")
        metrics["bleu"] /= total_files
        metrics["rouge1"] /= total_files
        metrics["rougeL"] /= total_files
        metrics["ttft"] /= total_files
        metrics["tpot"] /= total_files
        metrics["throughput"] /= total_files
        metrics["latency"] /= total_files
        mf.write("Average BLEU: " + str(metrics["bleu"]) + "\n")
        mf.write("Average Rouge-1: " + str(metrics["rouge1"]) + "\n")
        mf.write("Average Rouge-L: " + str(metrics["rougeL"]) + "\n")
        mf.write("Average TTFT: " + str(metrics["ttft"]) + "\n")
        mf.write("Average TPOT: " + str(metrics["tpot"]) + "\n")
        mf.write("Average Throughput: " + str(metrics["throughput"]) + "\n")
        mf.write("Average Latency: " + str(metrics["latency"]) + "\n")
    return metrics
@nehalvaghasiya That's weird, because just before it hung, the GPU KV cache usage was only 4.2% and the CPU KV cache usage was 0.0%. Unless it suddenly spiked to 99.9%.
INFO 02-05 04:53:21 llm_engine.py:706] Avg prompt throughput: 1137.2 tokens/s, Avg generation throughput: 193.7 tokens/s, Running: 5 reqs, Swapped: 0 reqs, Pending: 0 reqs, GPU KV cache usage: 4.2%, CPU KV cache usage: 0.0%
Hey @savannahfung, just wanted to let you know that I've been running 100 concurrent requests smoothly. However, when I try to send the 101st request, I've noticed that the GPU KV Cache usage spikes up to 99.4%. Consequently, all subsequent requests end up in a 'pending' state.
@savannahfung I meant the part of your script that makes the requests; you've provided too much extra code for me to find where the problem is. Can you make a minimal reproducer?
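For example, something as small as this would be ideal (a rough sketch; the model name, port, prompt, and request count are just placeholders for whatever you're actually running):

# Minimal concurrency sketch: N threads, each sending one streaming chat completion.
import threading
from openai import OpenAI

client = OpenAI(api_key="EMPTY", base_url="http://localhost:8000/v1")

def send_request(i):
    stream = client.chat.completions.create(
        model="openchat/openchat_3.5",          # placeholder model
        messages=[{"role": "user", "content": "Say hello."}],
        stream=True,
    )
    # Drain the stream so the request runs to completion.
    text = "".join(chunk.choices[0].delta.content or "" for chunk in stream)
    print(f"request {i} finished ({len(text)} chars)")

threads = [threading.Thread(target=send_request, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()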
I'm following up because I am also now seeing this issue where 100 concurrent requests will indefinitely be swapped in and out of the pending queue as described by @nehalvaghasiya.
Hi all, I am experiencing the same issue.
Python==3.11.5, vllm==0.4.0.post1, openai==1.23.1
This is how I start the openai server:
CUDA_VISIBLE_DEVICES=0 python -m vllm.entrypoints.openai.api_server --model mistralai/Mistral-7B-Instruct-v0.2 --uvicorn-log-level debug --port 8001 > vllm_server_log.txt 2>&1 &
This is my Python code to produce the error:
import asyncio
from openai import AsyncOpenAI

model_name = 'mistralai/Mistral-7B-Instruct-v0.2'
client = AsyncOpenAI(api_key="EMPTY", base_url=f"http://localhost:8001/v1/")

async def _send_chat_completion(messages):
    completion = await client.chat.completions.create(model=model_name, messages=messages, temperature=0.0)
    return completion.choices[0].message.content.strip()

async def _send_async_requests(prompts_messages):
    tasks = [_send_chat_completion(msgs) for msgs in prompts_messages]
    responses = await asyncio.gather(*tasks)
    return responses

prompts_msgs = [{'role': 'user', 'content': 'suggest a dinner meal'}]
print('Starting first run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
print('Starting second run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
The second run never finishes, and the server logs don't even show the requests coming in.
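One thing that may or may not be related (just an observation, I have not verified that it avoids the hang): the client above is created once at module level and then reused across two separate asyncio.run() calls, each of which starts and closes its own event loop. A variant that creates the client inside each run would look roughly like this:

# Sketch: construct the AsyncOpenAI client inside each event loop instead of
# reusing one module-level client across separate asyncio.run() calls.
import asyncio
from openai import AsyncOpenAI

model_name = 'mistralai/Mistral-7B-Instruct-v0.2'

async def _send_async_requests(prompts_messages):
    client = AsyncOpenAI(api_key="EMPTY", base_url="http://localhost:8001/v1/")
    tasks = [
        client.chat.completions.create(model=model_name, messages=msgs, temperature=0.0)
        for msgs in prompts_messages
    ]
    completions = await asyncio.gather(*tasks)
    return [c.choices[0].message.content.strip() for c in completions]

prompts_msgs = [{'role': 'user', 'content': 'suggest a dinner meal'}]
print(asyncio.run(_send_async_requests([prompts_msgs] * 5)))
print(asyncio.run(_send_async_requests([prompts_msgs] * 5)))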
I want to point other users facing a similar issue to the corresponding issue on the openai GitHub page, where they report that they are actively working on a fix, but it seems to be a more serious problem related to other modules used by openai (see https://github.com/openai/openai-python/issues/769).
My workaround was to use raw requests, where I did not see this error (although openai reports in the issue linked above that you might encounter the same problem there, too). Adjusting the above code looks like this:
import asyncio
import aiohttp

async def _send_chat_completion(messages):
    print('starting openai request')
    async with aiohttp.ClientSession() as session:
        response = await session.post(url="http://localhost:8001/v1/chat/completions",
                                      json={"messages": messages, "model": "mistralai/Mistral-7B-Instruct-v0.2"},
                                      headers={"Content-Type": "application/json"})
        return await response.json()

async def _send_async_requests(prompts_messages):
    tasks = [_send_chat_completion(msgs) for msgs in prompts_messages]
    responses = await asyncio.gather(*tasks)
    responses = [resp['choices'][0]['message']['content'].strip() for resp in responses]
    return responses

prompts_msgs = [{'role': 'user', 'content': 'suggest a dinner meal'}]
print('Starting first run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
print('Starting second run..')
responses = asyncio.run(_send_async_requests([prompts_msgs] * 5))
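Not a fix, but if you need to fire many more requests than this, a client-side cap on in-flight requests is a cheap mitigation, assuming the hangs correlate with too many concurrent requests as reported above. A sketch using asyncio.Semaphore (the limit of 50 is arbitrary; tune it to your setup):

# Sketch: cap the number of concurrent requests with a semaphore and share one session.
import asyncio
import aiohttp

MAX_IN_FLIGHT = 50  # arbitrary cap; tune to your GPU / KV cache headroom

async def _send_chat_completion(session, semaphore, messages):
    async with semaphore:  # at most MAX_IN_FLIGHT requests run at once
        async with session.post(url="http://localhost:8001/v1/chat/completions",
                                json={"messages": messages, "model": "mistralai/Mistral-7B-Instruct-v0.2"},
                                headers={"Content-Type": "application/json"}) as response:
            return await response.json()

async def _send_async_requests(prompts_messages):
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)
    async with aiohttp.ClientSession() as session:
        tasks = [_send_chat_completion(session, semaphore, msgs) for msgs in prompts_messages]
        responses = await asyncio.gather(*tasks)
    return [resp['choices'][0]['message']['content'].strip() for resp in responses]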
Let's hope it gets fixed quickly.
Stale
I am using a runpod container to run vLLM.
Template: runpod/pytorch:2.1.1-py3.10-cuda12.1.1-devel-ubuntu22.04
GPU Cloud: 1 x RTX 3090 | 12 vCPU 31 GB RAM
It works perfectly fine when I send 9 concurrent requests but it starts to hang when I increase it to 10.
python -m vllm.entrypoints.openai.api_server --model openchat/openchat_3.5 --tensor-parallel-size 1
It just stops processing the last input and hangs there.
I tried to include
--swap-space 0
but the error persists; nothing changes.