langchain-ai / langserve


Need help to migrate my custom agent to LCEL to use with langserve #220

Open pokidyshev opened 10 months ago

pokidyshev commented 10 months ago

Hi, team-langchain,

I have an agent that uses memory, user authentication, and function calling. I'd like to migrate it to langserve in production, but I couldn't find anything as complex as my case in the examples in the docs, so I'm stuck and need help. Could you please give me some advice on how to convert this code to LCEL?

agent.py:

from typing import Type

from langchain.agents import AgentExecutor, OpenAIFunctionsAgent
from langchain.agents.openai_functions_agent.agent_token_buffer_memory import (
    AgentTokenBufferMemory,
)
from langchain.callbacks.base import Callbacks
from langchain.chat_models import AzureChatOpenAI
from langchain.prompts import MessagesPlaceholder
from langchain.prompts.chat import BaseMessagePromptTemplate
from langchain.schema import SystemMessage

from .src import CustomFirestoreChatMessageHistory, CustomOpenAIFunctionsTool

HUMAN_MESSAGE_TEMPLATE = "..."

class CRMAgent:
    tool_classes: list[Type[CustomOpenAIFunctionsTool]]
    system_message_template: str

    def __init__(self, api_wrapper, crm_user, internal_user):
        self.api_wrapper = api_wrapper
        self.crm_user = crm_user
        self.hints_user = internal_user

        self.llm = AzureChatOpenAI(...)

        chat_memory = CustomFirestoreChatMessageHistory(
            user_id=internal_user["user_id"], session_id=internal_user["integration_id"]
        )
        self.memory = AgentTokenBufferMemory(chat_memory=chat_memory, llm=self.llm)

        self.tools = [
            ToolClass.from_api_wrapper(self.api_wrapper, **self.crm_user)
            for ToolClass in self.tool_classes
        ]

        system_message = self.system_message_template.format(...)
        extra_prompt_messages: list[BaseMessagePromptTemplate] | None = [
            MessagesPlaceholder(variable_name=self.memory.memory_key)
        ]
        self.agent = OpenAIFunctionsAgent.from_llm_and_tools(
            llm=self.llm,
            tools=self.tools,
            extra_prompt_messages=extra_prompt_messages,
            system_message=SystemMessage(content=system_message),
        )

        self.executor = AgentExecutor.from_agent_and_tools(
            agent=self.agent,
            tools=self.tools,
            memory=self.memory,
            handle_parsing_errors=True,
            return_intermediate_steps=True,
            metadata=self.hints_user,
        )

    def run(self, message: str, callbacks: Callbacks) -> str:
        """Run the agent on a human message."""
        human_message = HUMAN_MESSAGE_TEMPLATE.format(
            timestamp=self.api_wrapper.get_current_timestamp(),
            message=message,
        )
        inputs = {"input": human_message}
        return self.executor(inputs, callbacks=callbacks)["output"]

app.py:

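from fastapi import FastAPI
from pydantic import BaseModel

# CRMAgent is the class from agent.py above; APIWrapper is the project's own
# API client. Their import lines are not shown in this snippet.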

app = FastAPI()

class Credentials(BaseModel):
    access_token: str

class CRMInput(BaseModel):
    credentials: Credentials
    user_message: str
    pipeline_id: str
    user_id: str
    integration_id: str

@app.post("/crm")
def crm(payload: CRMInput):
    agent = CRMAgent(
        api_wrapper=APIWrapper(payload.credentials.access_token),
        crm_user={
            "default_owner_id": None,
            "default_pipeline_id": payload.pipeline_id,
        },
        internal_user={
            "user_id": payload.user_id,
            "integration_id": payload.integration_id,
        },
    )
    # Return the agent's answer; without a return the endpoint responds with null.
    return {"output": agent.run(payload.user_message, callbacks=None)}

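TL;DR: on each request, this code builds an agent whose tools are created from the user's access token and whose chat memory is stored in Firestore per user, then runs it on the user's message.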

Do you have any ideas how to turn this into a langserve project?

eyurtsev commented 10 months ago

hi @pokidyshev,

1) Create a runnable version of your executor; check the LCEL docs on how to do it. You'll likely need to make it configurable so that you can change the memory at run time based on user identity (see the example that uses configurable fields).
2) add_routes has a per-request config modifier parameter (per_req_config_modifier) that you can use to add user-specific information to the config.

We don't have good documentation yet that shows how to do this, but you can look at the implementation in https://github.com/langchain-ai/opengpts for reference.
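
The second point can be sketched roughly as follows. This is a minimal illustration, not code from the thread: the header name `x-access-token`, the `access_token` config key, and the function name `add_user_info` are assumptions. The function only needs a request object that exposes `.headers`, so it is shown here without FastAPI; it has the `(config, request) -> config` shape that `add_routes` accepts through `per_req_config_modifier`.

```python
from typing import Any, Dict


def add_user_info(config: Dict[str, Any], request: Any) -> Dict[str, Any]:
    """Copy per-user values from the incoming request into the runnable config.

    `request` is expected to behave like a Starlette/FastAPI Request,
    i.e. to expose a mapping-like `.headers` attribute.
    """
    # Work on copies so the caller's config is never mutated.
    new_config = dict(config)
    configurable = dict(new_config.get("configurable", {}))

    # Hypothetical header name; use whatever your auth layer provides.
    token = request.headers.get("x-access-token")
    if token is None:
        raise ValueError("missing x-access-token header")

    configurable["access_token"] = token
    new_config["configurable"] = configurable
    return new_config


# Wiring it up (assumes `app` and `runnable` already exist):
# add_routes(app, runnable, path="/crm", per_req_config_modifier=add_user_info)
```

Anything placed under `configurable` this way is visible to the runnable at invocation time, which is what makes it a natural place for a session ID or a user token.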

JevenZhou commented 10 months ago

I've hit the same problem and am blocked. Is it possible to provide a sample that uses this memory config through the API?

eyurtsev commented 10 months ago

@JevenZhou will do -- will try to do it this week

pokidyshev commented 10 months ago

@eyurtsev Thanks! Looking forward to it.

eyurtsev commented 10 months ago

Haven't gotten around to a full example yet, but last week we added this to the code base, which should be fairly helpful: https://api.python.langchain.com/en/latest/schema.runnable/langchain.schema.runnable.history.RunnableWithMessageHistory.html
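
For orientation, the contract this class relies on is a factory that maps a session ID to a chat history object. The sketch below mimics that contract with a hypothetical in-memory stand-in (`InMemoryHistory`, `_store`, and `get_session_history` are illustrative names, not LangChain classes). In a real setup the factory would return a `BaseChatMessageHistory` implementation, and, as far as the linked version goes, the session ID would be passed under `config["configurable"]["session_id"]`.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class InMemoryHistory:
    """Hypothetical stand-in for a chat message history backend."""

    messages: List[Tuple[str, str]] = field(default_factory=list)

    def add_message(self, role: str, content: str) -> None:
        self.messages.append((role, content))


# One history object per session ID, created lazily.
_store: Dict[str, InMemoryHistory] = {}


def get_session_history(session_id: str) -> InMemoryHistory:
    """Return the history for a session, creating it on first use."""
    if session_id not in _store:
        _store[session_id] = InMemoryHistory()
    return _store[session_id]


# Two sessions keep separate histories.
get_session_history("alice").add_message("human", "hi")
get_session_history("bob").add_message("human", "hello")
get_session_history("alice").add_message("ai", "hi, how can I help?")
```

Swapping the dictionary for a database-backed history changes only the body of the factory; the calling code keeps passing a session ID.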

pokidyshev commented 10 months ago

Hi, @eyurtsev! Thanks for the update!

I've managed to attach message history based on session_id by using RunnableWithMessageHistory. Though I had to patch it so it saves intermediate steps too.

I'm now stuck on customizing the agent's tools based on the user's access_token. On each request I need to create a new instance of APIWrapper(access_token), build a new set of tools from that instance, and pass them to the agent. Any ideas how that can be achieved?

Now my code looks like this:

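# Import paths match LangChain around the time of this thread and may have moved since.
# FirestoreChatMessageHistory, SYSTEM_MESSAGE, TOOLS, init_tools, create_azure_gpt4
# and access_token come from the poster's project and are not shown here.
from functools import partial

from fastapi import FastAPI
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import BaseChatMessageHistory
from langchain.schema.runnable.history import RunnableWithMessageHistory
from langchain.tools.render import format_tool_to_openai_function
from langserve import add_routes
from pydantic import BaseModel

def init_chat_history(destination: str, session_id: str) -> BaseChatMessageHistory: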
    return FirestoreChatMessageHistory(
        destination=destination,
        session_id=session_id,
        max_messages=5,
    )

system_message = SYSTEM_MESSAGE.format(warning="", custom_prompt="")
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_message),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

# TODO: tools must be updated on each request
api_wrapper = PipedriveAPIWrapper(access_token)
tools = init_tools(api_wrapper, TOOLS, include_custom_fields=True)

llm = create_azure_gpt4()
llm_with_tools = llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])

agent = (
    {
        "input": lambda x: x["input"],
        "history": lambda x: x["history"],
        "agent_scratchpad": lambda x: format_to_openai_function_messages(
            x["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIFunctionsAgentOutputParser()
)

executor = AgentExecutor(
    agent=agent,  # type: ignore
    tools=tools,
    max_iterations=15,
    handle_parsing_errors=True,
    return_intermediate_steps=True,
    tags=["pipedrive"],
    # metadata=hints_user,
    verbose=True,
)

executor_with_history = RunnableWithMessageHistory(
    executor,  # type: ignore
    partial(init_chat_history, "pipedrive"),
    history_messages_key="history",
)

class Input(BaseModel):
    input: str

class Output(BaseModel):
    output: str

app = FastAPI(title="LangChain Server", version="1.0")

add_routes(
    app,
    executor_with_history.with_types(input_type=Input, output_type=Output),
    path="/pipedrive",
)

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="localhost", port=8000)
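
One possible way to handle the per-request tools TODO above, sketched under assumptions rather than taken from the thread: build the tools and the executor inside a function that runs on every call and reads the token from the config. `FakeExecutor`, `build_executor`, `run_agent`, and the `access_token` config key are hypothetical stand-ins so that the sketch runs on its own; in the real project the factory would create `PipedriveAPIWrapper(access_token)`, the tools, and the `AgentExecutor`. LangChain's `RunnableLambda` can wrap a function that takes `(input, config)`, which is what would let a function like this be served.

```python
from typing import Any, Callable, Dict


class FakeExecutor:
    """Stand-in for AgentExecutor so the sketch is runnable without LangChain."""

    def __init__(self, tools: Dict[str, Callable[[str], str]]):
        self.tools = tools

    def invoke(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # A real executor would let the LLM choose a tool; here we call one directly.
        return {"output": self.tools["whoami"](inputs["input"])}


def build_executor(access_token: str) -> FakeExecutor:
    """Create tools bound to one user's token (hypothetical factory)."""
    tools = {"whoami": lambda _msg: f"token ends with {access_token[-4:]}"}
    return FakeExecutor(tools)


def run_agent(inputs: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Build a fresh executor for every call, using the token from the config."""
    token = config["configurable"]["access_token"]  # assumed config key
    return build_executor(token).invoke(inputs)


# With LangChain installed, the function could be wrapped for serving, e.g.:
# runnable = RunnableLambda(run_agent)
result_a = run_agent({"input": "hi"}, {"configurable": {"access_token": "user-a-1111"}})
result_b = run_agent({"input": "hi"}, {"configurable": {"access_token": "user-b-2222"}})
```

Rebuilding the executor on every request costs some setup time, but it keeps one user's credentials from leaking into another user's tools.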