BerriAI / litellm

Python SDK, Proxy Server (LLM Gateway) to call 100+ LLM APIs in OpenAI format - [Bedrock, Azure, OpenAI, VertexAI, Cohere, Anthropic, Sagemaker, HuggingFace, Replicate, Groq]
https://docs.litellm.ai/docs/

[Bug]: acompletions not triggered when asyncio.sleep() is run #1034

Open anunayajoshi opened 10 months ago

anunayajoshi commented 10 months ago

What happened?

I have a script, pasted at the bottom of this issue, that you can use to verify this. The thing to note is when the HTTP POST request to OpenAI is triggered.

Here are my outputs for the run_multiple_acompletions function


i am here <class 'coroutine'>
i am here <class 'coroutine'>
<class '_asyncio.Task'>
status of task False
<class '_asyncio.Task'>
status of task True
<class 'async_generator'>
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Hi
 there
!
 How
 can
 I
 assist
 you
 today
?
None

which only sends the POST request to OpenAI once I call task.result() and iterate the stream, not during asyncio.sleep().

Whereas when I use the OpenAI client directly in the other function:

task created <class '_asyncio.Task'>
task created <class '_asyncio.Task'>
status of task False
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
status of task True
<class 'openai.AsyncStream'>

Hello
!
 How
 can
 I
 assist
 you
 today
?

Here, the POST requests happen during the sleep.

So with acompletion, I am missing out on the latency savings I intended to get by running my calls asynchronously.
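
I haven't verified this end to end, but in principle pulling the first chunk inside the task should force the request out during the sleep. Rough sketch (start_stream is just a helper name I made up, not a litellm API):

import asyncio
from litellm import acompletion

async def start_stream(messages):
    # Awaiting acompletion alone returns the stream wrapper; per the logs
    # above, the POST only fires once the stream is iterated, so pull the
    # first chunk here to start the request while the task runs.
    stream = await acompletion(
        model="gpt-3.5-turbo", messages=messages, stream=True
    )
    first_chunk = await stream.__anext__()  # HTTP request should go out here
    return first_chunk, stream

async def main():
    task = asyncio.create_task(
        start_stream([{"content": "Hello", "role": "user"}])
    )
    await asyncio.sleep(4)  # the POST should now land during this sleep
    first_chunk, stream = await task
    print(first_chunk.choices[0].delta.content)
    async for chunk in stream:
        print(chunk.choices[0].delta.content)

asyncio.run(main())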

Script:

from litellm import acompletion
import os
from dotenv import load_dotenv
from openai import AsyncOpenAI

load_dotenv()
import asyncio

import logging

logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger()

async def run_multiple_openai_completions():
    tasks = []
    messages = [
        {"content": "Hello", "role": "user"},
        {"content": "How are you?", "role": "user"},
    ]
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    for message in messages:
        chat_completion_coro = client.chat.completions.create(
            messages=[message], model="gpt-3.5-turbo", stream=True
        )
        task = asyncio.create_task(chat_completion_coro)
        print("task created", type(task))
        tasks.append(task)

    print("status of task", tasks[0].done())
    await asyncio.sleep(4)
    print("status of task", tasks[0].done())
    res = tasks[0].result()
    print(type(res))

    async for chunk in res:
        print(chunk.choices[0].delta.content)

async def run_multiple_acompletions():
    try:

        async def agenerate_dialogue(chat_history):
            return await acompletion(
                model="gpt-3.5-turbo",
                messages=[{"content": "Hello", "role": "user"}],
                stream=True,
            )

        tasks = []
        messages = [
            {"content": "Hello", "role": "user"},
            {"content": "How are you?", "role": "user"},
            # Add more messages as needed
        ]

        for message in messages:
            # Create tasks for each acompletion call
            coro = acompletion(
                model="gpt-3.5-turbo",
                messages=[{"content": "Hello", "role": "user"}],
                stream=True,
            )
            # coro = agenerate_dialogue([{"content": "Hello", "role": "user"}])
            print("i am here", type(coro))
            task = asyncio.create_task(coro)
            tasks.append(task)

        # responses = await asyncio.gather(*tasks, return_exceptions=True)

        first_task = tasks[0]

        print(type(first_task))
        print("status of task", tasks[0].done())
        # Note: leftover from the asyncio.gather(return_exceptions=True)
        # variant commented out above; a Task is never an Exception,
        # so this branch is dead code.
        if isinstance(first_task, Exception):
            print(f"Error occurred: {first_task}")
        else:
            await asyncio.sleep(5)
            print(type(first_task))
            print("status of task", first_task.done())
            res = first_task.result()
            print(type(res))
            # res = await res  # these commented lines are for use with the coro = agenerate_dialogue(...) call that is commented out above
            # print(type(res))

            async for chunk in res:
                print(chunk.choices[0].delta.content)
    except Exception as e:
        print(f"Error occurred: {e}")

# To run the multiple completions concurrently:
asyncio.run(run_multiple_acompletions())
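# To compare against the direct-OpenAI behaviour, run this function instead:
# asyncio.run(run_multiple_openai_completions())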


krrishdholakia commented 10 months ago

@anunayajoshi happy to run the script and check what's happening. Your testing doesn't seem to imply the async calls aren't being made at all, just that they aren't run while asyncio.sleep() occurs?

Renaming the issue to more precisely describe the problem.

For context, acompletion does make async calls. Here's an example of the load test script we run locally: https://github.com/BerriAI/litellm/blob/37251d327d4b0fe564fd8d7223d5d96cde7752b1/litellm/tests/test_profiling_router.py#L4
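
A stripped-down version of that pattern looks roughly like this (a sketch, not the contents of that file):

import asyncio
from litellm import acompletion

async def load_test(n=10):
    # Fire n non-streaming acompletion calls concurrently;
    # the POST requests should overlap rather than run one by one.
    tasks = [
        asyncio.create_task(
            acompletion(
                model="gpt-3.5-turbo",
                messages=[{"content": f"Hello #{i}", "role": "user"}],
            )
        )
        for i in range(n)
    ]
    responses = await asyncio.gather(*tasks, return_exceptions=True)
    for r in responses:
        print(type(r))

asyncio.run(load_test())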

anunayajoshi commented 10 months ago

Yup, apologies for the confusion. The reason I used asyncio.sleep() was to figure out when the POST requests were happening. More specifically, the issue is that the API calls to OpenAI don't happen during the asyncio.sleep(), which I imagine means they don't happen in the background while other tasks run? Or is my assumption false?

Also, apologies if the two test script functions aren't exactly the same; I was testing out a bunch of stuff, but I think the output logs do prove my point.

krrishdholakia commented 10 months ago

We're using the async OpenAI client as well. I suspect this has to do with where the client is being initialized (if no client is passed in, litellm initializes it when .completion() is called).
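
i.e. something like this, initializing the client once up front and handing it in (sketch only; I'm assuming here that acompletion accepts a client kwarg):

import asyncio
import os
from openai import AsyncOpenAI
from litellm import acompletion

# initialize the client once, before any completion call
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))

async def main():
    # assumption: litellm reuses the pre-built client passed via client=
    resp = await acompletion(
        model="gpt-3.5-turbo",
        messages=[{"content": "Hello", "role": "user"}],
        client=client,
    )
    print(resp.choices[0].message.content)

asyncio.run(main())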

Is this issue also present when using litellm.Router()? @anunayajoshi https://docs.litellm.ai/docs/routing
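
A minimal Router setup from those docs would look something like this (sketch; the model_list values are placeholders):

import asyncio
import os
from litellm import Router

# one deployment per entry; litellm_params are passed to the underlying call
model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "gpt-3.5-turbo",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    }
]
router = Router(model_list=model_list)

async def main():
    resp = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"content": "Hello", "role": "user"}],
    )
    print(resp)

asyncio.run(main())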

I'll try and get to this today/tomorrow.

anunayajoshi commented 10 months ago

I have not tried using the router yet; I'll update you once I try it. I'm a bit caught up with other work at the moment, so it might take a while.