run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: function_call does not generate langfuse subspans for a custom AsyncBaseTool class #14097

Open ootkin opened 3 weeks ago

ootkin commented 3 weeks ago

Bug Description

I'm using OpenAIAgent with a custom tool, and the spans from my custom tool do not appear in Langfuse:

image

Could you please help me to solve this?

I have already opened an issue on the Langfuse repo: https://github.com/orgs/langfuse/discussions/2298

Thanks

Version

^0.10.37

Steps to Reproduce

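from typing import Any

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolMetadata, ToolOutput
from llama_index.llms.openai import OpenAI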

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])

class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        pipeline_result = self.pipeline.run(query_str=input)
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    async def acall(self, input: Any) -> ToolOutput:
        pipeline_result = await self.pipeline.arun(query_str=input)
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    @property
    def metadata(self) -> ToolMetadata:
        return ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None

tool = QueryPipelineTool(get_pipeline()) # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)

agent.stream_chat(query)

Relevant Logs/Tracebacks

No response

dosubot[bot] commented 3 weeks ago

Hello, @ootkin! I'm here to help you with your bug and answer any questions you may have. I can also assist you in becoming a contributor while we wait for a human maintainer.

The function_call does not generate Langfuse subspans for a custom AsyncBaseTool class because the FunctionTool class lacks the logic to create or manage Langfuse subspans. The call and acall methods in the FunctionTool class do not include any references to Langfuse or subspan creation.

Here is the relevant part of the code showing the absence of Langfuse subspan logic:

def call(self, *args: Any, **kwargs: Any) -> ToolOutput:
    """Call."""
    tool_output = self._fn(*args, **kwargs)
    return ToolOutput(
        content=str(tool_output),
        tool_name=self.metadata.name,
        raw_input={"args": args, "kwargs": kwargs},
        raw_output=tool_output,
    )

async def acall(self, *args: Any, **kwargs: Any) -> ToolOutput:
    """Call."""
    tool_output = await self._async_fn(*args, **kwargs)
    return ToolOutput(
        content=str(tool_output),
        tool_name=self.metadata.name,
        raw_input={"args": args, "kwargs": kwargs},
        raw_output=tool_output,
    )

To generate Langfuse subspans, you need to add the appropriate logic to create and manage these subspans within the call and acall methods [1].

To continue talking to Dosu, mention @dosu.

ootkin commented 3 weeks ago

@dosu could you please provide an example?

dosubot[bot] commented 3 weeks ago

To generate Langfuse subspans within your custom AsyncBaseTool class, you need to integrate the LlamaIndexCallbackHandler into the call and acall methods. Here is an example of how you can modify your QueryPipelineTool class to include the necessary logic for creating and managing Langfuse subspans:

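from typing import Any

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolOutput, ToolMetadata
from llama_index.llms.openai import OpenAI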

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])

class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as event:
            pipeline_result = self.pipeline.run(query_str=input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})

        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as event:
            pipeline_result = await self.pipeline.arun(query_str=input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})

        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    @property
    def metadata(self) -> ToolMetadata:
        return ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None

tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)

agent.stream_chat(query)

In this example, the call and acall methods are wrapped with the Settings.callback_manager.event context manager, which creates and manages the Langfuse subspans. The payload parameter includes the input and output data for the function call, ensuring that the subspans are correctly generated and recorded.
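
To make the start/end pattern above easier to follow, here is a minimal standard-library sketch of an event context manager. ToyCallbackManager and ToyEvent are hypothetical names invented for this illustration; they are not LlamaIndex or Langfuse classes, and the sketch makes no claim about how either library is implemented. It only shows the shape of the pattern: a start record on entry, an explicit on_end with the output payload, and a fallback close on exit.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional, Tuple


class ToyEvent:
    """Hypothetical stand-in for the object yielded by an event context manager."""

    def __init__(self, log: List[Tuple[str, str, Any]], event_type: str) -> None:
        self._log = log
        self.event_type = event_type
        self.finished = False

    def on_end(self, payload: Optional[Dict[str, Any]] = None) -> None:
        # Record the end of the event at most once.
        if not self.finished:
            self._log.append(("end", self.event_type, payload))
            self.finished = True


class ToyCallbackManager:
    """Hypothetical recorder; a real handler would forward events to a tracing backend."""

    def __init__(self) -> None:
        self.log: List[Tuple[str, str, Any]] = []

    @contextmanager
    def event(
        self, event_type: str, payload: Optional[Dict[str, Any]] = None
    ) -> Iterator[ToyEvent]:
        self.log.append(("start", event_type, payload))
        ev = ToyEvent(self.log, event_type)
        try:
            yield ev
        finally:
            # Close the event even if the body raised or never called on_end.
            ev.on_end()


manager = ToyCallbackManager()
with manager.event("function_call", payload={"query_str": "hello"}) as event:
    result = "HELLO"  # stands in for pipeline.run(query_str=...)
    event.on_end(payload={"function_output": result})

for entry in manager.log:
    print(entry)
```

The log ends up with exactly one start record and one end record for the event, because the fallback close in the finally clause does nothing once on_end has already been called.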

ootkin commented 3 weeks ago

@dosu this way I don't see the query pipeline spans, just the FUNCTION_CALL span

dosubot[bot] commented 3 weeks ago

To ensure that the query pipeline spans are also generated and visible in Langfuse, you need to wrap the relevant parts of the query pipeline execution within the call and acall methods with additional event spans. Here is an updated example that includes spans for both the function call and the query pipeline:

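from typing import Any

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolOutput, ToolMetadata
from llama_index.llms.openai import OpenAI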

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])

class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            with Settings.callback_manager.event(
                CBEventType.QUERY,
                payload={EventPayload.QUERY: {"query_str": input}},
            ) as query_event:
                pipeline_result = self.pipeline.run(query_str=input)
                query_event.on_end(payload={EventPayload.QUERY_RESULT: str(pipeline_result)})
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})

        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            with Settings.callback_manager.event(
                CBEventType.QUERY,
                payload={EventPayload.QUERY: {"query_str": input}},
            ) as query_event:
                pipeline_result = await self.pipeline.arun(query_str=input)
                query_event.on_end(payload={EventPayload.QUERY_RESULT: str(pipeline_result)})
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})

        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    @property
    def metadata(self) -> ToolMetadata:
        return ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None

tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)

agent.stream_chat(query)

In this example, the call and acall methods now include nested spans for both the function call and the query pipeline. This should ensure that the query pipeline spans are generated and visible in Langfuse alongside the function call span.
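
As a rough illustration of what "nested spans" means here, the following standard-library sketch builds a parent/child tree from nested context managers. ToyTracer and Span are hypothetical names made up for this example and are not part of LlamaIndex or Langfuse. The sketch assumes parent tracking is done with a simple stack held by the tracer instance.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Iterator, List, Optional


@dataclass
class Span:
    name: str
    parent: Optional[str] = None
    children: List["Span"] = field(default_factory=list)


class ToyTracer:
    """Hypothetical tracer that nests spans using a per-instance stack."""

    def __init__(self) -> None:
        self.roots: List[Span] = []
        self._stack: List[Span] = []

    @contextmanager
    def span(self, name: str) -> Iterator[Span]:
        parent = self._stack[-1] if self._stack else None
        current = Span(name=name, parent=parent.name if parent else None)
        # Attach to the enclosing span if there is one, otherwise start a new root.
        (parent.children if parent else self.roots).append(current)
        self._stack.append(current)
        try:
            yield current
        finally:
            self._stack.pop()


def render(span: Span, depth: int = 0) -> List[str]:
    """Return the span tree as indented lines."""
    lines = ["  " * depth + span.name]
    for child in span.children:
        lines.extend(render(child, depth + 1))
    return lines


# Spans opened on the same tracer nest as expected.
tracer = ToyTracer()
with tracer.span("function_call"):
    with tracer.span("query"):
        with tracer.span("llm"):
            pass
tree = render(tracer.roots[0])
print("\n".join(tree))

# Spans opened on two different tracers do not nest, even though the code is nested.
a, b = ToyTracer(), ToyTracer()
with a.span("function_call") as outer:
    with b.span("pipeline_step") as inner:
        pass
print(outer.children, inner.parent)
```

In this toy model, a span only becomes a child when it is reported to the same tracer instance as its enclosing span. Whether something similar explains the missing pipeline spans in this issue is not established anywhere in the thread.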

ootkin commented 3 weeks ago

@dosu EventPayload.QUERY_RESULT does not exist.

And the code that you provided does not work.

Can anyone help? We are losing visibility over our pipeline and the average costs of our requests.

dosubot[bot] commented 3 weeks ago

To correct the implementation and improve visibility over your pipeline and the average costs of your requests, you should ensure that the CallbackManager is properly utilized to log and trace the events. The provided code already includes some callback management, but it might be missing detailed logging for each step in the pipeline.

Here is a revised version of the QueryPipelineTool class with enhanced logging and tracing:

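from typing import Any

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolOutput, ToolMetadata
from llama_index.llms.openai import OpenAI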

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])

class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            pipeline_result = self._run_pipeline(input)
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})

        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            pipeline_result = await self._arun_pipeline(input)
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})

        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    def _run_pipeline(self, input: Any) -> Any:
        with Settings.callback_manager.event(
            CBEventType.QUERY,
            payload={EventPayload.QUERY_STR: input},
        ) as query_event:
            pipeline_result = self.pipeline.run(query_str=input)
            query_event.on_end(payload={EventPayload.RESPONSE: str(pipeline_result)})
        return pipeline_result

    async def _arun_pipeline(self, input: Any) -> Any:
        with Settings.callback_manager.event(
            CBEventType.QUERY,
            payload={EventPayload.QUERY_STR: input},
        ) as query_event:
            pipeline_result = await self.pipeline.arun(query_str=input)
            query_event.on_end(payload={EventPayload.RESPONSE: str(pipeline_result)})
        return pipeline_result

    @property
    def metadata(self) -> ToolMetadata:
        return ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None

tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)

agent.stream_chat(query)

In this example, the call and acall methods now include nested spans for both the function call and the query pipeline. The EventPayload.RESPONSE key is used to log the pipeline result, ensuring that the query pipeline spans are generated and visible in Langfuse alongside the function call span [1].
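
Since acall keeps a plain with block open across an await, it may help to see how parent tracking can survive concurrent async calls. The sketch below uses only the standard library; span, fake_pipeline, and fake_acall are hypothetical names for this illustration, and it assumes the current span is stored in a contextvars.ContextVar. It makes no claim about how LlamaIndex or Langfuse track parents internally.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from typing import Iterator, List, Optional, Tuple

# Name of the span currently open in this task, or None at the top level.
_current_span: "contextvars.ContextVar[Optional[str]]" = contextvars.ContextVar(
    "current_span", default=None
)

# (span name, parent span name) pairs, in the order the spans were opened.
recorded: List[Tuple[str, Optional[str]]] = []


@contextmanager
def span(name: str) -> Iterator[None]:
    parent = _current_span.get()
    recorded.append((name, parent))
    token = _current_span.set(name)
    try:
        yield
    finally:
        # Restore the previous span when this one closes.
        _current_span.reset(token)


async def fake_pipeline(query: str) -> str:
    with span(f"query:{query}"):
        await asyncio.sleep(0)  # yield control so the two tasks interleave
        return query.upper()


async def fake_acall(query: str) -> str:
    with span(f"function_call:{query}"):
        return await fake_pipeline(query)


async def main() -> List[str]:
    # gather runs each coroutine as its own task with its own copy of the context.
    return await asyncio.gather(fake_acall("a"), fake_acall("b"))


results = asyncio.run(main())
print(results)
print(recorded)
```

Each task sees only its own current span, so the query span for "a" is recorded under the function call for "a" even though the two calls interleave.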

@logan-markewich, this one's a bit out of my wheelhouse, could you step in and provide your guidance?
