Closed: EvanSong77 closed this issue 3 weeks ago
This is just a personal experimental project of mine (written for fun) for quickly testing ideas. You can try the https://github.com/noooop/vllm/tree/wde_encode_only branch, which is more thoroughly tested.
I'll write a data parallelism Executor in a few days; no rush.
With double_buffer_execute_loop, single-GPU inference performance should already be pretty good, right?
OK, thank you very much.
Yep, the performance is very good. I was using onnx before, and the improvement there was small.
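For context on the double_buffer_execute_loop mentioned above: double buffering generally means staging the next batch on the GPU while the current batch is still computing. Below is a minimal PyTorch sketch of that general pattern using two CUDA streams; it is only an illustration of the idea, not the wde code.

import torch
import torch.nn as nn

if torch.cuda.is_available():
    device = torch.device("cuda:0")
    model = nn.Linear(1024, 1024).to(device)
    copy_stream = torch.cuda.Stream(device=device)
    compute_stream = torch.cuda.current_stream(device)

    # Pinned host memory makes the host-to-device copies truly asynchronous
    batches = [torch.randn(64, 1024).pin_memory() for _ in range(8)]

    staged = None
    for cpu_batch in batches:
        # Stage the next batch on the copy stream while the previous one computes
        with torch.cuda.stream(copy_stream):
            incoming = cpu_batch.to(device, non_blocking=True)
        if staged is not None:
            out = model(staged)  # runs concurrently on the compute stream
        compute_stream.wait_stream(copy_stream)
        incoming.record_stream(compute_stream)
        staged = incoming
    out = model(staged)  # drain the last staged batch
    torch.cuda.synchronize()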
Try this new commit.
Thank you very much for the commit. I tested it: it still sometimes gets stuck under concurrent scheduling, and it also seems to load the model multiple times (data_parallel_size times). Normally, under concurrent scheduling, requests should just queue up when the system can't keep up.
Avoiding the repeated model loading is tricky. Load the weights to CPU first and then copy them to each device? Or load them on cuda:0 first and then copy? Either way, the device would end up being initialized outside the worker (in the main thread rather than in the worker thread), which is a headache.
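One possible shape of the "load once to CPU, copy per device" approach, as a minimal PyTorch sketch (a stand-in model, one replica per GPU, device initialization inside each worker thread; this is not the wde implementation):

import torch
import torch.nn as nn
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the real model; a real checkpoint would be read from safetensors once, onto the CPU
cpu_state = nn.Linear(1024, 1024).state_dict()

def init_replica(rank: int) -> nn.Module:
    # The device is initialized inside the worker thread, not in the main thread
    torch.cuda.set_device(rank)
    replica = nn.Linear(1024, 1024).to(f"cuda:{rank}")
    replica.load_state_dict(cpu_state)  # copies the shared CPU weights to this device
    return replica

if torch.cuda.is_available():
    data_parallel_size = min(4, torch.cuda.device_count())
    with ThreadPoolExecutor(max_workers=data_parallel_size) as pool:
        replicas = list(pool.map(init_replica, range(data_parallel_size)))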
When you say it gets stuck: is it stuck at startup, does the speed fluctuate midway, or does it fail to shut down cleanly?
I pushed a patch; please try the latest commit again. vllm's distributed code is all coupled together, which is a real headache.
from vllm.wde.entrypoints.llm import LLM
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

# datas.json holds a 'query' string and a list of 'contexts' (see client.py below)
with open('datas.json', 'r', encoding='utf-8') as fp:
    datas = json.load(fp)

pairs = []
query = datas['query']
for data in datas['contexts']:
    pairs.append([query, data])
pairs_list = [pairs] * 2

llm = LLM(model='../bge-reranker-v2-m3/', data_parallel_size=4)

def process_batch(batch, index):
    """Run inference on one batch and return its index together with the result."""
    result = llm.encode(batch)
    return index, result

results = [None] * len(pairs_list)
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_batch, batch, idx) for idx, batch in enumerate(pairs_list)]
    for future in as_completed(futures):
        index, result = future.result()
        results[index] = result

for i, outputs in enumerate(results):
    for output in outputs:
        print(output.score)
Thanks a lot. I tried it, but it still doesn't work.
A ThreadPoolExecutor is already used internally, so you don't need to start your own threads.
from vllm.wde.entrypoints.llm import LLM

prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]

llm = LLM(model='BAAI/bge-m3', data_parallel_size=4)
outputs = llm.encode(prompts)

for output in outputs:
    print(output.outputs.shape)
That's all you need.
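Applied to the reranker case from the earlier snippet, the same single-call pattern would look roughly like this (a sketch that reuses the datas.json layout and the output.score field from the code above):

from vllm.wde.entrypoints.llm import LLM
import json

with open('datas.json', 'r', encoding='utf-8') as fp:
    datas = json.load(fp)

pairs = [[datas['query'], ctx] for ctx in datas['contexts']]

llm = LLM(model='../bge-reranker-v2-m3/', data_parallel_size=4)

# One call with all pairs; batching and scheduling happen inside the engine
outputs = llm.encode(pairs)
for output in outputs:
    print(output.score)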
app.py
import logging
import re
import sys
import time

import flask
from flask import request, jsonify

from vllm.wde.entrypoints.llm import LLM

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

app = flask.Flask("reranker-service")

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s in %(module)s: %(message)s"))
app.logger.handlers.clear()
app.logger.addHandler(handler)
app.logger.setLevel(logging.INFO)

llm = LLM(model='../bge-reranker-v2-m3/', data_parallel_size=4)


@app.route('/reranker', methods=['POST'])
def reranker_proc():
    res = {"success": False, "msg": None, "result": None}
    try:
        req_data = request.json
        clean = lambda _str: re.sub(r'\s+', ' ', _str.replace('\t', ''))
        pairs = [[req_data['query'], clean(_s)] for _s in req_data['contexts']]
        s_t = time.time()
        result = llm.encode(pairs)
        app.logger.info(time.time() - s_t)
        scores = [output.score for output in result]
        res['result'] = scores
        res['success'] = True
    except Exception as e:
        app.logger.error(str(sys.exc_info()[1]))
        res['msg'] = str(sys.exc_info()[1])
    return jsonify(res)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=12501, threaded=True, debug=False)
client.py
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

with open('datas.json', 'r', encoding='utf-8') as fp:
    datas = json.load(fp)
datas['contexts'] = datas['contexts'] * 20

pairs_list = [datas] * 2

def post_request(data, index):
    """Send a POST request and return its index together with the JSON response."""
    response = requests.post('http://0.0.0.0:12501/reranker', json=data)
    return index, response.json()

results = [None] * len(pairs_list)
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(post_request, batch, idx) for idx, batch in enumerate(pairs_list)]
    for future in as_completed(futures):
        index, result = future.result()
        results[index] = result['result']

# Print the results in order; the service already returns plain score values
for i, outputs in enumerate(results):
    print(f"Results for batch {i}:")
    for output in outputs:
        print(output)
/usr/local/lib/python3.11/dist-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
from .autonotebook import tqdm as notebook_tqdm
WARNING 09-28 17:34:13 _custom_ops.py:18] Failed to import from vllm._C with ModuleNotFoundError("No module named 'vllm._C'")
2024-09-28 17:34:15,155 INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
/hy-tmp/codewen_workspace/vllm/vllm/connections.py:8: RuntimeWarning: Failed to read commit hash:
No module named 'vllm.commit_id'
from vllm.version import __version__ as VLLM_VERSION
INFO 09-28 17:34:15 config.py:558] Downcasting torch.float32 to torch.float16.
INFO 09-28 17:34:15 config.py:28] Initializing an Encode Only engine (v0.6.1.post1) with config: model='../bge-reranker-v2-m3/', tokenizer='../bge-reranker-v2-m3/', tokenizer_mode=auto, trust_remote_code=False, dtype=torch.float16, max_seq_len=8194, download_dir=None, load_format=LoadFormat.AUTO, device_config=cuda, served_model_name=../bge-reranker-v2-m3/, max_num_on_the_fly=3, scheduling=async)
INFO 09-28 17:34:15 config.py:45] Parallel config: data_parallel_size=4
INFO 09-28 17:34:15 llm_engine.py:107] Use async scheduling
INFO 09-28 17:34:15 selector.py:64] Using FLASH ATTN backend.
* Serving Flask app 'reranker-service'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:12501
* Running on http://172.17.0.3:12501/
Press CTRL+C to quit
Processed prompts: 0%| | 0/19 [00:00<?, ?it/s, est. speed input: 0.00 toks/s, output: 0.00 toks/s]
[2024-09-28 17:34:30,289] ERROR in 3662770184: deque index out of range
Processed prompts: 0%| | 0/38 [00:02<?, ?it/s, est. speed input: 0.00 toks/s, output: 0.00 toks/s]
127.0.0.1 - - [28/Sep/2024 17:34:30] "POST /reranker HTTP/1.1" 200 -
INFO 09-28 17:34:30 model_runner.py:39] Starting to load model ../bge-reranker-v2-m3/...
INFO 09-28 17:34:30 model_runner.py:39] Starting to load model ../bge-reranker-v2-m3/...
INFO 09-28 17:34:30 model_runner.py:39] Starting to load model ../bge-reranker-v2-m3/...
INFO 09-28 17:34:30 model_runner.py:39] Starting to load model ../bge-reranker-v2-m3/...
Loading safetensors checkpoint shards: 0% Completed | 0/1 [00:00<?, ?it/s]
Loading safetensors checkpoint shards: 0% Completed | 0/1 [00:00<?, ?it/s]
Loading safetensors checkpoint shards: 0% Completed | 0/1 [00:00<?, ?it/s]
Loading safetensors checkpoint shards: 0% Completed | 0/1 [00:00<?, ?it/s]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:03<00:00, 3.34s/it]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:03<00:00, 3.35s/it]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:03<00:00, 3.34s/it]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:03<00:00, 3.34s/it]
INFO 09-28 17:34:34 model_runner.py:52] Loading model weights took 4.2301 GB
INFO 09-28 17:34:34 model_runner.py:52] Loading model weights took 4.2301 GB
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:03<00:00, 3.60s/it]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:03<00:00, 3.76s/it]
INFO 09-28 17:34:34 model_runner.py:52] Loading model weights took 4.2302 GB
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:04<00:00, 4.55s/it]
Loading safetensors checkpoint shards: 100% Completed | 1/1 [00:04<00:00, 4.56s/it]
INFO 09-28 17:34:35 model_runner.py:52] Loading model weights took 4.2941 GB
Processed prompts: 5%|▌ | 1/19 [00:07<02:18, 7.71s/it, est. speed input: 0.00 toks/s, output: 0.00 toks/s]
You may have misunderstood what I mean: this works fine when called from a single thread, but as soon as multiple threads call it, it gets stuck.
The webserver won't be usable until the code is fully merged in.
Specifically, it has to be merged into MQLLMEngine before it can hook into the vllm entrypoints and expose an OpenAI API.
Got it. Do you currently have a plan to integrate and merge the code?
It's a headache; I doubt it can be merged in. You can follow along here: https://github.com/vllm-project/vllm/pull/8964
I may write a library with a webserver in a few days.
You can try https://github.com/noooop/wde/blob/main/docs/quickstart.md
I get an error when deploying with the webserver:
Traceback (most recent call last):
File "/usr/local/miniconda3/bin/wde", line 8, in
wde deploy submits the model-deployment command to the server, so you need to keep wde server running in another window.
You can try the serving command that was added today. The serving command is equivalent to server + deploy: it starts the service and deploys the model:
wde serving examples/online/deploy.yml
Of course, you'll need to reinstall the latest code first:
pip install https://github.com/noooop/wde/archive/refs/heads/main.zip
import time
from concurrent.futures import ThreadPoolExecutor

llm = LLM(model="./bge-reranker-v2-m3/")

def process_pair(pair):
    return llm.reranker([pair])  # pairs are the [query, context] pairs built as above

start = time.time() * 1000
with ThreadPoolExecutor(max_workers=8) as executor:
    outputs = list(executor.map(process_pair, pairs))
end = time.time() * 1000
print(end - start)
output_scores = [out[0].score for out in outputs]