open-webui / pipelines

Pipelines: Versatile, UI-Agnostic OpenAI-Compatible Plugin Framework
MIT License

Question: LangChain Agent / Runnable Interface method to return working stream / generator #141

Closed TiFaBl closed 3 months ago

TiFaBl commented 4 months ago

Hey all, I started using pipelines and really like the approach. I am currently trying to build a LangChain SQL Agent as a pipeline and am facing some issues getting the output / stream / generator / iterator right. When I initialize it in the on_startup function, it works as expected:

self.agent_executor = create_sql_agent(llm, db=self.db, agent_type="openai-tools", verbose=False)
print("Testing the agent:")
stream = self.agent_executor.stream("Which strategies exist?")
print(type(stream))       # -> <class 'generator'>
print(*stream, sep='\n')  # unpacks the generator

But when I use that pipeline from the webui, I get a network error on the webui instance and the following error on the pipeline instance:

def pipe(
    self, user_message: str, model_id: str, messages: List[dict], body: dict
) -> Union[str, Generator, Iterator]:
    stream = self.agent_executor.stream(user_message)
    return stream

AttributeError: 'AddableDict' object has no attribute 'startswith'

It does work if I use the invoke() function on the LangChain Agent and select the output from the result dict, but of course this doesn't stream responses:

stream = self.agent_executor.invoke(user_message)
return stream['output']

I assume my key question is: Which method of the LangChain Agent / Runnable interface must be used, or how must its result be transformed, to get a Generator / Iterator that can be returned from the pipe() function? The annotated return type of pipe is just Generator, and that is what I am returning, but there appear to be further requirements, since the items can't be AddableDict objects. Any pointers to documentation are welcome as well.
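The error suggests the pipelines server iterates the returned generator and treats each item as a string (hence the startswith call), while agent_executor.stream() yields dict-like AddableDict chunks. A minimal sketch of unwrapping such chunks into plain strings; the dict shapes below are illustrative stand-ins for what the agent actually yields, not verified LangChain output:

```python
from typing import Generator


def unwrap_output(chunks) -> Generator[str, None, None]:
    """Yield only the string 'output' pieces from agent-style dict chunks."""
    for chunk in chunks:
        # AddableDict behaves like a plain dict; here we assume only some
        # chunks carry the final "output" text (based on the error and the
        # accepted solution later in this thread).
        if isinstance(chunk, dict) and "output" in chunk:
            yield chunk["output"]


# Stand-in for what agent_executor.stream(...) might yield:
fake_stream = iter([
    {"actions": ["tool call"]},
    {"output": "Strategies: "},
    {"output": "A, B, C"},
])

print("".join(unwrap_output(fake_stream)))  # -> Strategies: A, B, C
```

Returning unwrap_output(self.agent_executor.stream(user_message)) from pipe() would then hand the server a generator of strings rather than of dicts.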

dukai289 commented 4 months ago


See the example in examples/pipelines/providers/anthropic_manifold_pipeline.py: first implement a stream_response function that yields the chunks, then call that function in the pipe function and return its result.
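The pattern suggested above (a dedicated yielding helper, called and returned from pipe) can be sketched as follows. This is a minimal illustrative skeleton, not the real Anthropic example: the hard-coded chunks stand in for an actual agent stream, and the class carries only what the sketch needs:

```python
from typing import Generator, Iterator, List, Union


class Pipeline:
    """Skeleton of the helper-generator pattern used by the provider examples."""

    def stream_response(self, user_message: str) -> Generator[str, None, None]:
        # In a real pipeline this loop would iterate
        # self.agent_executor.stream(user_message); the literal chunks
        # below are stand-ins for demonstration.
        for chunk in [{"output": "hello "}, {"output": "world"}]:
            if "output" in chunk:
                yield chunk["output"]  # yield plain strings, not dicts

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Union[str, Generator, Iterator]:
        # Return the generator itself; the server consumes it lazily.
        return self.stream_response(user_message)


print("".join(Pipeline().pipe("hi", "some-model", [], {})))  # -> hello world
```

The key point is that pipe() never iterates the stream itself; it returns a generator whose items are already strings.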

dukai289 commented 4 months ago

@TiFaBl See my answer above.

TiFaBl commented 3 months ago

@dukai289 thanks! While I couldn't read your response / don't speak the language, the path to the function in the example Anthropic pipeline was enough to figure it out. In case anyone else ends up here, this is my solution:

response = ""
for chunk in self.chain_with_history.stream(
    input={
        "input": user_message,
    },
    config=self.config,
):
    # self.chat_message_history.add_ai_message(str(chunk))
    for k, s in chunk.items():
        if k == "output":
            response += s
            yield s