langflow-ai / langflow

Langflow is a low-code app builder for RAG and multi-agent AI applications. It’s Python-based and agnostic to any model, API, or database.
http://www.langflow.org
MIT License
27k stars 3.57k forks

Error: When trying to use a conditional router to branch a conversation #2508

Closed — Nitewriter closed this issue 2 weeks ago

Nitewriter commented 2 months ago

I'm attempting to implement a supervisor pattern in which a language model classifies the current message given the conversation so far. This works perfectly in the UI using a Conditional Router component; however, when I run the flow from the API, it throws an error.

{
  "detail": "The message must be an iterator or an async iterator."
}

I suspect this is due to the flow needing two separate Chat Output components to facilitate the conversational branch. Here is an example flow that demonstrates the issue. I'm currently testing against the latest changes from main.

dont-talk-about-bruno.json

(screenshot of the example flow)
❯ curl -X POST \
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json'\
    -d '{"input_value": "message",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {
  "ChatInput-Ma4Jp": {},
  "OpenAIModel-MdB0V": {},
  "OpenAIModel-NJ8p6": {},
  "Prompt-DfMtR": {},
  "ConditionalRouter-j94T7": {},
  "Pass-N8MXP": {},
  "ChatOutput-TIge3": {},
  "ChatOutput-Z6DoM": {},
  "OpenAIModel-Naaf3": {},
  "Pass-hbQT6": {},
  "Prompt-PE0iF": {},
  "Prompt-uCIqY": {},
  "Memory-oi31T": {}
}}'

{"detail":"The message must be an iterator or an async iterator."}
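For reference, the same request can be issued from Python with only the standard library. This is a minimal sketch: the flow ID and endpoint are copied from the curl command above, `build_run_payload` is just an illustrative helper, and the `urlopen` call assumes a Langflow server is running locally.

```python
import json
import urllib.request
from typing import Optional

# Flow ID and endpoint copied from the curl command above.
API_URL = "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false"

def build_run_payload(message: str, tweaks: Optional[dict] = None) -> dict:
    """Mirror the JSON body sent by the curl command above."""
    return {
        "input_value": message,
        "output_type": "chat",
        "input_type": "chat",
        "tweaks": tweaks or {},
    }

def run_flow(message: str) -> dict:
    req = urllib.request.Request(
        API_URL,
        data=json.dumps(build_run_payload(message)).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # Raises urllib.error.URLError if the server is not running.
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```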
dosubot[bot] commented 2 months ago

Hey there, @Nitewriter! I'm here to help you with any bugs, questions, or contributions you might have. Let's tackle this issue together!

To resolve the error "The message must be an iterator or an async iterator" when using the ConditionalRouter component in your flow via the API, you need to ensure that the INPUT_FIELD_NAME parameter is set to an iterator or an async iterator. This is necessary for the flow to handle the message correctly.

Here's how you can address this issue:

  1. Ensure INPUT_FIELD_NAME is an Iterator or Async Iterator: Make sure that the INPUT_FIELD_NAME parameter in your flow is set to an iterator or an async iterator. Here is an example of how to set this parameter correctly:

    from typing import AsyncIterator
    
    async def example_async_iterator() -> AsyncIterator[str]:
       for i in range(5):
           yield f"Message {i}"
    
    # Assuming `vertex` is an instance of the class containing the `stream` method
    vertex.params[INPUT_FIELD_NAME] = example_async_iterator()
    await vertex.stream()
  2. Check the Implementation of the stream Method: Ensure that the stream method in your vertex class is correctly implemented to handle the iterator or async iterator. Here is a relevant code snippet that checks this condition:

    async def stream(self):
       iterator = self.params.get(INPUT_FIELD_NAME, None)
       if not isinstance(iterator, (AsyncIterator, Iterator)):
           raise ValueError("The message must be an iterator or an async iterator.")
       ...
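The check above can be reproduced in isolation to see which values pass it. This standalone sketch (not Langflow code) shows that generators and async generators satisfy the test, while a plain string or an unbuilt `None` input fails exactly the way the error reports:

```python
from collections.abc import AsyncIterator, Iterator

def is_streamable(value) -> bool:
    """Same isinstance test the stream() method applies to its input."""
    return isinstance(value, (AsyncIterator, Iterator))

def sync_chunks():
    yield from ("Hello", " ", "world")

async def async_chunks():
    for chunk in ("Hello", " ", "world"):
        yield chunk

print(is_streamable(sync_chunks()))   # generators are Iterators
print(is_streamable(async_chunks()))  # async generators are AsyncIterators
print(is_streamable("Hello world"))   # a finished str result fails the check
print(is_streamable(None))            # an unbuilt input fails the same way
```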
  3. Example of Handling Streaming in Langflow: In the context of Langflow, the build_vertex_stream function in src/backend/base/langflow/api/v1/chat.py demonstrates how to handle streaming for a vertex. Here is an excerpt:

    @router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
    async def build_vertex_stream(
       flow_id: uuid.UUID,
       vertex_id: str,
       session_id: Optional[str] = None,
       chat_service: "ChatService" = Depends(get_chat_service),
       session_service: "SessionService" = Depends(get_session_service),
    ):
       try:
           flow_id_str = str(flow_id)
    
           async def stream_vertex():
               try:
                   cache = await chat_service.get_cache(flow_id_str)
                   if not cache:
                       raise ValueError(f"No cache found for {flow_id_str}.")
                   else:
                       graph = cache.get("result")
    
                   vertex: "InterfaceVertex" = graph.get_vertex(vertex_id)
                   if not hasattr(vertex, "stream"):
                       raise ValueError(f"Vertex {vertex_id} does not support streaming")
                   if isinstance(vertex._built_result, str) and vertex._built_result:
                       stream_data = StreamData(
                           event="message",
                           data={"message": f"Streaming vertex {vertex_id}"},
                       )
                       yield str(stream_data)
                       stream_data = StreamData(
                           event="message",
                           data={"chunk": vertex._built_result},
                       )
                       yield str(stream_data)
                   elif not vertex.frozen or not vertex._built:
                       logger.debug(f"Streaming vertex {vertex_id}")
                       stream_data = StreamData(
                           event="message",
                           data={"message": f"Streaming vertex {vertex_id}"},
                       )
                       yield str(stream_data)
                       async for chunk in vertex.stream():
                           stream_data = StreamData(
                               event="message",
                               data={"chunk": chunk},
                           )
                           yield str(stream_data)
                   elif vertex.result is not None:
                       stream_data = StreamData(
                           event="message",
                           data={"chunk": vertex._built_result},
                       )
                       yield str(stream_data)
                   else:
                       raise ValueError(f"No result found for vertex {vertex_id}")
    
               except Exception as exc:
                   logger.exception(f"Error building Component: {exc}")
                   exc_message = parse_exception(exc)
                   if exc_message == "The message must be an iterator or an async iterator.":
                       exc_message = "This stream has already been closed."
                   yield str(StreamData(event="error", data={"error": exc_message}))
               finally:
                   logger.debug("Closing stream")
                   await chat_service.set_cache(flow_id_str, graph)
                   yield str(StreamData(event="close", data={"message": "Stream closed"}))
    
           return StreamingResponse(stream_vertex(), media_type="text/event-stream")
       except Exception as exc:
           raise HTTPException(status_code=500, detail="Error building Component") from exc

By ensuring that the INPUT_FIELD_NAME parameter is correctly set to an iterator or async iterator and that the stream method is properly implemented, you should be able to resolve the error and successfully run the flow via the API [1][2][3][4].

To continue talking to Dosu, mention @dosu.

nicoloboschi commented 2 months ago

Hey @Nitewriter, the error The message must be an iterator or an async iterator. normally means something went wrong in a component. I believe you will find an ERROR log line in the logs that points to the actual issue.

Nitewriter commented 2 months ago

@nicoloboschi I'll see what I can find and post it here. Thank you.

Nitewriter commented 2 months ago

@nicoloboschi Here is the Traceback I get when calling the flow with curl:

Traceback (most recent call last):
  File "/Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/api/v1/endpoints.py", line 203, in simplified_run_flow
    result = await simple_run_flow(
  File "/Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/api/v1/endpoints.py", line 107, in simple_run_flow
    task_result, session_id = await run_graph_internal(
  File "/Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/processing/process.py", line 48, in run_graph_internal
    run_outputs = await graph.arun(
  File "/Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/graph/base.py", line 455, in arun
    run_outputs = await self._run(
  File "/Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/graph/base.py", line 357, in _run
    await vertex.consume_async_generator()
  File "/Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/vertex/types.py", line 436, in consume_async_generator
    async for _ in self.stream():
  File "/Volumes/WillowTreeCS/WillowTree/langflow/src/backend/base/langflow/graph/vertex/types.py", line 363, in stream
    raise ValueError(f"The message must be an iterator or an async iterator. ({type(iterator)})")
ValueError: The message must be an iterator or an async iterator. (<class 'langflow.graph.vertex.types.ComponentVertex'>)

I added the type of the iterator to the ValueError message so we could see what it did receive.

Nitewriter commented 2 months ago

@nicoloboschi Ok, I think I may have figured something out. This seems to be caused by the Conditional Router component calling stop on the false branch of the graph. The runner still tries to resolve the Chat Output component on that stopped branch and hits this error because the upstream OpenAI component is inactive (not built). Should conversational branching be possible using the Conditional Router?
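That failure mode can be illustrated with a toy model (plain Python, not Langflow internals): when a branch is stopped, the downstream output vertex never receives a built value, so the iterator check trips over `None` with exactly the reported message.

```python
from collections.abc import AsyncIterator, Iterator

class ToyVertex:
    """Minimal stand-in for an output vertex that streams its input."""

    def __init__(self, name: str):
        self.name = name
        self.params: dict = {}

    def stream(self):
        iterator = self.params.get("input_value")
        if not isinstance(iterator, (AsyncIterator, Iterator)):
            # Same check and message as the Langflow stream() method.
            raise ValueError(
                f"The message must be an iterator or an async iterator. ({type(iterator)})"
            )
        yield from iterator

# The router "builds" only the matched branch; the other stays unbuilt.
active = ToyVertex("ChatOutput-true")
stopped = ToyVertex("ChatOutput-false")
active.params["input_value"] = iter(["About Bruno"])

print(list(active.stream()))  # the built branch streams normally
try:
    list(stopped.stream())    # the stopped branch still has input_value=None
except ValueError as exc:
    print(exc)                # reproduces the reported error
```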

Nitewriter commented 2 months ago

The error occurs on the About Bruno Chat Output component's input field:

❯ curl -X POST \                                                                                                                                                                                       
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json'\
    -d '{"input_value": "What is it like being an AI?",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {         
  "ChatInput-Ma4Jp": {},
  "Prompt-DfMtR": {},           
  "ConditionalRouter-j94T7": {},
  "Pass-N8MXP": {},      
  "ChatOutput-TIge3": {},
  "ChatOutput-Z6DoM": {},
  "Pass-hbQT6": {},  
  "Prompt-PE0iF": {},
  "Prompt-uCIqY": {},
  "Memory-oi31T": {},     
  "OpenAIModel-pHvfu": {},
  "OpenAIModel-w2hnb": {},
  "OpenAIModel-dUp0N": {}
}}'   

The error occurs on the Not About Bruno Chat Output component's input field:

❯ curl -X POST \                                                                                                                                                                                       
    "http://localhost:3000/api/v1/run/dea95449-11e1-4f8f-b22d-7ef6f8728c34?stream=false" \
    -H 'Content-Type: application/json'\
    -d '{"input_value": "Have you heard about Bruno?",
    "output_type": "chat",
    "input_type": "chat",
    "tweaks": {         
  "ChatInput-Ma4Jp": {},
  "Prompt-DfMtR": {},           
  "ConditionalRouter-j94T7": {},
  "Pass-N8MXP": {},      
  "ChatOutput-TIge3": {},
  "ChatOutput-Z6DoM": {},
  "Pass-hbQT6": {},  
  "Prompt-PE0iF": {},
  "Prompt-uCIqY": {},
  "Memory-oi31T": {},     
  "OpenAIModel-pHvfu": {},
  "OpenAIModel-w2hnb": {},
  "OpenAIModel-dUp0N": {}
}}'   
Nitewriter commented 2 months ago

Bailing instead of raising an error does allow the flow to work; however, it is probably not the solution you would want.

src/backend/base/langflow/graph/vertex/types.py:InterfaceVertex:stream

async def stream(self):
    iterator = self.params.get(INPUT_FIELD_NAME, None)
    if not isinstance(iterator, (AsyncIterator, Iterator)):
        return

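A slightly safer variant of that bail-out would return silently only when the branch was deliberately stopped, and still raise for genuinely malformed inputs. This is a standalone sketch, not Langflow code: `is_active` is a hypothetical flag standing in for however the graph actually marks a stopped branch.

```python
import asyncio
from collections.abc import AsyncIterator, Iterator

INPUT_FIELD_NAME = "input_value"  # assumed constant name, as in the snippet above

class GuardedVertex:
    """Sketch: bail out only when the branch was deliberately stopped."""

    def __init__(self, params: dict, is_active: bool = True):
        self.params = params
        self.is_active = is_active  # hypothetical flag: False on a stopped branch

    async def stream(self):
        iterator = self.params.get(INPUT_FIELD_NAME, None)
        if not isinstance(iterator, (AsyncIterator, Iterator)):
            if not self.is_active:
                return  # stopped branch: yield nothing instead of raising
            raise ValueError("The message must be an iterator or an async iterator.")
        for chunk in iterator:  # sync case only, for brevity
            yield chunk

async def main():
    live = GuardedVertex({INPUT_FIELD_NAME: iter(["hi"])})
    halted = GuardedVertex({}, is_active=False)
    print([c async for c in live.stream()])    # the built branch streams
    print([c async for c in halted.stream()])  # the stopped branch is skipped

asyncio.run(main())
```

An active vertex with a non-iterator input still raises, so real bugs are not swallowed.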
nicoloboschi commented 2 months ago

Tagging @ogabrielluiz . This looks like a graph builder issue, could you take a look ?

carlosrcoelho commented 2 weeks ago

@Nitewriter Hey

It's been a while since we had any interaction here.

Do you need any assistance with this case?

carlosrcoelho commented 2 weeks ago

Thank you for your contribution! This issue will be closed. If you have any questions or encounter another problem, please open a new issue and we will be ready to assist you.