THUDM / ChatGLM-6B

ChatGLM-6B: An Open Bilingual Dialogue Language Model | 开源双语对话语言模型
Apache License 2.0
40.46k stars 5.19k forks source link

[Help] tornado怎么做非阻塞的流式接口 #1198

Open aiaiyueq11 opened 1 year ago

aiaiyueq11 commented 1 year ago

Is there an existing issue for this?

Current Behavior

以下是我用tornado实现的流式接口,但是当我调用generate接口去生成结果时整个服务就阻塞了,导致我同时请求另外一个other接口一直在等待。请问应该如何实现非阻塞的流式接口

class ChatHandler(BaseHandler): executor = ThreadPoolExecutor(5)

@tornado.gen.coroutine
def post(self, *args, **kwargs):
    body = self.request.body.decode('utf-8')
    data = json.loads(body)

    query = data.get('query', '')
    history = data.get('history', [])
    temperature = data.get('temperature', 0.5)
    top_p = data.get('top_p', 0.9)
    max_length = data.get('max_length', 2048)
    do_sample = data.get('do_sample', True)

    self.set_header('Content-Type', 'text/event-stream')
    for response, _ in service.model.model.stream_chat(service.model.tokenizer,
                                                           query,
                                                           history,
                                                           max_length=max_length,
                                                           do_sample=do_sample,
                                                           top_p=top_p,
                                                           temperature=temperature):
        info_ = json.dumps({"response": response}, ensure_ascii=False)
        self.write("data: %s\n\n" % info_)
        yield self.flush()
    self.finish()

class CheckHandler(BaseHandler): def get(self, *args, **kwargs): self.finish("")

application = tornado.web.Application([ (r"/other", CheckHandler), (r"/generate", ChatHandler) ])

if name == 'main': service = ModelService() port = 8080 application.listen(port) tornado.ioloop.IOLoop.instance().start()

Expected Behavior

No response

Steps To Reproduce

请问应该如何实现非阻塞的流式接口

Environment

- OS:mac
- Python:3.7

Anything else?

No response

MrJiangZhongZheng commented 2 months ago

试试这个 亲测可以 用流式的api from openai import AsyncOpenAI client_stream = AsyncOpenAI()

class ServerSentEvent(tornado.web.RequestHandler): executor = ThreadPoolExecutor(10)

async def post(self, *args, **kwargs):
    ret = {
        "ret": -1,
        "errcode": -1,
        "data": ""
    }

    try:
        data = json_decode(self.request.body)
        content = data.get("msg", "")
        content = json.loads(content)
        model = data.get("model", "")
        temperature = data.get("temperature", 0.1)

        if not model:
            model = "gpt-3.5-turbo"

        stream = await client_stream.chat.completions.create(
            model=model,
            messages=content,
            stream=True,
            frequency_penalty=1.0,
            temperature=temperature,
            max_tokens=4096,
        )
        tempLine = ""
        use1 = 0
        use2 = 0
        for msg in content:
            use1 += len(msg["content"])

        async for chunk in stream:
            tempSplit = []
            if chunk.choices[0].delta.content is not None:
                temp = chunk.choices[0].delta.content
                use2 += len(temp)
                tempLine += temp
                if "\n" in temp:
                    tempSplit = tempLine.split("\n")
                else:
                    continue
                if len(tempLine) <= 1:
                    continue
                # 最后一个
                tempLine = tempSplit[len(tempSplit) - 1]

                for index in range(len(tempSplit) - 1):
                    # line = tempSplit[index] + "\n"
                    line = tempSplit[index]
                    # logging.info(line)
                    line = json.dumps({"data": line})
                    self.write("data:%s\n" % line)
                    await self.flush()
        tempSplit = tempLine.split("\n")
        for i in range(len(tempSplit)):
            line = tempSplit[i]
            # if i != len(tempSplit) - 1:
            #     line += "\n"
            line = json.dumps({"data": line})
            self.write("data:%s\n" % line)
            await self.flush()
            # logging.info(line)
        logging.info("ppppppppppp:model:%s, use:%s:%s" % (model, use1, use2))
        await self.finish()
    except Exception as e:
        print(e)
        self.write(ret)
        await self.finish()