run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License
35.42k stars 4.99k forks source link

[Question]: llamaindex bedrock, event_type=<CBEventType.LLM: 'llm'> payload={'exception': NotImplementedError('Use async-for instead')} #15666

Open gilangdipper opened 2 weeks ago

gilangdipper commented 2 weeks ago

Question Validation

Question

Hi, I'm using llama-index-bedrock-converse@0.2.2. On every CBEventType.LLM event I get NotImplementedError('Use async-for instead'). I'm not sure where the issue is coming from; it could be from this code:

event_handler = EventCallbackHandler()
    chat_engine.callback_manager.handlers.append(event_handler)  # type: ignore
    response = await chat_engine.astream_chat(last_message_content, messages)

    # Builds the two async generators that feed the streamed response and
    # merges them into `combine`.
    # NOTE(review): the snippet ends right after `combine` is created; the code
    # that consumes it is not shown here.
    async def content_generator():
        # Text stream: converts every token of the chat response into a
        # Vercel text chunk.
        async def _text_generator():
            async for token in response.async_response_gen():
                yield VercelStreamResponse.convert_text(token)
            # The text generator is the leading stream: once it is exhausted,
            # flag the event handler as done so the event stream can finish too.
            event_handler.is_done = True

        # Event stream: forwards callback events collected by the handler,
        # skipping events that have no response representation.
        async def _event_generator():
            async for event in event_handler.async_event_gen():
                event_response = event.to_response()
                if event_response is not None:
                    yield VercelStreamResponse.convert_data(event_response)

        # Interleave both generators into one stream.
        # NOTE(review): `stream` is presumably `aiostream.stream` — confirm.
        combine = stream.merge(_text_generator(), _event_generator())

Could you help me figure out which configuration I'm missing? I'm stuck on this issue. Thank you.

gilangdipper commented 2 weeks ago
image
gilangdipper commented 2 weeks ago

my bedrock's configuration:

def init_bedrock():
    """Configure the global ``Settings`` with a Bedrock LLM and an embedding model.

    The LLM is chosen by the ``MODEL`` environment variable and the embedding
    model by ``EMBEDDING_MODEL``. Both names are resolved before ``Settings``
    is touched, so an invalid value never leaves it half-configured.

    Raises:
        KeyError: If either environment variable is unset or names a model
            that is not in the corresponding lookup table.
    """
    from llama_index.llms.bedrock_converse import BedrockConverse
    from llama_index.embeddings.huggingface import HuggingFaceEmbedding

    model_map: Dict[str, str] = {"llama3-70b": "meta.llama3-70b-instruct-v1:0"}
    embed_model_map: Dict[str, str] = {
        "all-MiniLM-L6-v2": "sentence-transformers/all-MiniLM-L6-v2",
        "all-mpnet-base-v2": "sentence-transformers/all-mpnet-base-v2",
    }

    def _resolve(env_var: str, choices: Dict[str, str]) -> str:
        # os.getenv returns None for an unset variable; report that (and any
        # unknown value) explicitly instead of surfacing a bare `KeyError: None`.
        requested = os.getenv(env_var)
        if requested not in choices:
            raise KeyError(
                f"Environment variable {env_var}={requested!r} is not supported; "
                f"expected one of {sorted(choices)}"
            )
        return choices[requested]

    llm_model_id = _resolve("MODEL", model_map)
    embed_model_name = _resolve("EMBEDDING_MODEL", embed_model_map)

    Settings.llm = BedrockConverse(model=llm_model_id)
    Settings.embed_model = HuggingFaceEmbedding(model_name=embed_model_name)
dosubot[bot] commented 2 weeks ago

Hey there, @gilangdipper! I'm here to help you with any bugs, questions, or contributions you have. Just waiting for a human maintainer? I'm here to assist you in the meantime.

To resolve the NotImplementedError('Use async-for instead') in llama-index-bedrock-converse@0.2.2 when using CBEventType.LLM, you need to ensure that the event handling is done asynchronously. The provided code does not currently support asynchronous event handling, which is likely causing the error.

