crewAIInc / crewAI

Framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks.
https://crewai.com
MIT License
18.68k stars 2.57k forks

Integration with Langfuse? #515

Closed gabrielfior closed 5 days ago

gabrielfior commented 4 months ago

How can I pass a Langfuse callback handler to CrewAI so that the traces are available in the Langfuse web UI? Here's what I have so far:

from crewai import Agent, Crew, Task
from langchain_openai import ChatOpenAI
# Assumed import; the original post did not show where LangfuseCallbackHandler comes from.
from langfuse.callback import CallbackHandler as LangfuseCallbackHandler

llm = ChatOpenAI(
    model=self.model,
    api_key=keys.openai_api_key.get_secret_value(),
    temperature=0.0,
    callbacks=[LangfuseCallbackHandler()],
)
# This produces traces in the Langfuse web UI; `llm.predict` and `chain.run` do too.
llm.invoke("What are some theories about the relationship between unemployment and inflation?")

# This does not produce traces.
predictor = Agent(
    role="Professional Gambler",
    goal="XXX",
    backstory="XXX",
    llm=llm,
)

task_final_decision = Task(
    description=MY_PROMPT,
    agent=predictor,
)

crew = Crew(
    agents=[predictor],
    tasks=[task_final_decision],
    verbose=2,
)

crew.kickoff()

Any guidance on this integration would be much appreciated.

cracksauce commented 3 months ago

Commenting in support, would be a huge fan of this integration

andydataguy commented 3 months ago

Would be stoked to see direct Langfuse integration

sushe-shakya commented 3 months ago

I have found a workaround for this. Pass the Langfuse callback handler to the agent_executor.invoke call by overriding the execute_task method of the crewAI Agent class. Use the CustomizedCrewAgent class below to create your agents.

from langfuse.callback import CallbackHandler
from typing import Any, Optional, List
from langchain.tools.render import render_text_description
from crewai import Agent
from crewai.memory.contextual.contextual_memory import ContextualMemory

langfuse_callback_handler = CallbackHandler(secret_key="your_secret_key", 
                                            public_key="your_public_key", 
                                            host="your_host")

class CustomizedCrewAgent(Agent):

    @staticmethod
    def __tools_names(tools) -> str:
        return ", ".join([t.name for t in tools])

    def execute_task(
        self,
        task: Any,
        context: Optional[str] = None,
        tools: Optional[List[Any]] = None,
    ) -> str:
        """Execute a task with the agent.

        Args:
            task: Task to execute.
            context: Context to execute the task in.
            tools: Tools to use for the task.

        Returns:
            Output of the agent
        """
        if self.tools_handler:
            self.tools_handler.last_used_tool = {}  # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")

        task_prompt = task.prompt()

        if context:
            task_prompt = self.i18n.slice("task_with_context").format(
                task=task_prompt, context=context
            )

        if self.crew and self.crew.memory:
            contextual_memory = ContextualMemory(
                self.crew._short_term_memory,
                self.crew._long_term_memory,
                self.crew._entity_memory,
            )
            memory = contextual_memory.build_context_for_task(task, context)
            if memory.strip() != "":
                task_prompt += self.i18n.slice("memory").format(memory=memory)

        tools = tools or self.tools
        parsed_tools = self._parse_tools(tools)  # type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]"

        self.create_agent_executor(tools=tools)
        self.agent_executor.tools = parsed_tools
        self.agent_executor.task = task

        self.agent_executor.tools_description = render_text_description(parsed_tools)
        self.agent_executor.tools_names = self.__tools_names(parsed_tools)

        result = self.agent_executor.invoke(
            {
                "input": task_prompt,
                "tool_names": self.agent_executor.tools_names,
                "tools": self.agent_executor.tools_description,
            },
            # Attach the Langfuse handler to this run so the agent execution is traced.
            config={"callbacks": [langfuse_callback_handler]},
        )["output"]

        if self.max_rpm:
            self._rpm_controller.stop_rpm_counter()

        return result
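
A note on why the workaround above redefines `__tools_names` in the subclass: Python mangles double-underscore names per class, so a copied `execute_task` that calls `self.__tools_names` looks for the helper on the subclass, not on the parent. The sketch below illustrates this with hypothetical stand-in classes (`BaseAgent`, `BrokenAgent`, `FixedAgent` are not crewAI classes), assuming the parent defines the helper as a double-underscore method.

```python
class BaseAgent:
    """Hypothetical stand-in for a parent class with a private helper."""

    @staticmethod
    def __tools_names(tools):
        return ", ".join(tools)

    def execute_task(self, tools):
        # Inside BaseAgent this is compiled as self._BaseAgent__tools_names.
        return self.__tools_names(tools)


class BrokenAgent(BaseAgent):
    def execute_task(self, tools):
        # Inside BrokenAgent this is compiled as self._BrokenAgent__tools_names,
        # which does not exist, so the call raises AttributeError.
        return self.__tools_names(tools)


class FixedAgent(BaseAgent):
    @staticmethod
    def __tools_names(tools):
        # Redefining the helper makes _FixedAgent__tools_names available.
        return ", ".join(tools)

    def execute_task(self, tools):
        return self.__tools_names(tools)
```

Calling `FixedAgent().execute_task([...])` works, while the same call on `BrokenAgent` fails, which is why a copied method body needs its private helpers copied along with it.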

sbhadana commented 2 months ago

any progress on this integration?

cheepon commented 2 months ago

any plan?

leslyarun commented 2 months ago

This feature is absolutely essential, @joaomdmoura

xuanchenj commented 1 month ago

The crewAI Agent object has a callbacks attribute, and you can pass the callback handler to it.

from crewai import Agent
from langfuse.callback import CallbackHandler

Agent(
    role='xxx',
    goal='xxxxx',
    backstory="""xxxxx""",
    tools=[],
    llm=llm_model,
    callbacks=[CallbackHandler()],
)
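
Calling `CallbackHandler()` with no arguments, as above, relies on the Langfuse SDK reading its credentials from the environment (`LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and optionally `LANGFUSE_HOST`). If traces do not show up, a missing variable is one thing worth ruling out. A minimal sketch of such a check, where `missing_langfuse_vars` is a hypothetical helper and not part of either library:

```python
import os
from typing import List, Mapping

# Variables the handler needs when no keys are passed explicitly.
REQUIRED_VARS = ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY")


def missing_langfuse_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required Langfuse variables that are unset or empty in `env`."""
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

For example, running `missing_langfuse_vars()` before building the agents and raising an error if the list is non-empty surfaces a configuration problem early instead of silently producing no traces.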

kongzii commented 2 weeks ago

Initialising Agent with Langfuse callbacks is better than nothing:

Agent(
  ...
  callbacks=[langfuse_context.get_current_langchain_handler()],
)


However, this is less useful than passing the callback to LangChain's invoke method:

# Assumed imports; the original post did not show them.
from langchain_core.runnables import RunnableConfig
from langfuse.decorators import langfuse_context

config: RunnableConfig = {
    "callbacks": [langfuse_context.get_current_langchain_handler()]
}
content = str(
    llm.invoke(
        f"Extract the event date in the format `%m-%d-%Y` from the following question, don't write anything else, only the event date in the given format: `{question}`",
        config=config,
    ).content
).strip("'`\"")
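
When a call site already has a config of its own, the handler can be merged in rather than replacing it. The sketch below is one way to do that, assuming the `"callbacks"` entry is a plain list (LangChain also accepts a callback manager there, which this does not handle). `with_callbacks` is a hypothetical helper, not a LangChain or Langfuse function.

```python
from typing import Any, Dict, List, Optional


def with_callbacks(config: Optional[Dict[str, Any]], *handlers: Any) -> Dict[str, Any]:
    """Return a copy of `config` with `handlers` appended to its "callbacks" list."""
    merged: Dict[str, Any] = dict(config or {})
    # Copy the list so the caller's config is never mutated.
    existing: List[Any] = list(merged.get("callbacks") or [])
    for handler in handlers:
        # Skip handlers that are already attached (compared by identity).
        if not any(handler is h for h in existing):
            existing.append(handler)
    merged["callbacks"] = existing
    return merged
```

A call would then look like `llm.invoke(prompt, config=with_callbacks(config, handler))`, leaving any other keys in `config` untouched.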
