langflow-ai / langflow

Langflow is a low-code app builder for RAG and multi-agent AI applications. It’s Python-based and agnostic to any model, API, or database.
http://www.langflow.org
MIT License
33.52k stars 4.07k forks source link

CancelledError when component runs for longer than 300 seconds #4443

Closed cimuletz closed 2 hours ago

cimuletz commented 2 hours ago

Bug Description

I have an LLM evaluation flow which receives CancelledError when component runs longer than 300 seconds.

LANGFLOW_WORKER_TIMEOUT is set to 3000 so this doesn't seem to be the problem.

Reproduction

  1. Add 2 custom components and add vertex between them
  2. Add 300 seconds of sleep to the first component
  3. Following error will occur: asyncio.exceptions.CancelledError

                         ╭─ Traceback (most recent call last) ─╮            
                         │ /app/.venv/lib/python3.12/site-pack │            
                         │ ages/langflow/api/v1/chat.py:328 in │            
                         │ build_vertices                      │            
                         │                                     │            
                         │   325 │   ) -> None:                │            
                         │   326 │   │   build_task = asyncio. │            
                         │       _build_vertex(vertex_id, grap │            
                         │   327 │   │   try:                  │            
                         │ ❱ 328 │   │   │   await build_task  │            
                         │   329 │   │   except asyncio.Cancel │            
                         │   330 │   │   │   logger.exception( │            
                         │   331 │   │   │   build_task.cancel │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/f │            
                         │ utures.py:287 in __await__          │            
                         │                                     │            
                         │   284 │   def __await__(self):      │            
                         │   285 │   │   if not self.done():   │            
                         │   286 │   │   │   self._asyncio_fut │            
                         │ ❱ 287 │   │   │   yield self  # Thi │            
                         │   288 │   │   if not self.done():   │            
                         │   289 │   │   │   raise RuntimeErro │            
                         │   290 │   │   return self.result()  │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/t │            
                         │ asks.py:385 in __wakeup             │            
                         │                                     │            
                         │    382 │                            │            
                         │    383 │   def __wakeup(self, futur │            
                         │    384 │   │   try:                 │            
                         │ ❱  385 │   │   │   future.result()  │            
                         │    386 │   │   except BaseException │            
                         │    387 │   │   │   # This may also  │            
                         │    388 │   │   │   self.__step(exc) │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/f │            
                         │ utures.py:198 in result             │            
                         │                                     │            
                         │   195 │   │   """                   │            
                         │   196 │   │   if self._state == _CA │            
                         │   197 │   │   │   exc = self._make_ │            
                         │ ❱ 198 │   │   │   raise exc         │            
                         │   199 │   │   if self._state != _FI │            
                         │   200 │   │   │   raise exceptions. │            
                         │   201 │   │   self.__log_traceback  │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/t │            
                         │ asks.py:316 in                      │            
                         │ __step_run_and_handle_result        │            
                         │                                     │            
                         │    313 │   │   │   │   # don't have │            
                         │    314 │   │   │   │   result = cor │            
                         │    315 │   │   │   else:            │            
                         │ ❱  316 │   │   │   │   result = cor │            
                         │    317 │   │   except StopIteration │            
                         │    318 │   │   │   if self._must_ca │            
                         │    319 │   │   │   │   # Task is ca │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/t │            
                         │ hreads.py:25 in to_thread           │            
                         │                                     │            
                         │   22 │   loop = events.get_running_ │            
                         │   23 │   ctx = contextvars.copy_con │            
                         │   24 │   func_call = functools.part │            
                         │ ❱ 25 │   return await loop.run_in_e │            
                         │   26                                │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/f │            
                         │ utures.py:287 in __await__          │            
                         │                                     │            
                         │   284 │   def __await__(self):      │            
                         │   285 │   │   if not self.done():   │            
                         │   286 │   │   │   self._asyncio_fut │            
                         │ ❱ 287 │   │   │   yield self  # Thi │            
                         │   288 │   │   if not self.done():   │            
                         │   289 │   │   │   raise RuntimeErro │            
                         │   290 │   │   return self.result()  │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/t │            
                         │ asks.py:385 in __wakeup             │            
                         │                                     │            
                         │    382 │                            │            
                         │    383 │   def __wakeup(self, futur │            
                         │    384 │   │   try:                 │            
                         │ ❱  385 │   │   │   future.result()  │            
                         │    386 │   │   except BaseException │            
                         │    387 │   │   │   # This may also  │            
                         │    388 │   │   │   self.__step(exc) │            
                         │                                     │            
                         │ /usr/local/lib/python3.12/asyncio/f │            
                         │ utures.py:198 in result             │            
                         │                                     │            
                         │   195 │   │   """                   │            
                         │   196 │   │   if self._state == _CA │            
                         │   197 │   │   │   exc = self._make_ │            
                         │ ❱ 198 │   │   │   raise exc         │            
                         │   199 │   │   if self._state != _FI │            
                         │   200 │   │   │   raise exceptions. │            
                         │   201 │   │   self.__log_traceback  │            
                         ╰─────────────────────────────────────╯            
                         CancelledError 

Expected behavior

Components to keep running for however long they take and resume flow with the next component.

Who can help?

@ogabrielluiz @nicoloboschi @zzzming

Operating System

Ubuntu Linux

Langflow Version

1.0.19

Python Version

None

Screenshot

No response

Flow File

No response

dosubot[bot] commented 2 hours ago

Hey @cimuletz, I'm here to help you with any bugs, questions, or contributions you have! Let's tackle this issue together.

The issue you're experiencing with the CancelledError after 300 seconds, despite setting LANGFLOW_WORKER_TIMEOUT to 3000, might be due to the precedence of configuration settings. The LANGFLOW_WORKER_TIMEOUT environment variable is set to 300 by default, and if you're using a CLI command with the --worker-timeout option, it will override the environment variable. Make sure that the --worker-timeout option is not set to 300 in your CLI command, or adjust it to 3000 if it is. Also, verify that your .env file correctly sets LANGFLOW_WORKER_TIMEOUT to 3000 [1][2].

To continue talking to Dosu, mention @dosu.