InternLM / lmdeploy

LMDeploy is a toolkit for compressing, deploying, and serving LLMs.
https://lmdeploy.readthedocs.io/en/latest/
Apache License 2.0

[Feature] Is there a non-async engine? #1346

Closed janelu9 closed 7 months ago

janelu9 commented 7 months ago

Motivation

Async inference doesn't fit our use case.

Related resources

No response

Additional context

No response

AllentDan commented 7 months ago

AsyncEngine provides a batch_infer interface. Can't you use that?

janelu9 commented 7 months ago

AsyncEngine provides a batch_infer interface. Can't you use that?

It errors out under high concurrency:

File /mnt/e/conda-py311-cu118-torch21/lib/python3.11/site-packages/lmdeploy/serve/async_engine.py:410, in AsyncEngine.batch_infer(self, prompts, gen_config, do_preprocess, **kwargs)
    404     async def gather():
    405         await asyncio.gather(*[
    406             _inner_call(i, generators[i])
    407             for i in range(len(batch_prompts))
    408         ])
--> 410     self.loop.run_until_complete(gather())
    411 outputs = outputs[0] if need_list_wrap else outputs
    412 return outputs

File /mnt/e/conda-py311-cu118-torch21/lib/python3.11/asyncio/base_events.py:629, in BaseEventLoop.run_until_complete(self, future)
    618 """Run until the Future is done.
    619 
    620 If the argument is a coroutine, it is wrapped in a Task.
   (...)
    626 Return the Future's result, or raise its exception.
    627 """
    628 self._check_closed()
--> 629 self._check_running()
    631 new_task = not futures.isfuture(future)
    632 future = tasks.ensure_future(future, loop=self)

File /mnt/e/conda-py311-cu118-torch21/lib/python3.11/asyncio/base_events.py:588, in BaseEventLoop._check_running(self)
    586 def _check_running(self):
    587     if self.is_running():
--> 588         raise RuntimeError('This event loop is already running')
    589     if events._get_running_loop() is not None:
    590         raise RuntimeError(
    591             'Cannot run the event loop while another loop is running')

RuntimeError: This event loop is already running

janelu9 commented 7 months ago

AsyncEngine provides a batch_infer interface. Can't you use that?

No. It's not batch; these are individual, real-time calls, one request at a time. Is there an engine like vLLM's, just without the async?

AllentDan commented 7 months ago

Change https://github.com/InternLM/lmdeploy/blob/9c954efe0a0c939c4741515f929009bced443461/lmdeploy/serve/async_engine.py#L412 to

asyncio.new_event_loop().run_until_complete(gather())
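
In context, the tail of AsyncEngine.batch_infer would then look roughly like this (a sketch reconstructed from the traceback above; the surrounding code may differ between versions):

# inside AsyncEngine.batch_infer (sketch)
async def gather():
    await asyncio.gather(*[
        _inner_call(i, generators[i])
        for i in range(len(batch_prompts))
    ])

# run the batch on a fresh event loop instead of the engine's stored self.loop,
# which is what raised "This event loop is already running" above
asyncio.new_event_loop().run_until_complete(gather())
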
AllentDan commented 7 months ago

Describe your calling scenario and share the script you are using.

janelu9 commented 7 months ago

Describe your calling scenario and share the script you are using.

Think of it as a single pipe being called continuously from multiple processes, with each result returned directly as soon as it is ready.

AllentDan commented 7 months ago

No, a pipe can't be called across processes. You could start one pipe per process, but that wastes a lot of resources. If what you actually want is multi-threaded calls, you can use this as a reference:

import asyncio
from lmdeploy import pipeline
import random
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

pipe = pipeline('internlm/internlm2-chat-7b', log_level='ERROR')
num_parallel = 32
questions = ['你是谁'] * 100

async def get_response(question):
    # consume the async generator and accumulate the streamed pieces
    out = ''
    async for res in pipe.generate(question, random.randint(1, 100000)):
        out += res.response
    return out

def process_one(question):
    # each worker thread drives the coroutine on its own event loop
    loop = asyncio.new_event_loop()
    response = loop.run_until_complete(get_response(question))
    return response

with ThreadPoolExecutor(max_workers=num_parallel) as executor:
    for response in tqdm(executor.map(process_one, questions)):
        print(response)
print("Done")

janelu9 commented 7 months ago

No, a pipe can't be called across processes. You could start one pipe per process, but that wastes a lot of resources. If what you actually want is multi-threaded calls, you can use this as a reference:

It's a single pipe on the server side being called concurrently by remote clients, and that's when the error from 1# (the RuntimeError traceback above) appeared.

janelu9 commented 7 months ago

No, a pipe can't be called across processes. You could start one pipe per process, but that wastes a lot of resources. If what you actually want is multi-threaded calls, you can use this as a reference:

In your demo, would it cause problems if generate's session_id happens to collide?

janelu9 commented 7 months ago

async def get_response(question):
    out = ''
    async for res in pipe.generate(question, random.randint(1, 100000)):
        out += res.response
    return out

This approach seems workable, but there appears to be a small bug when the output token is None.

[screenshot attachment: 微信图片_20240326164926]

AllentDan commented 7 months ago

If it works, go ahead and use it; just filter out the None case. We'll consider adding a plain generate function interface later. But if you're writing your own service, I'd suggest calling the TurbomindInstance.stream_infer interface directly.
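
A minimal sketch of that None filter, reusing the pipe.generate pattern from the example above (how session_id is chosen is left to the caller):

async def get_response(question, session_id):
    out = ''
    async for res in pipe.generate(question, session_id):
        # some chunks can carry res.response == None; skip them instead of concatenating
        if res.response is not None:
            out += res.response
    return out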

janelu9 commented 7 months ago

TurbomindInstance.stream_infer

We're not streaming; we send out the full text once generation finishes.

AllentDan commented 7 months ago

TurbomindInstance.stream_infer

We're not streaming; we send out the full text once generation finishes.

That interface has a parameter that controls whether to stream. And even a streamed return is no trouble: just accumulate the strings on the outside and return the final result.

janelu9 commented 7 months ago

No, a pipe can't be called across processes. You could start one pipe per process, but that wastes a lot of resources. If what you actually want is multi-threaded calls, you can use this as a reference:

With this approach, on the second call the server errors out with "engine main loop stopped". What's going on?

AllentDan commented 7 months ago

I don't know; so far I still don't know your application scenario. It would be best to have a script that reproduces the problem directly. Or consider just writing a plain generate function for AsyncEngine and changing the async_stream_infer call inside it to stream_infer.
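
Alternatively, without touching the engine internals, a thread-safe synchronous wrapper can keep a single background event loop and block on it per request. This is only a sketch: generate_sync and _background_loop are names made up here, get_response is the helper from the example above, and it assumes pipe.generate can be driven from a loop other than the engine's own:

import asyncio
import threading

# one background event loop shared by all callers, running in a daemon thread
_background_loop = asyncio.new_event_loop()
threading.Thread(target=_background_loop.run_forever, daemon=True).start()

def generate_sync(question, session_id):
    # submit the coroutine to the background loop and block until it finishes
    future = asyncio.run_coroutine_threadsafe(
        get_response(question, session_id), _background_loop)
    return future.result()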

janelu9 commented 7 months ago

I don't know; so far I still don't know your application scenario. It would be best to have a script that reproduces the problem directly. Or consider just writing a plain generate function for AsyncEngine and changing the async_stream_infer call inside it to stream_infer.

Running the async for directly in Jupyter multiple times causes no problem, but as soon as I wrap that loop in a function it raises "This event loop is already running". With the wrapped function served via Flask, the first call succeeds and the second reports "engine main loop stopped".

AllentDan commented 7 months ago

It would be best to have a reproduction script or code; I don't have more suggestions for now.

janelu9 commented 7 months ago

It would be best to have a reproduction script or code; I don't have more suggestions for now.

The point is that writing the server with Flask doesn't work, while FastAPI does. Synchronous use isn't supported.

AllentDan commented 7 months ago

It would be best to have a reproduction script or code; I don't have more suggestions for now.

The point is that writing the server with Flask doesn't work, while FastAPI does. Synchronous use isn't supported.

I haven't used Flask. You could share a script, or I'll try writing the server the Flask way when I have time.

janelu9 commented 7 months ago

It would be best to have a reproduction script or code; I don't have more suggestions for now.

The point is that writing the server with Flask doesn't work, while FastAPI does. Synchronous use isn't supported.

I haven't used Flask. You could share a script, or I'll try writing the server the Flask way when I have time.

from flask import Flask, request
from lmdeploy import pipeline, TurbomindEngineConfig,PytorchEngineConfig,GenerationConfig
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security.http import HTTPAuthorizationCredentials, HTTPBearer
import json

# pipe = pipeline('openlm-research/open_llama_13b')
# gen_config=GenerationConfig(**{'max_new_tokens':12})

# async def f():
    # out=''
    # async for res in pipe.generate([{'role':'user','content':"你好"}], 0,gen_config =gen_config):
        # out += res.response
    # return out

app = Flask(__name__)
#app = FastAPI()
@app.route('/test', methods=['POST'])
def handle_json():
    data = request.json  # get the JSON payload
    # r = await f()  # can't await here: this is a plain (sync) Flask handler
    return json.dumps(data)

if __name__ == '__main__':
    app.run(host="0.0.0.0",port=6520)
curl -X POST -H "Content-Type: application/json" -d '{"key": "value"}' http://localhost:6520/test
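
For comparison, the FastAPI variant that reportedly works can stay on a single event loop and await the pipeline directly. This is only a sketch, re-enabling the commented-out pipe and gen_config above; the response field names and the per-request session counter are choices made here, not part of the original script:

from lmdeploy import pipeline, GenerationConfig
from fastapi import FastAPI, Request
import itertools
import uvicorn

app = FastAPI()
pipe = pipeline('openlm-research/open_llama_13b')
gen_config = GenerationConfig(max_new_tokens=12)
session_ids = itertools.count()  # avoid reusing session ids across requests

@app.post('/test')
async def handle_json(request: Request):
    data = await request.json()
    out = ''
    # uvicorn already runs this handler inside an event loop, so the async
    # generator can be consumed directly, with no nested run_until_complete
    async for res in pipe.generate([{'role': 'user', 'content': '你好'}],
                                   next(session_ids), gen_config=gen_config):
        if res.response is not None:
            out += res.response
    return {'request': data, 'response': out}

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=6520)

The same curl command as above should then return both the echoed payload and the generated text.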