pipecat-ai / pipecat

Open Source framework for voice and multimodal conversational AI

LangchainProcessor can't use custom invocation parameters as config #335

Open agilebean opened 1 month ago

agilebean commented 1 month ago

This is a strategically important issue for my ability to use pipecat for good.

pipecat's LangchainProcessor wraps LangChain's RunnableWithMessageHistory. I need to replace session_id with two fields, user_id and conversation_id, as shown in the LangChain documentation.

My code

        chain_runnable = RunnableWithMessageHistory(
            prompted_chat_model,
            # lambda session_id: get_session_history(database_label, session_id),
            lambda session_id: get_session_history(database_label, user_id, conversation_id),
            history_messages_key="chat_history",
            input_messages_key="input",
            **memory_kwargs,
            history_factory_config=[
                ConfigurableFieldSpec(
                    id="user_id",
                    annotation=str,
                    name="User ID",
                    description="Unique identifier for the user.",
                    default="",
                    is_shared=True,
                ),
                ConfigurableFieldSpec(
                    id="conversation_id",
                    annotation=str,
                    name="Conversation ID",
                    description="Unique identifier for the conversation.",
                    default="",
                    is_shared=True,
                ),
            ],
        )

Error

The above code throws

ValueError: Missing keys ['conversation_id', 'user_id'] in config['configurable'] Expected keys are ['conversation_id', 'user_id'].When using via .invoke() or .stream(), pass in a config; e.g., chain.invoke({'input': 'foo'}, {'configurable': {'conversation_id': '[your-value-here]', 'user_id': '[your-value-here]'}})

I tried changing the pipecat code in langchain.py to replace session_id like this:

    async def _ainvoke(self, text: str):
        logger.debug(f"Invoking chain with {text}")
        await self.push_frame(LLMFullResponseStartFrame())
        try:
            async for token in self._chain.astream(
                {self._transcript_key: text},
                # config={"configurable": {"session_id": self._participant_id}},
                config={"configurable": {
                    'user_id': self._user_id,
                    'conversation_id': self._conversation_id
                }},
            ):
                await self.push_frame(TextFrame(self.__get_token_value(token)))

but this throws

ValueError: Expected keys ['conversation_id', 'user_id'] do not match parameter names ['session_id'] of get_session_history. 

I could extend that to accept the custom parameters, but I was looking for a more general solution that established pipecat contributors like @TomTom101 might think of.
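
Side note on that second error: as far as I can tell, LangChain inspects the parameter names of the history factory, so the factory itself would have to take user_id and conversation_id instead of session_id. This is a rough sketch only, reusing my get_session_history / database_label names from above, and it still leaves the pipecat side of the question open:

    # Sketch: the factory's parameter names must match the ConfigurableFieldSpec ids,
    # so RunnableWithMessageHistory can call it with user_id=... and conversation_id=...
    chain_runnable = RunnableWithMessageHistory(
        prompted_chat_model,
        lambda user_id, conversation_id: get_session_history(
            database_label, user_id, conversation_id
        ),
        history_messages_key="chat_history",
        input_messages_key="input",
        history_factory_config=[
            # the same user_id / conversation_id ConfigurableFieldSpec entries as above
        ],
    )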

Langchain code

As I understand the langchain code, this is how the custom config can be passed: langchain_core/runnables/base.py, line 5242, in astream():

self._merge_configs(config)

which calls langchain_core/runnables/history.py, lines 533ff:

expected_keys = [field_spec.id for field_spec in self.history_factory_config]
...
            message_history = self.get_session_history(
                **{key: configurable[key] for key in expected_keys}
            )
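
In other words, once history_factory_config declares user_id and conversation_id, every invoke()/astream() call has to pass both keys under "configurable". A minimal sketch with placeholder values:

    # Sketch with placeholder ids: every key declared in history_factory_config
    # must be present under "configurable" on each call
    result = chain_runnable.invoke(
        {"input": "hello"},
        config={"configurable": {"user_id": "123", "conversation_id": "1"}},
    )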

Pipecat code

So in pipecat's langchain.py (the LangchainProcessor), the _ainvoke method uses only the session_id:

    async def _ainvoke(self, text: str):
        logger.debug(f"Invoking chain with {text}")
        await self.push_frame(LLMFullResponseStartFrame())
        try:
            async for token in self._chain.astream(
                {self._transcript_key: text},
                config={"configurable": {"session_id": self._participant_id}},
            ):

Suggested Solution

So _ainvoke could be extended to pass the keys from LangChain's self.history_factory_config? Maybe something along the lines of this snippet:

    async def _ainvoke(self, text: str):
        logger.debug(f"Invoking chain with {text}")
        await self.push_frame(LLMFullResponseStartFrame())

        if hasattr(self, '_configurable'):
            configurable = self._configurable
        else:
            configurable = {"configurable": {"session_id": self._participant_id}}
        try:
            async for token in self._chain.astream(
                {self._transcript_key: text},
                config=configurable
            ):
                await self.push_frame(TextFrame(self.__get_token_value(token)))

Or is there another way to pass the config dict with the new parameters to LangchainProcessor?

TomTom101 commented 1 month ago

Thanks! Try this branch and set your configurables like this:

config = {"user_id": "123", "conversation_id": "1"}
lc.set_configurable(config)

session_id is a configurable on its own and should not be hard-coded anyway.

agilebean commented 4 days ago

@TomTom101 I tried this, but it didn't work. Doesn't the invoke method need to access it like this?

    async def _ainvoke(self, text: dict):
...
        if hasattr(self, '_configurable'):
            configurable = self._configurable
        else:
            configurable = {"configurable": {"session_id": self._participant_id}}