deepset-ai / haystack

:mag: LLM orchestration framework to build customizable, production-ready LLM applications. Connect components (models, vector DBs, file converters) to pipelines or agents that can interact with your data. With advanced retrieval methods, it's best suited for building RAG, question answering, semantic search or conversational agent chatbots.
https://haystack.deepset.ai
Apache License 2.0

Pipeline loops fail when BranchJoiner receives multiple inputs #7960

Open vblagoje opened 1 week ago

vblagoje commented 1 week ago

Pipeline loops in Haystack currently fail when the prompt_concatenator_after_observation component (see attached pipeline graph) loops a ChatMessage list back to the main_input BranchJoiner. The BranchJoiner fails with the following error:

  File "/Users/vblagoje/workspace/haystack/haystack/core/pipeline/pipeline.py", line 76, in _run_component
    res: Dict[str, Any] = instance.run(**inputs)
  File "/Users/vblagoje/workspace/haystack/haystack/components/joiners/branch.py", line 140, in run
    raise ValueError(f"BranchJoiner expects only one input, but {inputs_count} were received.")
ValueError: BranchJoiner expects only one input, but 2 were received.

[attached image: looping_pipeline — pipeline graph]

This issue seems to originate from the BranchJoiner receiving both the initial input and the looped-back input simultaneously, violating its precondition of a single input.
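The precondition at issue can be seen in isolation with a toy stand-in (plain Python, not Haystack's actual implementation — `ToyBranchJoiner` is hypothetical): a joiner that enforces exactly one arriving input per cycle raises the same error when the initial input and the loop-back arrive in the same cycle.

```python
from typing import Any, List


class ToyBranchJoiner:
    """Toy stand-in (NOT Haystack's BranchJoiner) illustrating the
    single-input-per-cycle precondition that the real component enforces."""

    def run(self, **inputs: Any):
        # Count every value received on the variadic socket in this cycle.
        inputs_count = sum(len(v) for v in inputs.values())
        if inputs_count != 1:
            raise ValueError(
                f"BranchJoiner expects only one input, but {inputs_count} were received."
            )
        return {"value": next(iter(inputs.values()))[0]}


joiner = ToyBranchJoiner()
# One producer fires in a cycle -> fine.
print(joiner.run(value=["initial input"]))
# Initial input and loop-back arrive in the same cycle -> the reported error.
try:
    joiner.run(value=["initial input", "looped-back input"])
except ValueError as e:
    print(e)
```

In the attached pipeline this happens because pipeline.run seeds main_input while the loop edge from prompt_concatenator_after_observation can deliver to the same socket within one scheduling cycle.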

Steps to reproduce:

import os
from typing import List, Optional, Dict, Any
import re
from haystack.dataclasses import ChatMessage

from haystack import Document, component
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.converters import OutputAdapter
from haystack.components.routers import ConditionalRouter
from haystack.components.joiners import BranchJoiner
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.websearch import SerperDevWebSearch
from haystack import Pipeline
from haystack.utils import Secret

os.environ["OPENAI_API_KEY"] = "some-fake-key-replace-with-real-if-you-need-to-use-it"

def find_last_action(chat_messages: List[ChatMessage]):
    prompt: str = chat_messages[-1].content
    lines = prompt.strip().split('\n')
    for line in reversed(lines):
        pattern = r'Action:\s*(\w+)\[(.*?)\]'

        match = re.search(pattern, line)
        if match:
            action_name = match.group(1)
            parameter = match.group(2)
            return [action_name, parameter]
    return [None, None]

def concat_prompt(last_message: ChatMessage, current_prompt: List[ChatMessage], append: str):
    return [ChatMessage.from_user(current_prompt[-1].content + last_message.content + append)]

search_message_template = """
Given these web search results:

{% for doc in documents %}
    {{ doc.content }}
{% endfor %}

Be as brief as possible, max one sentence. 
Answer the question: {{search_query}}
"""

react_message_template = """
Solve a question answering task with interleaving Thought, Action, Observation steps.

Thought reasons about the current situation

Action can be:
google_search - Searches Google for the exact concept/entity (given in square brackets) and returns the results for you to use
finish - Returns the final answer (given in square brackets) and finishes the task

Observation summarizes the Action outcome and helps in formulating the next
Thought in Thought, Action, Observation interleaving triplet of steps.

After each Observation, provide the next Thought and next Action.
Don't execute multiple steps even though you know the answer.
Only generate Thought and Action, never Observation, you'll get Observation from Action.
Follow the pattern in the example below.

Example:
###########################
Question: Which magazine was started first, Arthur’s Magazine or First for Women?
Thought: I need to search Arthur’s Magazine and First for Women, and find which was started
first.
Action: google_search[When was 'Arthur’s Magazine' started?]
Observation: Arthur’s Magazine was an American literary periodical
published in Philadelphia and founded in 1844. Edited by Timothy Shay Arthur, it featured work by
Edgar A. Poe, J.H. Ingraham, Sarah Josepha Hale, Thomas G. Spear, and others. In May 1846
it was merged into Godey’s Lady’s Book.
Thought: Arthur’s Magazine was started in 1844. I need to search First for Women founding date next
Action: google_search[When was 'First for Women' magazine started?]
Observation: First for Women is a woman’s magazine published by Bauer Media Group in the
USA. The magazine was started in 1989. It is based in Englewood Cliffs, New Jersey. In 2011
the circulation of the magazine was 1,310,696 copies.
Thought: First for Women was started in 1989. 1844 (Arthur’s Magazine) < 1989 (First for
Women), so Arthur’s Magazine was started first.
Action: finish[Arthur’s Magazine]
############################

Let's start, the question is: {{query}}

Thought:
"""

routes = [
    {
        "condition": "{{'search' in tool_id_and_param[0]}}",
        "output": "{{tool_id_and_param[1]}}",
        "output_name": "search",
        "output_type": str,
    },
    {
        "condition": "{{'finish' in tool_id_and_param[0]}}",
        "output": "{{tool_id_and_param[1]}}",
        "output_name": "finish",
        "output_type": str,
    }
]

@component
class NoOp:
    @component.output_types(output=str)
    def run(self, query: str):
        return {"output": query}

class FakeThoughtActionOpenAIChatGenerator(OpenAIChatGenerator):

    @component.output_types(replies=List[ChatMessage])
    def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
        return {"replies": [ChatMessage.from_assistant("Thought: thinking\n Action: google_search[not important]\n")]}

class FakeConclusionOpenAIChatGenerator(OpenAIChatGenerator):

    @component.output_types(replies=List[ChatMessage])
    def run(self, messages: List[ChatMessage], generation_kwargs: Optional[Dict[str, Any]] = None):
        return {"replies": [ChatMessage.from_assistant("Tower of Pisa is 55 meters tall\n")]}

class FakeSerperDevWebSearch(SerperDevWebSearch):

    @component.output_types(documents=List[Document])
    def run(self, query: str):
        return {"documents": [Document(content="Eiffel Tower is 300 meters tall"),
                              Document(content="Tower of Pisa is 55 meters tall")]}

# main part
pipeline = Pipeline()
pipeline.add_component("main_input", BranchJoiner(List[ChatMessage]))
pipeline.add_component("prompt_builder", DynamicChatPromptBuilder(runtime_variables=["query"]))
pipeline.add_component("llm", FakeThoughtActionOpenAIChatGenerator(generation_kwargs={"stop": "Observation:"}))
pipeline.add_component("noop", NoOp())

# tools
pipeline.add_component("tool_extractor", OutputAdapter("{{messages | find_action}}",
                                                       output_type=List[str],
                                                       custom_filters={"find_action": find_last_action}))

pipeline.add_component("prompt_concatenator_after_action",
                       OutputAdapter("{{replies[-1] | concat_prompt(current_prompt,'')}}",
                                     output_type=List[ChatMessage],
                                     custom_filters={"concat_prompt": concat_prompt}))

pipeline.add_component("router", ConditionalRouter(routes))
pipeline.add_component("router_search",
                       FakeSerperDevWebSearch(api_key=Secret.from_token("some_fake_api_key")))
pipeline.add_component("search_prompt_builder",
                       DynamicChatPromptBuilder(runtime_variables=["documents", "search_query"]))
pipeline.add_component("search_llm", FakeConclusionOpenAIChatGenerator())
pipeline.add_component("router_finish", OutputAdapter("{{final_answer | format_final_answer}}",
                                                      output_type=str,
                                                      custom_filters={"format_final_answer": lambda x: x}))

pipeline.add_component("search_output_adapter", OutputAdapter("{{search_replies | format_observation}}",
                                                              output_type=List[ChatMessage],
                                                              custom_filters={"format_observation": lambda x: [
                                                                  ChatMessage.from_assistant(
                                                                      "Observation: " + x[-1].content + "\n")]}))

pipeline.add_component("prompt_concatenator_after_observation",
                       OutputAdapter("{{replies[-1] | concat_prompt(current_prompt, '\nThought:')}}",
                                     output_type=List[ChatMessage],
                                     custom_filters={"concat_prompt": concat_prompt}))

# main
pipeline.connect("main_input", "prompt_builder.prompt_source")
pipeline.connect("noop", "prompt_builder.query")
pipeline.connect("prompt_builder.prompt", "llm.messages")
pipeline.connect("llm.replies", "prompt_concatenator_after_action.replies")

# tools
pipeline.connect("prompt_builder.prompt", "prompt_concatenator_after_action.current_prompt")
pipeline.connect("prompt_concatenator_after_action", "tool_extractor.messages")

pipeline.connect("tool_extractor", "router")
pipeline.connect("router.search", "router_search.query")
pipeline.connect("router_search.documents", "search_prompt_builder.documents")
pipeline.connect("router.search", "search_prompt_builder.search_query")
pipeline.connect("search_prompt_builder.prompt", "search_llm.messages")
pipeline.connect("router.finish", "router_finish")

pipeline.connect("search_llm.replies", "search_output_adapter.search_replies")
pipeline.connect("search_output_adapter", "prompt_concatenator_after_observation.replies")
pipeline.connect("prompt_concatenator_after_action", "prompt_concatenator_after_observation.current_prompt")
pipeline.connect("prompt_concatenator_after_observation", "main_input")

search_message = [ChatMessage.from_user(search_message_template)]
messages = [ChatMessage.from_user(react_message_template)]
question = "which tower is taller: eiffel tower or tower of pisa?"
res = pipeline.run(data={"main_input": {"value": messages},
                         "noop": {"query": question},
                         "search_prompt_builder": {"prompt_source": search_message}})

print(res)

Expected behavior: The pipeline should handle loops correctly, allowing the BranchJoiner to process looped inputs sequentially rather than simultaneously.

Actual behavior: The pipeline fails when the loop feeds back into the BranchJoiner, causing it to receive multiple inputs at once and raising the above-mentioned exception.
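For illustration only, here is a toy sketch (hypothetical, not Haystack code — `QueuedToyJoiner` is made up for this issue) of what "sequentially rather than simultaneously" could mean: simultaneous arrivals are buffered and emitted one per invocation instead of raising.

```python
from collections import deque
from typing import Any, List


class QueuedToyJoiner:
    """Hypothetical sketch of the expected behavior: when the initial input
    and a looped-back input arrive in the same cycle, buffer them and emit
    one per run() call instead of raising a ValueError."""

    def __init__(self):
        self._buffer: deque = deque()

    def run(self, value: List[Any]):
        self._buffer.extend(value)
        # Emit exactly one buffered input per invocation, in arrival order.
        return {"value": self._buffer.popleft()}


joiner = QueuedToyJoiner()
out1 = joiner.run(value=["initial input", "looped-back input"])
out2 = joiner.run(value=[])
print(out1, out2)  # each call yields one input, in arrival order
```

Whether the fix belongs in the component or in the pipeline scheduler (delaying the loop-back delivery to the next cycle) is an open design question.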

mrm1001 commented 3 days ago

Hi @vblagoje, is this error related to this issue: https://github.com/deepset-ai/haystack/issues/7740?

vblagoje commented 3 days ago

@mrm1001 yes - we can't run any ReAct agent loops until this one is resolved.