deepset-ai / canals

A component orchestration engine
https://deepset-ai.github.io/canals/
Apache License 2.0
27 stars 2 forks source link

Component is stuck waiting for input. Optional inputs are not skipped #112

Closed tholor closed 10 months ago

tholor commented 11 months ago

Pipeline gets stuck unexpectedly.

Error message:

DEBUG:canals.pipeline.pipeline:> Queue at step 6: {'input_switch': []}
DEBUG:canals.pipeline.pipeline:Component 'input_switch' is waiting. All mandatory inputs received, some optional are not skipped: {'input_1'}
Traceback (most recent call last):
  File "/home/mp/deepset/dev/haystack/debug/loops_v2.py", line 84, in <module>
    result = pipeline.run({
             ^^^^^^^^^^^^^^
  File "/home/mp/deepset/dev/haystack/venv/lib/python3.11/site-packages/canals/pipeline/pipeline.py", line 459, in run
    raise PipelineRuntimeError(
canals.errors.PipelineRuntimeError: 'input_switch' is stuck waiting for input, but there are no other components to run. This is likely a Canals bug. Open an issue at https://github.com/deepset-ai/canals.

Process finished with exit code 1

Script to reproduce:

import os
from haystack.preview import Pipeline, Document
from haystack.preview.document_stores import MemoryDocumentStore
from haystack.preview.components.retrievers import MemoryBM25Retriever
from haystack.preview.components.generators.openai.gpt35 import GPT35Generator
from haystack.preview.components.builders.answer_builder import AnswerBuilder
from haystack.preview.components.builders.prompt_builder import PromptBuilder
import random
from haystack.preview import component
from typing import Optional, List

import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

@component
class OutputParser():
# Dummy parser
    @component.output_types(valid=List[str], invalid=Optional[List[str]])
    def run(
        self,
        replies: List[str]):

        if random.randint(0,100) > 70:
            #raise Exception("Faile to parse input: the field xyz is missing")
            print("Faile to parse input: the field xyz is missing")
            return {"valid": None, "invalid": replies}
        else:
            print("Successfully parsed input")
            return {"valid": replies, "invalid": None}

@component
class InputSwitch():

    @component.output_types(output=str)
    def run(self, input_1: Optional[List[str]]=None, input_2: Optional[List[str]]=None):
        if input_1 is not None:
            return {"output": input_1}
        else:
            return {"output": input_2}

query  = ("Create a json file of the 3 biggest cities in the wolrld with the following fields: name, country, and population. "
          "None of the fields must be empty.")

prompt_template = """
 {{query}}
  {% if replies %}
    We already got the following output: {{replies}}
    However, this doesn't comply with the format requirements from above. Please correct the output and try again.
  {% endif %}
"""

pipeline = Pipeline()
pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
pipeline.add_component(instance=GPT35Generator(api_key=os.environ.get("OPENAI_API_KEY")), name="llm")
pipeline.add_component(instance=OutputParser(), name="output_parser")

pipeline.add_component(instance=InputSwitch(), name="input_switch")

pipeline.connect("prompt_builder", "llm")
pipeline.connect("input_switch.output", "prompt_builder.replies")
pipeline.connect("llm", "output_parser")

pipeline.connect("output_parser.invalid", "input_switch.input_2")

## Run the Pipeline
result = pipeline.run({
    "prompt_builder":   {"query": query},
    "input_switch":     {"input_1": []},
})

print(result)
ZanSara commented 11 months ago

Note: this code runs fine on the FSM implementation: https://github.com/deepset-ai/canals/pull/55

ZanSara commented 10 months ago

Tested and working on https://github.com/deepset-ai/canals/pull/148.

This code's graph:

test_1

test/pipeline/integration/test_looping_pipeline::test_pipeline_direct_io_loop:

looping_pipeline_direct_io_loop