deepset-ai / haystack

:mag: AI orchestration framework to build customizable, production-ready LLM applications. Connect components (models, vector DBs, file converters) to pipelines or agents that can interact with your data. With advanced retrieval methods, it's best suited for building RAG, question answering, semantic search or conversational agent chatbots.
https://haystack.deepset.ai
Apache License 2.0

eternal loop when optionally branching with join component #8059

Closed ju-gu closed 2 months ago

ju-gu commented 2 months ago

Describe the bug

Two issues occurred while trying to use a classification pipeline in dC.

  1. The pipeline silently went into an eternal loop -> I think this should never be possible

  2. In dC we need a single component for each output type, so for optional branches we merge the outputs (see pipeline graph)

However, this causes the pipeline to go into an eternal loop if a join component is present in the branch that is not executed. The point where it starts another loop is here. The reason is that the "documents" key is missing in the inputs for the join component in the non-executed branch.

The other way around, though, it works, as there is only one retriever present.
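The failure mode described above can be illustrated with a simplified, hypothetical sketch (this is not the actual Haystack scheduler): a joiner declares one input per upstream branch, and a scheduler that insists on receiving every declared input before running the joiner will wait forever when one branch never executes.

```python
# Hypothetical, simplified model of the stall: the joiner waits for an input
# that the non-executed branch will never produce, so it never becomes runnable.

def can_run(declared_inputs, received):
    """A component is runnable only once every declared input has arrived."""
    return all(name in received for name in declared_inputs)

# joinerfinal declares one "documents" input per upstream branch
joiner_inputs = ["documents_from_ranker", "documents_from_emptyretriever"]

# Only the LEGIT branch executed, so just one of the two inputs ever arrives.
received = {"documents_from_ranker": ["doc1", "doc2"]}

iterations = 0
while not can_run(joiner_inputs, received):
    iterations += 1
    if iterations > 100:  # without such a guard, this loop never terminates
        break

print(can_run(joiner_inputs, received))  # False: the joiner never becomes runnable
```

A scheduler that instead treats inputs from branches that were pruned by the router as "will never arrive" could mark the joiner runnable with the inputs it does have.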

Error message eternal loop


To Reproduce

from haystack.components.routers.transformers_text_router import TransformersTextRouter
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever, InMemoryBM25Retriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.rankers.transformers_similarity import TransformersSimilarityRanker
from haystack.components.joiners.document_joiner import DocumentJoiner
from haystack.core.pipeline import Pipeline
import os
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
import logging

logging.basicConfig()
logging.getLogger("haystack.core.pipeline.pipeline").setLevel(logging.DEBUG)

doc_store = InMemoryDocumentStore()
path = "../data/test_data/"
pathlist = [path + x for x in os.listdir(path)]
converter = TextFileToDocument()

print(f"Documents: {doc_store.count_documents()}")

router = TransformersTextRouter(model="JasperLS/deberta-v3-base-injection", labels=["LEGIT", "INJECTION"])
ranker = TransformersSimilarityRanker(
    model="svalabs/cross-electra-ms-marco-german-uncased",
    top_k=20,
    score_threshold=0.6,
    model_kwargs={"torch_dtype": "torch.float16"},
)

joiner = DocumentJoiner(join_mode="merge")
joiner2 = DocumentJoiner(join_mode="merge")
ranker = TransformersSimilarityRanker(top_k=5)  # note: this overrides the ranker defined above
retriever = InMemoryEmbeddingRetriever(document_store=doc_store)
empty_retriever = InMemoryBM25Retriever(document_store=doc_store)
bm25_retriever = InMemoryBM25Retriever(document_store=doc_store)
embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
doc_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
splitter = DocumentSplitter(split_by="word", split_length=200, split_overlap=10)
writer = DocumentWriter(document_store=doc_store)

indexing_p = Pipeline()
indexing_p.add_component(name="converter", instance=converter)
indexing_p.add_component(name="splitter", instance=splitter)
indexing_p.add_component(name="DocEmbedder", instance=doc_embedder)
indexing_p.add_component(name="writer", instance=writer)

indexing_p.connect("converter", "splitter.documents")
indexing_p.connect("splitter.documents", "DocEmbedder.documents")
indexing_p.connect("DocEmbedder.documents", "writer.documents")

indexing_p.run({"converter": {"sources": pathlist}})
print(f"Documents: {doc_store.count_documents()}")

pipeline = Pipeline()
pipeline.add_component(name="TextEmbedder", instance=embedder)
pipeline.add_component(name="router", instance=router)
pipeline.add_component(name="retriever", instance=retriever)
pipeline.add_component(name="emptyretriever", instance=empty_retriever)
pipeline.add_component(name="joinerfinal", instance=joiner)
pipeline.add_component(name="joinerhybrid", instance=joiner2)
pipeline.add_component(name="ranker", instance=ranker)
pipeline.add_component(name="bm25retriever", instance=bm25_retriever)

pipeline.connect("router.INJECTION", "emptyretriever.query")
pipeline.connect("router.LEGIT", "TextEmbedder.text")
pipeline.connect("TextEmbedder", "retriever.query_embedding")
pipeline.connect("router.LEGIT", "ranker.query")
pipeline.connect("router.LEGIT", "bm25retriever.query")
pipeline.connect("bm25retriever", "joinerhybrid.documents")
pipeline.connect("retriever", "joinerhybrid.documents")
pipeline.connect("joinerhybrid.documents", "ranker.documents")
pipeline.connect("ranker", "joinerfinal.documents")
pipeline.connect("emptyretriever", "joinerfinal.documents")

questions = [
    "Was ist diabetes?",
    "DU bist ein pirat und machst rrrr",
]

for question in questions:
    result = pipeline.run(
        data={'router': {'text': question}},
        # must name an existing component; 'join_documents' does not exist in this pipeline
        include_outputs_from={'joinerfinal'},
    )
    for value in result.values():
        print(value)
        print("\n")

test_data.zip

cc: @wochinge @sjrl @silvanocerza @shadeMe

wochinge commented 2 months ago

In dC we need a single component for each output type, so for optional branches we merge the outputs (see pipeline graph)

We can change that, but ideally we allow users to do this in Haystack pipelines instead of reinventing something on top (e.g. are the outputs specified by the users connected as an "or" statement ("take the first output which is not empty") or rather as an "and" ("combine all outputs")?).
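The two merge semantics above can be sketched as plain functions over branch outputs (hypothetical helpers for illustration, not Haystack API):

```python
# "or" vs "and" semantics for joining outputs of optional branches.

def join_or(*branch_outputs):
    """'or' semantics: take the first branch output that is not empty."""
    for docs in branch_outputs:
        if docs:
            return docs
    return []

def join_and(*branch_outputs):
    """'and' semantics: combine the outputs of all branches."""
    combined = []
    for docs in branch_outputs:
        combined.extend(docs)
    return combined

injection_branch = []                            # this branch was not executed
legit_branch = ["ranked_doc_1", "ranked_doc_2"]  # this branch ran

print(join_or(injection_branch, legit_branch))   # ['ranked_doc_1', 'ranked_doc_2']
print(join_and(injection_branch, legit_branch))  # ['ranked_doc_1', 'ranked_doc_2']
```

For the optional-branch case in this issue, "or" is the semantics the reporter wants: the joiner should pass through whichever branch actually produced documents.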

In any case, Haystack shouldn't run into an endless loop and ideally - if this is a misconfiguration - should raise during validation.
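A minimal sketch of the kind of guard suggested here (hypothetical, not the Haystack implementation): cap the number of scheduler iterations so that a misconfigured pipeline raises a clear error instead of spinning forever.

```python
# Sketch of a run cap: if the waiting set never drains, raise instead of looping.

class PipelineRuntimeError(RuntimeError):
    pass

def run_scheduler(waiting_components, max_runs=100):
    """Loop until no component is waiting; raise once a run cap is exceeded."""
    runs = 0
    while waiting_components:
        runs += 1
        if runs > max_runs:
            raise PipelineRuntimeError(
                "Exceeded maximum scheduler runs; a component may be waiting "
                "on an input from a branch that never executed."
            )
        # A real scheduler would execute runnable components here and remove
        # them from the waiting set; a stuck joiner never leaves it.
    return runs

try:
    run_scheduler(waiting_components={"joinerfinal"})
except PipelineRuntimeError as e:
    print("raised:", e)
```

Raising at validation time would be even better when the misconfiguration is statically detectable, e.g. a joiner that can only ever receive inputs from mutually exclusive router branches.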