Here is an updated version of the SimpleLLMHandler class with asynchronous methods to handle events:

import logging
from typing import Any, Dict, List, Optional, cast

from llama_index.core.callbacks.pythonically_printing_base_handler import (
    PythonicallyPrintingBaseHandler,
)
from llama_index.core.callbacks.schema import CBEventType, EventPayload

class SimpleLLMHandler(PythonicallyPrintingBaseHandler):
    """Callback handler that prints the inputs and outputs of LLM events.

    NOTE(review): every hook below is declared ``async``. Confirm that the
    callback manager driving this handler awaits them; if it calls them
    synchronously the coroutines are never run and nothing is printed.
    """

    def __init__(self, logger: Optional[logging.Logger] = None) -> None:
        """Create the handler without ignoring any event type."""
        super().__init__(
            event_starts_to_ignore=[], event_ends_to_ignore=[], logger=logger
        )

    async def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Do nothing when a trace starts."""
        return None

    async def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Do nothing when a trace ends."""
        return None

    async def _print_llm_event(self, payload: dict) -> None:
        """Print the prompt/completion or messages/response pair of *payload*.

        Payloads containing neither a prompt nor messages print nothing.
        """
        from llama_index.core.llms import ChatMessage

        divider = "*" * 50

        if EventPayload.PROMPT in payload:
            prompt = str(payload.get(EventPayload.PROMPT))
            completion = str(payload.get(EventPayload.COMPLETION))
            lines = [
                f"** Prompt: **\n{prompt}",
                divider,
                f"** Completion: **\n{completion}",
                divider,
                "\n",
            ]
        elif EventPayload.MESSAGES in payload:
            chat_messages = cast(
                List[ChatMessage], payload.get(EventPayload.MESSAGES, [])
            )
            messages_str = "\n".join(str(message) for message in chat_messages)
            response = str(payload.get(EventPayload.RESPONSE))
            lines = [
                f"** Messages: **\n{messages_str}",
                divider,
                f"** Response: **\n{response}",
                divider,
                "\n",
            ]
        else:
            return

        for line in lines:
            self._print(line)

    async def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Ignore the start of an event and hand back its id unchanged."""
        return event_id

    async def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Print the payload of a finished LLM event; ignore everything else."""
        is_llm_event = event_type == CBEventType.LLM
        if is_llm_event and payload is not None:
            await self._print_llm_event(payload)

This code updates the SimpleLLMHandler class to use asynchronous methods (async def) for handling events, which should resolve the NotImplementedError('Use async-for instead') error. Additionally, ensure that your LLM supports async operations before implementing them [1][2][3][4].

To continue talking to Dosu, mention @dosu.

gilangdipper commented 2 weeks ago

Hi @dosu, could you help check my code:

import json
import asyncio
from typing import AsyncGenerator, Dict, Any, List, Optional
from llama_index.core.callbacks.base import BaseCallbackHandler
from llama_index.core.callbacks.schema import CBEventType
from llama_index.core.tools.types import ToolOutput
from pydantic import BaseModel

