BerriAI / litellm

Python SDK, Proxy Server to call 100+ LLM APIs using the OpenAI format - [Bedrock, Azure, OpenAI, VertexAI, Cohere, Anthropic, Sagemaker, HuggingFace, Replicate, Groq]
https://docs.litellm.ai/docs/

[Bug]: How to async stream a proxy response with CustomLLM? #5147

Closed - Netzvamp closed this 1 month ago

Netzvamp commented 1 month ago

What happened?

I'm trying to build a CustomLLM proxy so that I can intercept the LLM answers and do stuff with them. Without streaming it runs fine, but with async streaming I can't get it to stream the chunks. I didn't find anything about it in the docs. Here is some code:


from typing import Iterator, AsyncIterator
import litellm
from litellm import CustomLLM
from litellm.types.utils import GenericStreamingChunk

class MyCustomLLM(CustomLLM):
    def completion(self, messages, *args, **kwargs) -> litellm.ModelResponse:
        return litellm.completion(
            model="claude-3-5-sonnet-20240620",
            messages=messages,
            # *args,
            # **kwargs
        )  # type: ignore

    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        raise NotImplementedError("Not implemented yet!")  # sync streaming not implemented

    async def acompletion(self, messages, *args, **kwargs) -> litellm.ModelResponse:
        return await litellm.acompletion(
            model="claude-3-5-sonnet-20240620",
            messages=messages,
            # *args,
            # **kwargs
        )  # type: ignore

    async def astreaming(self, messages, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        response = await litellm.acompletion(
            model="claude-3-5-sonnet-20240620",
            messages=messages,
            stream=True,
            # *args,
            # **kwargs
        )

        # This doesn't work, it's the wrong type
        # yield response

        # this also doesn't work
        async for chunk in response:
            yield chunk.choices

mycustomllm = MyCustomLLM()

How can I implement this correctly?

Relevant log output

14:55:52 - LiteLLM Proxy:ERROR: proxy_server.py:2421 - litellm.proxy.proxy_server.async_data_generator(): Exception occured - litellm.APIConnectionError: list indices must be integers or slices, not str
Traceback (most recent call last):
  File "C:\Nextcloud\Projekte\MyBot\venv\Lib\site-packages\litellm\utils.py", line 9397, in chunk_creator
    completion_obj["content"] = anthropic_response_obj["text"]
                                ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^
TypeError: list indices must be integers or slices, not str

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Nextcloud\Projekte\MyBot\venv\Lib\site-packages\litellm\proxy\proxy_server.py", line 2400, in async_data_generator
    async for chunk in response:
  File "C:\Nextcloud\Projekte\MyBot\venv\Lib\site-packages\litellm\utils.py", line 10483, in __anext__
    raise exception_type(
  File "C:\Nextcloud\Projekte\MyBot\venv\Lib\site-packages\litellm\utils.py", line 10294, in __anext__
    processed_chunk: Optional[ModelResponse] = self.chunk_creator(
                                               ^^^^^^^^^^^^^^^^^^^
  File "C:\Nextcloud\Projekte\MyBot\venv\Lib\site-packages\litellm\utils.py", line 10056, in chunk_creator
    raise exception_type(
          ^^^^^^^^^^^^^^^
  File "C:\Nextcloud\Projekte\MyBot\venv\Lib\site-packages\litellm\utils.py", line 8168, in exception_type
    raise e
  File "C:\Nextcloud\Projekte\MyBot\venv\Lib\site-packages\litellm\utils.py", line 8141, in exception_type
    raise APIConnectionError(
litellm.exceptions.APIConnectionError: litellm.APIConnectionError: list indices must be integers or slices, not str

krrishdholakia commented 1 month ago

@Netzvamp use this to intercept calls - https://docs.litellm.ai/docs/proxy/call_hooks
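
As a rough sketch of the call-hook approach (hook and handler names taken from the linked docs; exact signatures can differ between litellm versions, so treat this as an outline rather than a drop-in handler):

# custom_callbacks.py - sketch only
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy.proxy_server import DualCache, UserAPIKeyAuth


class MyCustomHandler(CustomLogger):
    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: str,  # e.g. "completion", "embeddings", ...
    ):
        # Inspect or rewrite the incoming request before it reaches the model.
        return data

    # The docs also list post-call hooks (async_post_call_success_hook,
    # async_post_call_streaming_hook) for intercepting responses; see the
    # link above for the exact signatures in your litellm version.


proxy_handler_instance = MyCustomHandler()

# Wired into the proxy via the config, e.g.:
# litellm_settings:
#   callbacks: custom_callbacks.proxy_handler_instance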

Your error happens because the chunk you yield is not in the expected GenericStreamingChunk format - https://github.com/BerriAI/litellm/blob/dd2ea72cb4f6106fc32cc7a56a6aa716ee14020e/litellm/types/utils.py#L82
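
For reference, a minimal sketch of an astreaming that yields GenericStreamingChunk-shaped dicts built from the wrapped acompletion stream. The dict keys follow the GenericStreamingChunk type linked above; the handling of finish_reason and usage here is an assumption and may need adjusting for your litellm version:

from typing import AsyncIterator

import litellm
from litellm import CustomLLM
from litellm.types.utils import GenericStreamingChunk


class MyCustomLLM(CustomLLM):
    async def astreaming(self, messages, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        # Wrap the upstream stream and re-emit each delta as a GenericStreamingChunk dict.
        response = await litellm.acompletion(
            model="claude-3-5-sonnet-20240620",
            messages=messages,
            stream=True,
        )
        async for chunk in response:
            choice = chunk.choices[0]
            yield {
                "text": choice.delta.content or "",
                "tool_use": None,
                "is_finished": choice.finish_reason is not None,
                "finish_reason": choice.finish_reason or "",  # assumption: "" for intermediate chunks
                "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
                "index": 0,
            }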

krrishdholakia commented 1 month ago

Happy to set up a direct support channel to help further.

Netzvamp commented 1 month ago

Thank you, call hooks are great, I didn't know they existed!

stronk7 commented 1 month ago

Coincidentally, today I was trying the custom LLM stuff and hit exactly this problem. For the record, I got this simple case (returning Unix epoch seconds) working, for both completion and streaming.

Note that I don't know how correct this is (in fact, I looked at the tests to see how it was being done there). All I know is that it works, and since the documentation doesn't show any clear example with streaming, here it is:

import time
from typing import Iterator, AsyncIterator
from litellm.types.utils import GenericStreamingChunk, ModelResponse
from litellm import CustomLLM, completion, acompletion

class UnixTimeLLM(CustomLLM):
    def completion(self, *args, **kwargs) -> ModelResponse:
        return completion(
            model="test/unixtime",
            mock_response=str(int(time.time())),
        )  # type: ignore

    async def acompletion(self, *args, **kwargs) -> ModelResponse:
        return await acompletion(
            model="test/unixtime",
            mock_response=str(int(time.time())),
        )  # type: ignore

    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": str(int(time.time())),
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        return generic_streaming_chunk # type: ignore

    async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": str(int(time.time())),
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        yield generic_streaming_chunk # type: ignore

unixtime = UnixTimeLLM()

Ciao :-)
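
For completeness, a handler like this still has to be registered so that test/unixtime resolves to it. A minimal SDK-side sketch, assuming the UnixTimeLLM snippet above and following the registration pattern in the custom_llm_server docs (on the proxy, the equivalent is a custom_provider_map entry in the config):

import litellm
from litellm import completion

# Register the handler so "test/..." model names route to UnixTimeLLM.
# `unixtime` is the UnixTimeLLM() instance defined in the snippet above.
litellm.custom_provider_map = [
    {"provider": "test", "custom_handler": unixtime}
]

resp = completion(
    model="test/unixtime",
    messages=[{"role": "user", "content": "What time is it?"}],
)
print(resp.choices[0].message.content)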

krrishdholakia commented 1 month ago

> Thank you, call hooks are great, I didn't know they existed!

Hey @Netzvamp where in our docs would this have been helpful to see?

Netzvamp commented 1 month ago

There is already a link on that page, so all is fine, I'm just blind ;) https://docs.litellm.ai/docs/providers/custom_llm_server

krrishdholakia commented 3 weeks ago

@stronk7 thanks for the code snippet. Added your example to docs + gave you a shoutout - https://docs.litellm.ai/docs/providers/custom_llm_server#add-streaming-support