wjj19950828 opened this issue 10 months ago
@byshiue Does the exec function need to be executed asynchronously?
@juney-nvidia Hello, do you have any ideas for a solution at the moment? I can try to fix it myself~
@byshiue @juney-nvidia I tried to solve this with async execution and coroutines, but I ran into the error shown below, which says that decoupled mode does not support asynchronous execution. However, the official documentation seems to include decoupled async demos. How should I resolve this? Thank you~
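(For context: the pattern documented for calling a decoupled model via BLS in the Triton Python backend is the synchronous exec(decoupled=True) form, which returns an iterator over the streamed responses. The sketch below only illustrates the shape of that call; the tensor names are illustrative and the real tensorrt_llm_bls model passes more inputs than this.)

import numpy as np
import triton_python_backend_utils as pb_utils

def stream_from_trtllm(input_ids):
    """Minimal sketch of a decoupled BLS call; not the actual tensorrt_llm_bls code."""
    bls_request = pb_utils.InferenceRequest(
        model_name="tensorrt_llm",
        requested_output_names=["output_ids"],
        inputs=[pb_utils.Tensor("input_ids", np.asarray(input_ids, dtype=np.int32))],
    )
    # With a decoupled target model, exec(decoupled=True) yields one response
    # per streamed chunk instead of a single response.
    for response in bls_request.exec(decoupled=True):
        if response.has_error():
            raise pb_utils.TritonModelException(response.error().message())
        output = pb_utils.get_output_tensor_by_name(response, "output_ids")
        if output is not None:  # the final (flag-only) response carries no tensor
            yield output

Whether async_exec can be combined with a decoupled target depends on the Python backend version in the container, which may explain why the official demos and the error message disagree.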
@wjj19950828
Hi, can you follow this template to provide the concrete steps to reproduce your issue? Then our engineers can help with the investigation.
June
@juney-nvidia I hope this problem can be solved as soon as possible, thank you~
Using tensorrt_llm_bls.
1. config.pbtxt
name: "tensorrt_llm_bls"
backend: "python"
max_batch_size: 128
model_transaction_policy {
decoupled: True
}
input [
{
name: "text_input"
data_type: TYPE_STRING
dims: [ -1 ]
},
{
name: "max_tokens"
data_type: TYPE_INT32
dims: [ -1 ]
},
{
name: "bad_words"
data_type: TYPE_STRING
dims: [ -1 ]
optional: true
},
{
name: "stop_words"
data_type: TYPE_STRING
dims: [ -1 ]
optional: true
},
{
name: "end_id"
data_type: TYPE_INT32
dims: [ 1 ]
optional: true
},
{
name: "pad_id"
data_type: TYPE_INT32
dims: [ 1 ]
optional: true
},
{
name: "top_k"
data_type: TYPE_INT32
dims: [ 1 ]
optional: true
},
{
name: "top_p"
data_type: TYPE_FP32
dims: [ 1 ]
optional: true
},
{
name: "temperature"
data_type: TYPE_FP32
dims: [ 1 ]
optional: true
},
{
name: "length_penalty"
data_type: TYPE_FP32
dims: [ 1 ]
optional: true
},
{
name: "repetition_penalty"
data_type: TYPE_FP32
dims: [ 1 ]
optional: true
},
{
name: "min_length"
data_type: TYPE_INT32
dims: [ 1 ]
optional: true
},
{
name: "presence_penalty"
data_type: TYPE_FP32
dims: [ 1 ]
optional: true
},
{
name: "random_seed"
data_type: TYPE_UINT64
dims: [ 1 ]
optional: true
},
{
name: "return_log_probs"
data_type: TYPE_BOOL
dims: [ 1 ]
optional: true
},
{
name: "beam_width"
data_type: TYPE_INT32
dims: [ 1 ]
optional: true
},
{
name: "stream"
data_type: TYPE_BOOL
dims: [ 1 ]
optional: true
},
{
name: "prompt_embedding_table"
data_type: TYPE_FP16
dims: [ -1, -1 ]
optional: true
},
{
name: "prompt_vocab_size"
data_type: TYPE_INT32
dims: [ 1 ]
optional: true
},
{
name: "embedding_bias_words"
data_type: TYPE_STRING
dims: [ -1 ]
optional: true
},
{
name: "embedding_bias_weights"
data_type: TYPE_FP32
dims: [ -1 ]
optional: true
}
]
output [
{
name: "text_output"
data_type: TYPE_STRING
dims: [ -1 ]
},
{
name: "cum_log_probs"
data_type: TYPE_FP32
dims: [ -1 ]
},
{
name: "output_log_probs"
data_type: TYPE_FP32
dims: [ -1, -1 ]
}
]
parameters: {
key: "accumulate_tokens"
value: {
string_value: "${accumulate_tokens}"
}
}
instance_group [
{
count: 1
kind : KIND_CPU
}
]
2. Start the Triton server.
3. Send 26 concurrent requests with the following script:
python3 benchmark_serving_grpc.py --port 8222 --dataset datasets/prompts.txt --concurrent 26
import os
import time
import threading
import uuid
import json
import argparse
import requests
import tritonclient.grpc as grpcclient
import tritonclient.http as httpclient
import queue
import sys
import numpy as np
from tritonclient.utils import *
from tritonclient.utils import np_to_triton_dtype
from functools import partial


class UserData:
    def __init__(self):
        self._completed_requests = queue.Queue()


def callback(user_data, result, error):
    # Stream callback: push either the error or the streamed result onto the queue.
    if error:
        user_data._completed_requests.put(error)
    else:
        user_data._completed_requests.put(result)
        output = result.as_numpy("text_output")
        # print(output[0], flush=True)


def prepare_tensor(name, input, protocol):
    client_util = httpclient if protocol == "http" else grpcclient
    t = client_util.InferInput(name, input.shape, np_to_triton_dtype(input.dtype))
    t.set_data_from_numpy(input)
    return t


def load_dataset(path):
    if path is None:
        path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "datasets/prompts.txt"
        )
    print(f"Using dataset: {path}")
    if path.endswith(".json"):
        with open(path, "r") as f:
            dataset = json.load(f)
    elif path.endswith(".txt"):
        dataset = list()
        with open(path, "r") as f:
            for line in f.readlines():
                dataset.append(line.strip())
    elif path.endswith(".jsonl"):
        dataset = list()  # initialize before appending
        with open(path, "r") as f:
            for line in f.readlines():
                line = json.loads(line)
                dataset.append(line["input"])
    elif path.endswith(".xlsx"):
        import pandas as pd

        df = pd.read_excel(path)
        dataset = df.iloc[:, 1].tolist()
    print(f"Number of samples: {len(dataset)}")
    return dataset


def get_prompt():
    # Hand out prompts to worker threads one at a time.
    global curr_prompt_index
    with get_prompt_lock:
        if curr_prompt_index >= len(dataset):
            return None
        prompt = dataset[curr_prompt_index]
        curr_prompt_index += 1
        print(
            "[Processing]["
            + ("*" * int(curr_prompt_index / len(dataset) * 19 + 1))
            + f"][{curr_prompt_index}/{len(dataset)}]"
        )
        return prompt


def main(args):
    def thread_fn():
        global num_err_requests
        global num_timeout_requests
        while True:
            prompt = get_prompt()
            if prompt is None:
                break
            start = time.time()
            try:
                request_id = f"test: {uuid.uuid4().hex}"
                model_name = "tensorrt_llm_bls"
                input0 = [[prompt]]
                input0_data = np.array(input0).astype(object)
                # max_tokens
                output0_len = (
                    np.ones_like(input0).astype(np.int32) * args.max_new_tokens
                )
                bad_words_list = np.array([[""]], dtype=object)
                stop_words_list = np.array([[args.stop_texts]], dtype=object)
                # eos token
                end_id_list = np.array([[args.stop_token]], dtype=np.int32)
                temperature_list = np.array([[0.800000011920929]], dtype=np.float32)
                top_k_list = np.array([[0]], dtype=np.int32)
                top_p_list = np.array([[0.9]], dtype=np.float32)
                streaming = [[True]]
                streaming_data = np.array(streaming, dtype=bool)
                inputs = [
                    prepare_tensor("text_input", input0_data, "grpc"),
                    prepare_tensor("max_tokens", output0_len, "grpc"),
                    prepare_tensor("bad_words", bad_words_list, "grpc"),
                    prepare_tensor("stop_words", stop_words_list, "grpc"),
                    prepare_tensor("end_id", end_id_list, "grpc"),
                    prepare_tensor("temperature", temperature_list, "grpc"),
                    prepare_tensor("top_k", top_k_list, "grpc"),
                    prepare_tensor("top_p", top_p_list, "grpc"),
                    prepare_tensor("stream", streaming_data, "grpc"),
                ]
                user_data = UserData()
                time_start = time.time()
                timestamps = []
                first_token_time = 0.0
                token_latency = 0.0
                num_tokens = 0.0
                grpc_addr = f"{args.host}:{args.port}"  # no trailing slash in the gRPC URL
                with grpcclient.InferenceServerClient(
                    url=grpc_addr, verbose=False
                ) as triton_client:
                    # Establish stream
                    triton_client.start_stream(callback=partial(callback, user_data))
                    # Send request
                    triton_client.async_stream_infer(model_name, inputs)
                    # Parse the responses
                    while True:
                        try:
                            result = user_data._completed_requests.get(timeout=300)
                        except Exception:
                            break
                        if type(result) == InferenceServerException:
                            print("Received an error from server:")
                            print(result)
                            break
                        else:
                            ensemble_output = result.as_numpy("text_output")
                            is_finish = result.get_response().parameters["triton_final_response"].bool_param
                            num_tokens += 1
                            timestamps.append(time.time())
                            if num_tokens == 1:
                                first_token_time = (
                                    np.round(timestamps[0] - time_start, 3) * 1000
                                )
                            ensemble_output = ensemble_output[0].decode("UTF-8")
                            if num_tokens >= args.max_new_tokens or ensemble_output == args.stop_texts or is_finish:
                                break
                if num_tokens > 1:
                    token_latency = (
                        (timestamps[-1] - timestamps[0]) / (num_tokens - 1) * 1000
                    )
                print(f"first_token_time: {first_token_time}")
                print(f"token_latency : {token_latency}")
                print(f"num_tokens : {num_tokens}")
                request_latencys.append(time.time() - start)
                step_latencys.append(token_latency)
                first_token_latencys.append(first_token_time)
                num_completion_tokens.append(num_tokens)
                if time.time() - start > 10:
                    num_timeout_requests += 1
            except Exception as e:
                num_err_requests += 1
                print(f"Error: {str(e)}")
                continue
            request_latencys.append(time.time() - start)

    threads = []
    for _ in range(args.concurrent):
        thread = threading.Thread(target=thread_fn)
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()

    average_request_latency = sum(request_latencys) / len(request_latencys)
    average_first_token_latency = sum(first_token_latencys) / len(first_token_latencys)
    average_step_latency = sum(step_latencys) / len(step_latencys)
    average_num_completion_tokens = sum(num_completion_tokens) / len(
        num_completion_tokens
    )
    average_qps = (
        (1 / average_request_latency)
        * args.concurrent
        * (1 - (num_err_requests / len(dataset)))
    )
    print("=====================> Results <=========================")
    print(f"Concurrent Nums: {args.concurrent}")
    print(f"Request Latency: {average_request_latency} (s)")
    print(f"First Token Latency: {average_first_token_latency} (ms)")
    print(f"Step Latency: {average_step_latency} (ms)")
    print(f"Average Completion_token Nums: {average_num_completion_tokens}")
    print(f"QPS: {average_qps}")
    print(f"Output Throughputs: {average_num_completion_tokens * average_qps}")
    print(f"Error Requests: {(num_err_requests / len(dataset)) * 100}%")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Benchmark the online serving throughput."
    )
    parser.add_argument("--host", type=str, default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    parser.add_argument("--dataset", type=str, default=None, help="Path to the dataset.")
    parser.add_argument("--dataset-name", type=str, default=None, help="dataset name")
    parser.add_argument(
        "--concurrent", type=int, default=1, help="how many threads will be started"
    )
    parser.add_argument("--max-new-tokens", type=int, default=512)
    parser.add_argument('--stop-texts', type=str, default="<|im_end|>")
    parser.add_argument('--stop-token', type=int, default=49954)
    args = parser.parse_args()

    curr_prompt_index = 0
    get_prompt_lock = threading.Lock()
    dataset = load_dataset(args.dataset)
    request_latencys = []
    first_token_latencys = []
    step_latencys = []
    num_completion_tokens = []
    num_err_requests = 0
    num_timeout_requests = 0
    num_tokens = 0
    first_token_time = 0.0
    main(args)
Even though the client sends 26 requests simultaneously, the service processes them serially, finishing one request before starting the next.
The ensemble model does not show this behavior.
I ran into a similar problem with decoupled set to False. In the execute body of the BLS model, len(requests) is always 1, regardless of whether it is executed async or not.
It seems that tensorrt_llm_bls does not support in-flight batching, and only ensemble does?
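(For reference, this can be observed by logging the batch size inside the BLS model's execute method; the sketch below is a stripped-down placeholder model, not the actual tensorrt_llm_bls implementation.)

import triton_python_backend_utils as pb_utils

class TritonPythonModel:
    def execute(self, requests):
        # With the default instance_group count of 1, this prints 1 for every
        # call, i.e. requests are handed to the BLS model one at a time.
        print(f"BLS execute received {len(requests)} request(s)", flush=True)
        responses = []
        for request in requests:
            text = pb_utils.get_input_tensor_by_name(request, "text_input")
            # ... forward to the tensorrt_llm model via BLS here ...
            responses.append(pb_utils.InferenceResponse(output_tensors=[]))
        return responses  # in decoupled mode a response sender would be used instead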
I found the cause of the problem. The concurrency is limited by the default setting of bls_instance_count (the count in instance_group) in the tensorrt_llm_bls/config.pbtxt file, which is initially set to 1.
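(As an illustration of this fix, raising the count in instance_group of tensorrt_llm_bls/config.pbtxt lets that many BLS executions run in parallel; 26 below simply mirrors the benchmark concurrency and is not a recommended value.)

instance_group [
  {
    # one BLS instance per expected in-flight request; tune for your workload
    count: 26
    kind: KIND_CPU
  }
]

If your config.pbtxt still contains the ${bls_instance_count} placeholder, this count is the value it fills in.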
@songyouwei Yes, I also ran into this. When I set the count in instance_group to the number of concurrent requests, the problem is temporarily solved~
Hi @wjj19950828, may I ask whether you set this to the number of GPUs we have, or to the batch size we set?
@songyouwei Hi, sorry to bother you, but for /v2/models/ensemble/generate_stream, do we need to change bls_instance_count if we use decoupled=True mode?
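(For comparison, the generate_stream endpoint mentioned above bypasses the BLS model entirely; a minimal Python sketch is below, assuming the default HTTP port 8000 and the standard ensemble input names text_input, max_tokens, bad_words, stop_words and stream.)

import requests

url = "http://127.0.0.1:8000/v2/models/ensemble/generate_stream"
payload = {
    "text_input": "What is machine learning?",
    "max_tokens": 64,
    "bad_words": "",
    "stop_words": "",
    "stream": True,
}
# The endpoint streams Server-Sent Events; stream=True keeps the HTTP
# connection open so chunks can be printed as they arrive.
with requests.post(url, json=payload, stream=True) as resp:
    for line in resp.iter_lines():
        if line:
            print(line.decode("utf-8"))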
When I use tensorrt_llm_bls, the first token takes a very long time; it looks like the queue is blocked.
Using tensorrt_llm with the ensemble did not have this problem.
How should I troubleshoot this? Thank you~
max_batch_size is 128, and multiple threads send requests concurrently.
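(One way to check whether requests are piling up in the tensorrt_llm_bls queue rather than in the engine is to watch Triton's Prometheus metrics while the benchmark runs; the sketch below assumes the default metrics port 8002 and uses the standard nv_inference_queue_duration_us and nv_inference_count counters.)

import requests

METRICS_URL = "http://127.0.0.1:8002/metrics"  # default Triton metrics port

def avg_queue_time_ms(model="tensorrt_llm_bls"):
    """Average time a request spent queued in `model`, in milliseconds."""
    text = requests.get(METRICS_URL, timeout=5).text

    def scrape(counter):
        # Sum the counter over all versions of the given model.
        total = 0.0
        for line in text.splitlines():
            if line.startswith(counter) and f'model="{model}"' in line:
                total += float(line.rsplit(" ", 1)[1])
        return total

    count = scrape("nv_inference_count")
    return scrape("nv_inference_queue_duration_us") / count / 1000.0 if count else None

print(avg_queue_time_ms("tensorrt_llm_bls"), avg_queue_time_ms("tensorrt_llm"))

A per-request queue time that grows with concurrency on tensorrt_llm_bls but stays flat on tensorrt_llm would point back at bls_instance_count.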