class CallbackEvent(BaseModel):
    event_type: CBEventType
    payload: Optional[Dict[str, Any]] = None
    event_id: str = ""

    def get_retrieval_message(self) -> dict | None:
        if self.payload:
            nodes = self.payload.get("nodes")
            if nodes:
                msg = f"Retrieved {len(nodes)} sources to use as context for the query"
            else:
                msg = f"Retrieving context for query: '{self.payload.get('query_str')}'"
            return {
                "type": "events",
                "data": {"title": msg},
            }
        else:
            return None

    def get_tool_message(self) -> dict | None:
        func_call_args = self.payload.get("function_call")
        if func_call_args is not None and "tool" in self.payload:
            tool = self.payload.get("tool")
            return {
                "type": "events",
                "data": {
                    "title": f"Calling tool: {tool.name} with inputs: {func_call_args}",
                },
            }

    def _is_output_serializable(self, output: Any) -> bool:
        try:
            json.dumps(output)
            return True
        except TypeError:
            return False

    def get_agent_tool_response(self) -> dict | None:
        response = self.payload.get("response")
        if response is not None:
            sources = response.sources
            for source in sources:
                # Return the tool response here to include the toolCall information
                if isinstance(source, ToolOutput):
                    if self._is_output_serializable(source.raw_output):
                        output = source.raw_output
                    else:
                        output = source.content

                    return {
                        "type": "tools",
                        "data": {
                            "toolOutput": {
                                "output": output,
                                "isError": source.is_error,
                            },
                            "toolCall": {
                                "id": None,  # There is no tool id in the ToolOutput
                                "name": source.tool_name,
                                "input": source.raw_input,
                            },
                        },
                    }

    def to_response(self):
        match self.event_type:
            case "retrieve":
                return self.get_retrieval_message()
            case "function_call":
                return self.get_tool_message()
            case "agent_step":
                return self.get_agent_tool_response()
            case _:
                return None

class EventCallbackHandler(BaseCallbackHandler):
    """Callback handler that queues convertible events for async consumption."""

    # Queue of events waiting to be yielded by `async_event_gen`.
    _aqueue: asyncio.Queue
    # Set to True by the consumer to let `async_event_gen` stop once the queue
    # has been drained.
    is_done: bool = False

    def __init__(
        self,
    ):
        """Initialize the base callback handler."""
        ignored_events = [
            # CBEventType.CHUNKING,
            # CBEventType.NODE_PARSING,
            # CBEventType.EMBEDDING,
            # CBEventType.LLM,
            # CBEventType.TEMPLATING,
        ]
        super().__init__(ignored_events, ignored_events)
        self._aqueue = asyncio.Queue()

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Queue the starting event if it has a response form; return its id."""
        event = CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
        if event.to_response() is not None:
            self._aqueue.put_nowait(event)
        # The signature promises a str; previously this fell through and returned None.
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Print the finished event and queue it if it has a response form."""
        event = CallbackEvent(event_id=event_id, event_type=event_type, payload=payload)
        print(event)

        if event.to_response() is not None:
            self._aqueue.put_nowait(event)

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """No-op."""

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """No-op."""

    async def async_event_gen(self) -> AsyncGenerator[CallbackEvent, None]:
        """Yield queued events until the queue is empty and `is_done` is set."""
        while not self._aqueue.empty() or not self.is_done:
            try:
                # Wait briefly so the loop condition (is_done) is re-checked
                # even when no event arrives.
                yield await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                pass
dosubot[bot] commented 2 weeks ago

Your code for handling callback events in llama_index looks mostly correct, but there are a few improvements and potential issues to address:

  1. Initialization of is_done: Ensure is_done is properly initialized in the constructor.
  2. Handling of None payloads: Add checks to handle cases where payload might be None.
  3. Properly closing the generator: Ensure the generator in async_event_gen is properly closed when is_done is set to True.

Here is a revised version of your code with these improvements:

import json
import asyncio
from typing import AsyncGenerator, Dict, Any, List, Optional
from llama_index.core.callbacks.base import BaseCallbackHandler
from llama_index.core.callbacks.schema import CBEventType
from llama_index.core.tools.types import ToolOutput
from pydantic import BaseModel

