openai / openai-python

The official Python library for the OpenAI API
https://pypi.org/project/openai/
Apache License 2.0

The concurrency of AsyncOpenAI cannot be fully utilized. #1725

Open rangehow opened 1 month ago

rangehow commented 1 month ago

Confirm this is an issue with the Python library and not an underlying OpenAI API

Describe the bug

I ran a stability test on the concurrency of AsyncOpenAI. I set the concurrency limit to 1024, but the number of in-flight requests stayed at a much lower average and fluctuated heavily. This matches what I see in my production tests.

image

To Reproduce

I split my code into three files: client.py, server.py, and main.py (which issues 100k client requests in total).

server.py

from fastapi import FastAPI, Request
from pydantic import BaseModel
import asyncio
import logging
from datetime import datetime
import threading
import time
import csv

app = FastAPI()

# track the current number of active requests
active_requests = 0

# debug log file used to draw the plot
output_file = 'active_requests_log.csv'

class CompletionRequest(BaseModel):
    model: str
    messages: list
    temperature: float

@app.middleware("http")
async def track_requests(request: Request, call_next):
    global active_requests
    active_requests += 1  # increment when a request arrives
    logging.info(f"Active requests: {active_requests}")

    response = await call_next(request)

    active_requests -= 1  # decrement once the request has completed
    logging.info(f"Active requests: {active_requests}")

    return response

@app.post("/v1/chat/completions")
async def completions(request: CompletionRequest):
    await asyncio.sleep(1)  # mock llm generate latency
    return {
        "choices": [
            {"message": {"content": f"Response to {request.messages[-1]['content']}"}}
        ]
    }

def record_active_requests():
    """Write the active request count to the log file once per second."""
    global active_requests
    with open(output_file, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["timestamp", "active_requests"])  # write the header row

        while True:
            # record once per second
            current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            writer.writerow([current_time, active_requests])
            file.flush()  # make sure each second's row reaches the file
            time.sleep(1)

# start a thread that records the number of active requests
threading.Thread(target=record_active_requests, daemon=True).start()

if __name__ == "__main__":
    import uvicorn
    logging.basicConfig(level=logging.INFO)
    uvicorn.run(app, host="127.0.0.1", port=8203)
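
One caveat about the counter in server.py: the middleware decrements `active_requests` only when `call_next` returns normally, so a request that raises would leave the count inflated. Below is a minimal sketch of the same counting pattern with `try`/`finally`, using plain asyncio rather than FastAPI. `ActiveCounter`, `handler`, and `demo` are hypothetical names for illustration, not part of the original script.

```python
import asyncio
from contextlib import contextmanager


class ActiveCounter:
    """Tracks how many handlers are currently running (hypothetical helper)."""

    def __init__(self) -> None:
        self.value = 0
        self.peak = 0

    @contextmanager
    def track(self):
        self.value += 1
        self.peak = max(self.peak, self.value)
        try:
            yield
        finally:
            # Runs on success, on exceptions, and on cancellation.
            self.value -= 1


async def handler(counter: ActiveCounter, fail: bool) -> str:
    with counter.track():
        await asyncio.sleep(0.01)  # stands in for the mocked LLM latency
        if fail:
            raise RuntimeError("simulated handler failure")
        return "ok"


async def demo() -> ActiveCounter:
    counter = ActiveCounter()
    # Half of the handlers fail; the counter should still return to zero.
    await asyncio.gather(
        *(handler(counter, fail=(i % 2 == 0)) for i in range(10)),
        return_exceptions=True,
    )
    return counter


if __name__ == "__main__":
    c = asyncio.run(demo())
    print(c.value, c.peak)
```

In the FastAPI middleware, the equivalent change would be to wrap `await call_next(request)` in `try`/`finally` and decrement in the `finally` branch.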

client.py

import asyncio
from functools import wraps
import httpx
import logging
from openai import AsyncOpenAI

def limit_async_func_call(max_size: int):
    sem = asyncio.Semaphore(max_size)

    def final_decro(func):
        @wraps(func)
        async def wait_func(*args, **kwargs):
            async with sem:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logging.error(f"Exception in {func.__name__}: {e}")

        return wait_func
    return final_decro

# Assume this is the function whose concurrency you want to test
@limit_async_func_call(max_size=1024)  # limit concurrency to 1024
async def custom_model_if_cache(prompt, system_prompt=None, history_messages=[], **kwargs):
    custom_http_client = httpx.AsyncClient(
        limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
        timeout=httpx.Timeout(timeout=None)
    )

    openai_async_client = AsyncOpenAI(
        api_key="EMPTY", base_url="http://localhost:8203/v1",  # mock local server
        http_client=custom_http_client
    )

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})

    # Assume this is the external API call
    response = await openai_async_client.chat.completions.create(
        model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs
    )

    return "hi"
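
Note that the decorator above logs exceptions and then returns `None`, so a failed request cannot be told apart from a successful one by looking at raised errors in main.py. Below is a minimal sketch of a variant that re-raises after logging, so that `asyncio.gather(..., return_exceptions=True)` can surface failures. `flaky` and `run_batch` are hypothetical stand-ins for the API call and the driver, and the decorator is applied inside the running loop so the semaphore belongs to that loop.

```python
import asyncio
import logging
from functools import wraps


def limit_async_func_call(max_size: int):
    """Cap concurrent calls, but let exceptions propagate after logging them."""

    def decorator(func):
        sem = asyncio.Semaphore(max_size)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logging.error("Exception in %s: %s", func.__name__, e)
                    raise  # keep the failure visible to the caller

        return wrapper

    return decorator


async def flaky(i: int) -> str:
    """Hypothetical request: every third call fails."""
    await asyncio.sleep(0.001)
    if i % 3 == 0:
        raise ValueError(f"request {i} failed")
    return "hi"


async def run_batch(n: int, max_size: int = 4) -> list:
    # Decorate inside the running loop so the semaphore is tied to this loop.
    limited = limit_async_func_call(max_size)(flaky)
    return await asyncio.gather(
        *(limited(i) for i in range(n)), return_exceptions=True
    )


if __name__ == "__main__":
    results = asyncio.run(run_batch(9))
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"{len(failures)} of {len(results)} calls failed")
```

With this variant, counting the exception objects in the gathered results shows how many requests actually failed during a load test.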

main.py

import asyncio
import logging
from client import custom_model_if_cache
# simulate 100,000 requests
TOTAL_REQUESTS = 100000

async def simulate_requests():
    tasks = []
    for i in range(TOTAL_REQUESTS):
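        prompt = f"Test prompt {i}"  # a different prompt for each request
        task = custom_model_if_cache(prompt=prompt)  # call the concurrency-limited async function
        tasks.append(task)

    # run all requests concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # print the first 10 results as a sanity check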
    for result in results[:10]:
        print(result)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(simulate_requests())
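
One way to check whether the semaphore pattern itself caps throughput is to replace the HTTP call with `asyncio.sleep` and count in-flight calls on the client side. Below is a minimal sketch under that assumption. `measure_peak_in_flight` and `fake_request` are hypothetical names, and the sleep only stands in for the network round trip.

```python
import asyncio


async def measure_peak_in_flight(limit: int, total: int, latency: float = 0.01) -> int:
    """Run `total` fake requests behind a semaphore and return the peak concurrency."""
    sem = asyncio.Semaphore(limit)  # created inside the running loop
    in_flight = 0
    peak = 0

    async def fake_request() -> None:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency)  # stands in for the HTTP round trip
            in_flight -= 1

    await asyncio.gather(*(fake_request() for _ in range(total)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(measure_peak_in_flight(limit=1024, total=10_000)))
```

If this reports the configured limit, the semaphore and `asyncio.gather` are not what holds the request count down, which would point toward the HTTP client, the connection pool, or the server.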

To reproduce, open two terminals and run python server.py and python main.py separately. The server also saves a log, which you can plot with the following code:

draw.py

import csv
import matplotlib.pyplot as plt
from datetime import datetime

# file path
input_file = 'active_requests_log.csv'

# read the CSV file and parse the timestamps and active request counts
timestamps = []
active_requests = []

with open(input_file, mode='r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        timestamps.append(datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S"))
        active_requests.append(int(row["active_requests"]))

# draw the chart
plt.figure(figsize=(10, 6))
plt.plot(timestamps, active_requests, label='Active Requests', color='b')

# set the chart title and axis labels
plt.title('Active Requests Over Time')
plt.xlabel('Time')
plt.ylabel('Active Requests')
plt.xticks(rotation=45)
plt.grid(True)
plt.legend()

# save the chart to a file
plt.tight_layout()
plt.savefig("/mnt/rangehow/pr/test_c/c.jpg")
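
Alongside the plot, a few summary numbers make separate runs easier to compare. Below is a minimal sketch that assumes the CSV layout written by server.py (a `timestamp` column and an `active_requests` column). `summarize_active_requests` is a hypothetical helper, and the sample rows are made up for illustration rather than taken from a real run.

```python
import csv
import io
import statistics


def summarize_active_requests(csv_file) -> dict:
    """Return sample count, min, mean, and max of the active_requests column."""
    reader = csv.DictReader(csv_file)
    counts = [int(row["active_requests"]) for row in reader]
    if not counts:
        raise ValueError("log contains no samples")
    return {
        "samples": len(counts),
        "min": min(counts),
        "mean": statistics.mean(counts),
        "max": max(counts),
    }


if __name__ == "__main__":
    # Made-up rows in the format server.py writes; not real measurements.
    sample = io.StringIO(
        "timestamp,active_requests\n"
        "2024-01-01 00:00:00,120\n"
        "2024-01-01 00:00:01,480\n"
        "2024-01-01 00:00:02,300\n"
    )
    print(summarize_active_requests(sample))
```

For a real run, open `active_requests_log.csv` with `newline=''` and pass the file object to the function.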

Code snippets

No response

OS

ubuntu

Python version

3.12

Library version

latest

RobertCraigie commented 1 month ago

Thanks for the report, what results do you get if you extract your custom_http_client & openai_async_client outside of the async function call so they're singletons?

rangehow commented 1 month ago

Do you mean this?

client.py

import asyncio
from functools import wraps
import httpx
import logging
from openai import AsyncOpenAI

# decorator that limits the number of concurrent requests
def limit_async_func_call(max_size: int):
    sem = asyncio.Semaphore(max_size)

    def final_decro(func):
        @wraps(func)
        async def wait_func(*args, **kwargs):
            async with sem:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logging.error(f"Exception in {func.__name__}: {e}")

        return wait_func
    return final_decro

custom_http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
    timeout=httpx.Timeout(timeout=None)
)

openai_async_client = AsyncOpenAI(
    api_key="EMPTY", base_url="http://localhost:8203/v1",  # mock local server
    http_client=custom_http_client
)

# Assume this is the function whose concurrency you want to test
@limit_async_func_call(max_size=1024)  # limit concurrency to 1024
async def custom_model_if_cache(prompt, system_prompt=None, history_messages=[], **kwargs):

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})

    # Assume this is the external API call
    response = await openai_async_client.chat.completions.create(
        model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs
    )

    return "hi"

RobertCraigie commented 1 month ago

yes!

rangehow commented 1 month ago

I didn’t complete the entire run, but I think the result should still be the same as last time.

image

RobertCraigie commented 1 month ago

thanks, does this still happen if you just use httpx to make the requests instead of the openai SDK?

rangehow commented 1 month ago

Honestly, I don’t really understand network programming—it’s a bit beyond my skill set. If you could clearly tell me how the code should be changed (or even better, provide me with a modified version), I can quickly test it out! 😊

rangehow commented 1 month ago

Although the concurrency didn’t reach the full 1024, making the clients singletons has clearly increased the overall concurrency!

image

RobertCraigie commented 1 month ago

Of course! Here's what that code should look like (I haven't verified it)

import httpx

http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
    timeout=httpx.Timeout(timeout=None)
)

# inside the async function, in place of the SDK call
# (`messages` and `kwargs` come from that function):
response = await http_client.post(
    "http://localhost:8203/v1/chat/completions",
    json=dict(model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs),
)

rangehow commented 1 month ago

I assume the code in client.py should look like this:

http_client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=2048, max_keepalive_connections=1024),
    timeout=httpx.Timeout(timeout=None)
)

@limit_async_func_call(max_size=1024)  # limit concurrency to 1024
async def custom_httpx(prompt, system_prompt=None, history_messages=[], **kwargs):

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})

    response = await http_client.post(
        "http://localhost:8203/v1/chat/completions",
        json=dict(model="gpt-3.5-turbo", messages=messages, temperature=0, **kwargs),
    )

    return "hi"

What I observed today is completely different from yesterday: with both httpx and the singleton OpenAI client, concurrency dropped significantly compared with yesterday's tests. I need to run for longer to get a result.

Not sure whether the following output helps:

ss -s
Total: 9464
TCP:   13509 (estab 3444, closed 9117, orphaned 10, timewait 5130)

Transport Total     IP        IPv6
RAW       7         2         5        
UDP       5         5         0        
TCP       4392      4361      31       
INET      4404      4368      36       
FRAG      0         0         0 
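
The `TCP:` line above reports 3444 established sockets and 5130 in TIME_WAIT. A large TIME_WAIT count can be a sign that connections are being closed and reopened rather than kept alive, although this output alone does not establish that here. Below is a minimal sketch for extracting those counts so they can be tracked across runs. `parse_ss_tcp_summary` is a hypothetical helper, and the handling of a `N/M` value is an assumption about how some `ss` versions print the timewait field.

```python
import re


def parse_ss_tcp_summary(line: str) -> dict:
    """Parse the 'TCP:' line of `ss -s` output into a dict of integer counts."""
    match = re.match(r"\s*TCP:\s+(\d+)\s+\((.*)\)", line)
    if match is None:
        raise ValueError(f"not an ss -s TCP summary line: {line!r}")
    counts = {"total": int(match.group(1))}
    for field in match.group(2).split(","):
        name, value = field.split()
        # Assumption: some versions print 'timewait N/M'; keep the first number.
        counts[name] = int(value.split("/")[0])
    return counts


if __name__ == "__main__":
    line = "TCP:   13509 (estab 3444, closed 9117, orphaned 10, timewait 5130)"
    counts = parse_ss_tcp_summary(line)
    print(counts["estab"], counts["timewait"])
```

Sampling this once per second alongside the server log would show whether TIME_WAIT grows while the active request count drops.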

RobertCraigie commented 1 month ago

Interesting, so you're getting similar results with the SDK and with httpx?

rangehow commented 1 month ago

I just tested both from scratch.

OpenAI Async API: image

HTTPX (I got distracted and didn’t notice it had been running for quite a while): image