run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai

[Question]: [BedrockConverse] astream handler for non async iterable #15704

Open gilang-superbank opened 2 weeks ago

gilang-superbank commented 2 weeks ago


Question

Hi, I'm using BedrockConverse to handle a streaming API for a chatbot. Other LLMs' astream_chat functions return an async iterable, but BedrockConverse's astream_chat does not.

Could you tell me how to handle astream_chat for BedrockConverse?
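
To illustrate what I mean by an async iterable, this is the pattern I get from other LLM integrations (a rough sketch; llm and messages are just placeholders here):

    # rough sketch: other integrations return an async generator from astream_chat
    gen = await llm.astream_chat(messages)
    async for chunk in gen:
        print(chunk.delta, end="", flush=True)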

Here's my response handler for Bedrock:

    event_handler = EventCallbackHandler()
    chat_engine.callback_manager.handlers.append(event_handler)  # type: ignore
    response = await chat_engine.astream_chat(last_message_content, messages)

    async def content_generator():
        # Yield the text response
        async def _text_generator():
            async for token in response.async_response_gen():
                yield VercelStreamResponse.convert_text(token)
            # the text_generator is the leading stream, once it's finished, also finish the event stream
            event_handler.is_done = True

        # Yield the events from the event handler
        async def _event_generator():
            async for event in event_handler.async_event_gen():
                event_response = event.to_response()
                if event_response is not None:
                    yield VercelStreamResponse.convert_data(event_response)

        combine = stream.merge(_text_generator(), _event_generator())
gilang-superbank commented 2 weeks ago

(screenshot of the traceback attached)

dosubot[bot] commented 2 weeks ago

Hey there, @gilang-superbank! I'm here to help you with any bugs, questions, or contributions while you wait for a human maintainer. Let's tackle this together!

To handle streaming for BedrockConverse when it does not return an async iterable, and to resolve the NotImplementedError ("Use async-for instead") raised in llama_index/core/instrumentation/dispatcher.py at line 275, you need to implement the asynchronous methods acomplete, astream_complete, and astream_chat in the Bedrock class.

Here is how you can implement these methods:

import asyncio
import json
from typing import Any, Sequence

class Bedrock(LLM):
    # ... existing code ...

    async def acomplete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        if not formatted:
            prompt = self.completion_to_prompt(prompt)
        all_kwargs = self._get_all_kwargs(**kwargs)
        request_body = self._provider.get_request_body(prompt, all_kwargs)
        request_body_str = json.dumps(request_body)

        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(
            None,
            lambda: completion_with_retry(
                client=self._client,
                model=self.model,
                request_body=request_body_str,
                max_retries=self.max_retries,
                **all_kwargs,
            )["body"].read()
        )
        response = json.loads(response)
        return CompletionResponse(
            text=self._provider.get_text_from_response(response), raw=response
        )

    async def astream_complete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponseAsyncGen:
        if self.model in BEDROCK_FOUNDATION_LLMS and self.model not in STREAMING_MODELS:
            raise ValueError(f"Model {self.model} does not support streaming")

        if not formatted:
            prompt = self.completion_to_prompt(prompt)

        all_kwargs = self._get_all_kwargs(**kwargs)
        request_body = self._provider.get_request_body(prompt, all_kwargs)
        request_body_str = json.dumps(request_body)

        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(
            None,
            lambda: completion_with_retry(
                client=self._client,
                model=self.model,
                request_body=request_body_str,
                max_retries=self.max_retries,
                stream=True,
                **all_kwargs,
            )["body"]
        )

        async def gen() -> CompletionResponseAsyncGen:
            content = ""
            for r in response:
                r = json.loads(r["chunk"]["bytes"])
                content_delta = self._provider.get_text_from_stream_response(r)
                content += content_delta
                yield CompletionResponse(text=content, delta=content_delta, raw=r)

        return gen()

    async def astream_chat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponseAsyncGen:
        prompt = self.messages_to_prompt(messages)
        completion_response = await self.astream_complete(prompt, formatted=True, **kwargs)
        # astream_complete returns an async generator, so use the async converter here
        return astream_completion_response_to_chat_response(completion_response)

This implementation uses asyncio to run the synchronous completion_with_retry function in an executor, allowing it to be used in an asynchronous context. This should resolve the NotImplementedError and allow you to handle streaming for BedrockConverse via astream_chat [1].
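
For reference, a minimal usage sketch of the methods above (assuming llm is an instance of this Bedrock class and the prompt is just an example):

    import asyncio

    async def main() -> None:
        # non-streaming async completion
        resp = await llm.acomplete("Tell me a joke")
        print(resp.text)

        # streaming async completion: astream_complete returns an async generator
        gen = await llm.astream_complete("Tell me a joke")
        async for partial in gen:
            print(partial.delta, end="", flush=True)

    asyncio.run(main())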

To continue talking to Dosu, mention @dosu.

logan-markewich commented 2 weeks ago

It's a bug in the LLM integration; it's not using an async for loop.

logan-markewich commented 2 weeks ago

@gilang-superbank actually, I think this was just fixed, try pip install -U llama-index-llms-bedrock-converse
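
If you want to sanity-check the fix outside the chat engine, something like this should now stream deltas (a rough sketch; the model id is only an example and it assumes your AWS credentials/region are already configured):

    import asyncio

    from llama_index.core.llms import ChatMessage
    from llama_index.llms.bedrock_converse import BedrockConverse

    # example model id; swap in whichever Bedrock model you actually use
    llm = BedrockConverse(model="anthropic.claude-3-haiku-20240307-v1:0")

    async def check() -> None:
        gen = await llm.astream_chat([ChatMessage(role="user", content="Hello!")])
        async for chunk in gen:
            print(chunk.delta, end="", flush=True)

    asyncio.run(check())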

gilang-superbank commented 2 weeks ago

Hi @logan-markewich, that solved the issue, but now another problem has come up: the LLM events never show up and the stream stays open. Could you help point out which part I missed?

Here's my response handler function:

@r.post("/")
async def chat(
    request: Request,
    data: _ChatData,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    last_message_content, messages = await parse_chat_data(data)
    event_handler = EventCallbackHandler()
    chat_engine.callback_manager.handlers.append(event_handler)  # type: ignore
    response = await chat_engine.astream_chat(last_message_content, messages)

    async def content_generator():
        # Yield the text response
        async def _text_generator():
            async for token in response.async_response_gen():
                yield VercelStreamResponse.convert_text(token)
            # the text_generator is the leading stream, once it's finished, also finish the event stream
            event_handler.is_done = True

        # Yield the events from the event handler
        async def _event_generator():
            async for event in event_handler.async_event_gen():
                event_response = event.to_response()
                if event_response is not None:
                    yield VercelStreamResponse.convert_data(event_response)

        combine = stream.merge(_text_generator(), _event_generator())
        async with combine.stream() as streamer:
            async for item in streamer:
                if await request.is_disconnected():
                    break
                yield item

        # Yield the source nodes
        yield VercelStreamResponse.convert_data(
            {
                "type": "sources",
                "data": {
                    "nodes": [
                        _SourceNodes.from_source_node(node).dict()
                        for node in response.source_nodes
                    ]
                },
            }
        )

    return VercelStreamResponse(content=content_generator())
gilang-superbank commented 2 weeks ago

It gets stuck on CBEventType.RETRIEVE.

(screenshot attached)
gilang-superbank commented 2 weeks ago

@dosu, could you help me with this? Thanks.

gilang-superbank commented 2 weeks ago

It seems async_response_gen is not returning any tokens. Any idea, @logan-markewich? Is there something I missed?

logan-markewich commented 2 weeks ago

@gilang-superbank I think there is a bug in your code; astream_chat() only takes a single list of messages.

I think your code should be

response = await chat_engine.astream_chat(messages)

Where messages is a list of ALL input ChatMessage objects

logan-markewich commented 2 weeks ago

By passing in two arguments, it's probably getting a little messed up under the hood and causing a traceback somewhere that's getting swallowed.

gilang-superbank commented 2 weeks ago

Hi @logan-markewich, it still doesn't work; I get this error now:

chatbot-be-llama        | ERROR:    Exception in ASGI application
chatbot-be-llama        | Traceback (most recent call last):
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi
chatbot-be-llama        |     result = await app(  # type: ignore[func-returns-value]
chatbot-be-llama        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
chatbot-be-llama        |     return await self.app(scope, receive, send)
chatbot-be-llama        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
chatbot-be-llama        |     await super().__call__(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
chatbot-be-llama        |     await self.middleware_stack(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
chatbot-be-llama        |     raise exc
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
chatbot-be-llama        |     await self.app(scope, receive, _send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 91, in __call__
chatbot-be-llama        |     await self.simple_response(scope, receive, send, request_headers=headers)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 146, in simple_response
chatbot-be-llama        |     await self.app(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
chatbot-be-llama        |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
chatbot-be-llama        |     raise exc
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
chatbot-be-llama        |     await app(scope, receive, sender)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 758, in __call__
chatbot-be-llama        |     await self.middleware_stack(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 778, in app
chatbot-be-llama        |     await route.handle(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 299, in handle
chatbot-be-llama        |     await self.app(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 79, in app
chatbot-be-llama        |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
chatbot-be-llama        |     raise exc
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
chatbot-be-llama        |     await app(scope, receive, sender)
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 74, in app
chatbot-be-llama        |     response = await func(request)
chatbot-be-llama        |                ^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 299, in app
chatbot-be-llama        |     raise e
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 294, in app
chatbot-be-llama        |     raw_response = await run_endpoint_function(
chatbot-be-llama        |                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
chatbot-be-llama        |     return await dependant.call(**values)
chatbot-be-llama        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/app/app/api/routers/chat.py", line 201, in chat
chatbot-be-llama        |     response = await chat_engine.astream_chat(messages)
chatbot-be-llama        |                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 291, in async_wrapper
chatbot-be-llama        |     result = await func(*args, **kwargs)
chatbot-be-llama        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/llama_index/core/callbacks/utils.py", line 56, in async_wrapper
chatbot-be-llama        |     return await func(self, *args, **kwargs)
chatbot-be-llama        |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/llama_index/core/chat_engine/condense_plus_context.py", line 343, in astream_chat
chatbot-be-llama        |     chat_messages, context_source, context_nodes = await self._arun_c3(
chatbot-be-llama        |                                                    ^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/llama_index/core/chat_engine/condense_plus_context.py", line 247, in _arun_c3
chatbot-be-llama        |     context_str, context_nodes = await self._aretrieve_context(condensed_question)
chatbot-be-llama        |                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/llama_index/core/chat_engine/condense_plus_context.py", line 174, in _aretrieve_context
chatbot-be-llama        |     nodes = await self._retriever.aretrieve(message)
chatbot-be-llama        |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/llama_index/core/instrumentation/dispatcher.py", line 291, in async_wrapper
chatbot-be-llama        |     result = await func(*args, **kwargs)
chatbot-be-llama        |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/llama_index/core/base/base_retriever.py", line 263, in aretrieve
chatbot-be-llama        |     RetrievalStartEvent(
chatbot-be-llama        |   File "/usr/local/lib/python3.11/site-packages/pydantic/main.py", line 193, in __init__
chatbot-be-llama        |     self.__pydantic_validator__.validate_python(data, self_instance=self)
chatbot-be-llama        | pydantic_core._pydantic_core.ValidationError: 2 validation errors for RetrievalStartEvent
chatbot-be-llama        | str_or_query_bundle.str
chatbot-be-llama        |   Input should be a valid string [type=string_type, input_value=[], input_type=list]
chatbot-be-llama        |     For further information visit https://errors.pydantic.dev/2.8/v/string_type
chatbot-be-llama        | str_or_query_bundle.QueryBundle
chatbot-be-llama        |   Input should be a dictionary or an instance of QueryBundle [type=dataclass_type, input_value=[], input_type=list]
chatbot-be-llama        |     For further information visit https://errors.pydantic.dev/2.8/v/dataclass_type
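
Reading the ValidationError, the retriever received an empty list where it expected a string or QueryBundle, so it looks like the chat engine still wants the latest message as a plain string and the history as a separate list. A minimal sketch of the call shape I believe it expects (variable names are placeholders):

    from llama_index.core.llms import ChatMessage, MessageRole

    chat_history = [
        ChatMessage(role=MessageRole.USER, content="earlier question"),
        ChatMessage(role=MessageRole.ASSISTANT, content="earlier answer"),
    ]

    # latest user message as a string, prior turns as chat_history
    response = await chat_engine.astream_chat("Hello!", chat_history=chat_history)
    async for token in response.async_response_gen():
        print(token, end="", flush=True)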
gilang-superbank commented 2 weeks ago

Hi @logan-markewich, I tried using stream_chat instead and it worked on my local setup. Could you tell me the pros and cons of using it, aside from the synchronous processing? Thank you :pray:

dsaks9 commented 2 days ago

Hey all, I am running into the same issue: the stream is not returning any tokens.

    async def response_generator() -> AsyncGenerator:
        response_stream = await self.llm.astream_chat_with_tools(
            self.tools,
            chat_history=chat_history
        )

        full_response = None
        yielded_indicator = False
        async for chunk in response_stream:
            if 'tool_calls' not in chunk.message.additional_kwargs:
                # Yield a boolean to indicate whether the response is a tool call
                if not yielded_indicator:
                    yield False
                    yielded_indicator = True

                # if not a tool call, yield the chunks!
                yield chunk
            elif not yielded_indicator:
                # Yield the indicator for a tool call
                yield True
                yielded_indicator = True

            full_response = chunk

        # Write the full response to memory
        self.memory.put(full_response.message)

        # Yield the final response
        yield full_response

    # Start the generator
    generator = response_generator()

    # Check for immediate tool call
    is_tool_call = await generator.__anext__()
    if is_tool_call:
        full_response = await generator.__anext__()
        tool_calls = self.llm.get_tool_calls_from_response(full_response)
        return ToolCallEvent(tool_calls=tool_calls)

    # If we've reached here, it's not an immediate tool call, so we return the generator
    return StopEvent(result=generator)

Agent call:

    ret = await agent.run(input="Hello!")
    async for token in ret:
        print(token.delta, end="", flush=True)