class CallbackEvent(BaseModel):
    """A captured callback event that can be rendered as a response dict."""

    event_type: CBEventType
    payload: Optional[Dict[str, Any]] = None
    event_id: str = ""

    def get_retrieval_message(self) -> dict | None:
        """Build the "events" message for a retrieval event, or None without a payload."""
        if not self.payload:
            return None

        nodes = self.payload.get("nodes")
        msg = (
            f"Retrieved {len(nodes)} sources to use as context for the query"
            if nodes
            else f"Retrieving context for query: '{self.payload.get('query_str')}'"
        )
        return {"type": "events", "data": {"title": msg}}

    def get_tool_message(self) -> dict | None:
        """Build the "events" message announcing a tool call, or None if not applicable."""
        if not self.payload:
            return None

        func_call_args = self.payload.get("function_call")
        if func_call_args is None or "tool" not in self.payload:
            return None

        tool = self.payload.get("tool")
        title = f"Calling tool: {tool.name} with inputs: {func_call_args}"
        return {"type": "events", "data": {"title": title}}

    def _is_output_serializable(self, output: Any) -> bool:
        """Tell whether ``json.dumps`` accepts *output* without a TypeError."""
        try:
            json.dumps(output)
        except TypeError:
            return False
        return True

    def get_agent_tool_response(self) -> dict | None:
        """Build the "tools" message for the first ToolOutput source, or None."""
        if not self.payload:
            return None

        response = self.payload.get("response")
        if response is None:
            return None

        # Only the first ToolOutput among the sources is reported.
        source = next(
            (item for item in response.sources if isinstance(item, ToolOutput)),
            None,
        )
        if source is None:
            return None

        # Prefer the raw output; use the textual content when it is not
        # JSON-serializable.
        if self._is_output_serializable(source.raw_output):
            output = source.raw_output
        else:
            output = source.content

        tool_output = {"output": output, "isError": source.is_error}
        tool_call = {
            "id": None,
            "name": source.tool_name,
            "input": source.raw_input,
        }
        return {
            "type": "tools",
            "data": {"toolOutput": tool_output, "toolCall": tool_call},
        }

    def to_response(self):
        """Render the event as a response dict; unhandled event types give None."""
        kind = self.event_type
        if kind == "retrieve":
            return self.get_retrieval_message()
        if kind == "function_call":
            return self.get_tool_message()
        if kind == "agent_step":
            return self.get_agent_tool_response()
        return None

class EventCallbackHandler(BaseCallbackHandler):
    """Callback handler that queues convertible events for async consumption."""

    def __init__(self):
        """Initialize the base callback handler."""
        # Nothing is ignored. Candidates that could be listed here:
        # CBEventType.CHUNKING, CBEventType.NODE_PARSING, CBEventType.EMBEDDING,
        # CBEventType.LLM, CBEventType.TEMPLATING.
        ignored_events = []
        super().__init__(ignored_events, ignored_events)
        self._aqueue = asyncio.Queue()
        self.is_done = False

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Queue the starting event when it has a response form; return its id."""
        started = CallbackEvent(
            event_type=event_type, payload=payload, event_id=event_id
        )
        if started.to_response() is None:
            return event_id
        self._aqueue.put_nowait(started)
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Print the finished event and queue it when it has a response form."""
        finished = CallbackEvent(
            event_type=event_type, payload=payload, event_id=event_id
        )
        print(finished)
        if finished.to_response() is None:
            return
        self._aqueue.put_nowait(finished)

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """No-op."""

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """No-op."""

    async def async_event_gen(self) -> AsyncGenerator[CallbackEvent, None]:
        """Yield queued events until the queue is drained and `is_done` is set."""
        while not (self._aqueue.empty() and self.is_done):
            try:
                # Short timeout so the loop condition is re-evaluated regularly.
                next_event = await asyncio.wait_for(self._aqueue.get(), timeout=0.1)
                yield next_event
            except asyncio.TimeoutError:
                continue

This should address the issues related to initialization, handling None payloads, and properly closing the generator.

To continue talking to Dosu, mention @dosu.

gilangdipper commented 2 weeks ago

@dosu, is there something we can improve related to this issue? Using llama-index-bedrock-converse@0.2.2, on every CBEventType.LLM event I get NotImplementedError('Use async-for instead'). I'm not sure where the issue is coming from; it could be from the code above.

gilangdipper commented 2 weeks ago

llamaindex bedrock, event_type=<CBEventType.LLM: 'llm'> payload={'exception': NotImplementedError('Use async-for instead')}