PrefectHQ / ControlFlow

🦾 Take control of your AI agents
https://controlflow.ai
Apache License 2.0
765 stars 54 forks source link

Task never stops. #184

Open aimzieslol opened 5 months ago

aimzieslol commented 5 months ago

Here's the entire Jupyter notebook:

# -*- coding: utf-8 -*-
"""controlflow

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1rPD07Gu1DeqNOO9R1DEOacVM7LqS4VRT
"""

# Commented out IPython magic to ensure Python compatibility.
# %pip install -qU controlflow langchain_fireworks

from google.colab import userdata

import controlflow as cf
from pydantic import BaseModel

from langchain_fireworks import ChatFireworks

cf.settings.max_iterations = 10
cf.settings.eager_mode = False

cfm = ChatFireworks(api_key=userdata.get('FIREWORKS_API_KEY'), model='accounts/fireworks/models/llama-v3-8b-instruct')
cf.default_model = ChatFireworks(api_key=userdata.get('FIREWORKS_API_KEY'), model='accounts/fireworks/models/llama-v3-8b-instruct')

class ResearchTopic(BaseModel):
    title: str
    keywords: list[str]

# create an agent to write a research report
author = cf.Agent(
    name="Deep Thought",
    instructions="Use a formal tone and clear language",
    model=cfm)

@cf.flow
def test_flow():
    test_task = cf.Task("Generate 10 names for a business", user_access=False, result_type=list)
    test_task.run_once(agents=[author])

    return test_task.result

test_flow()

The output is (forgive the formatting):

│                                                                                                  │
│  I'm ready to start working on the tasks. I see that there is one ready task, Task 27d10, which  │
│  has the objective of generating 10 names for a business. I will use the provided tools to       │
│  complete this task.                                                                             │
│                                                                                                  │
│  I will start by using my language generation capabilities to generate a list of 10 names for a  │
│  business. I will make sure to use a formal tone and clear language, as per the instructions.    │
│                                                                                                  │
│  Once I have generated the list of names, I will use the provided tool to mark the task as       │
│  successful and provide the result.                                                              │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:25 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I have completed the task objective. I generated a list of 10 names for a business. Here is     │
│  the result:                                                                                     │
│                                                                                                  │
│  ['BusinessPro', 'TechCorp', 'InnovateNow', 'EclipseEnterprises', 'PulsePoint', 'Apex            │
│  Ventures', 'Nexus Solutions', 'Core Dynamics', 'Pinnacle Partners', 'Fusion Forward']           │
│                                                                                                  │
│  I will use the provided tool to mark the task as successful and provide the result.             │
│                                                                                                  │
│  Task 27d10: Successful                                                                          │
│                                                                                                  │
│  Result: ['BusinessPro', 'TechCorp', 'InnovateNow', 'EclipseEnterprises', 'PulsePoint', 'Apex    │
│  Ventures', 'Nexus Solutions', 'Core Dynamics', 'Pinnacle Partners', 'Fusion Forward']           │
│                                                                                                  │
│  I am ready to work on the next task.                                                            │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:27 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I see that there are no more ready tasks available for me to work on. I will wait for further   │
│  instructions or tasks to be assigned to me.                                                     │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:27 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:28 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:29 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:30 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:31 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:32 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:33 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:33 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  I will continue to wait for further instructions or tasks to be assigned to me.                 │
│                                                                                                  │
╰────────────────────────────────────────────────────────────────────────────────────  2:44:34 AM ─╯
02:44:35.080 | ERROR   | Task run 'Run LLM' - Task run failed with exception ValueError('Controller has exceeded maximum iterations of 10.') - Retries are exhausted
02:44:35.145 | ERROR   | Task run 'Run LLM' - Finished in state Failed('Task run encountered an exception ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.158 | ERROR   | Flow run 'brave-whale' - Encountered exception during execution: Controller has exceeded maximum iterations of 10.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 571, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 662, in run_generator_flow_sync
    yield gen_result
  File "/usr/local/lib/python3.10/dist-packages/controlflow/flows/flow.py", line 95, in create_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 340, in run
    new_messages = self.run_once()
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 270, in run_once
    payload = self._setup_run()
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 153, in _setup_run
    raise ValueError(
ValueError: Controller has exceeded maximum iterations of 10.
02:44:35.252 | ERROR   | Flow run 'brave-whale' - Finished in state Failed('Flow run encountered an exception: ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.258 | ERROR   | Task run 'Run LLM Controller' - Task run failed with exception ValueError('Controller has exceeded maximum iterations of 10.') - Retries are exhausted
02:44:35.329 | ERROR   | Task run 'Run LLM Controller' - Finished in state Failed('Task run encountered an exception ValueError: Controller has exceeded maximum iterations of 10.')
02:44:35.342 | ERROR   | Flow run 'monumental-squirrel' - Encountered exception during execution: Controller has exceeded maximum iterations of 10.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 571, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 612, in run_flow_sync
    engine.call_flow_fn()
  File "/usr/local/lib/python3.10/dist-packages/prefect/flow_engine.py", line 591, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/decorators.py", line 121, in wrapper
    flow_obj.run()
  File "/usr/local/lib/python3.10/dist-packages/controlflow/flows/flow.py", line 139, in run
    controller.run()
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 340, in run
    new_messages = self.run_once()
  File "/usr/local/lib/python3.10/dist-packages/prefect/tasks.py", line 826, in __call__
    return run_task(
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 862, in run_task
    return run_task_sync(**kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 675, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 307, in result
    _result = self.state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/dist-packages/prefect/client/schemas/objects.py", line 262, in result
    return get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 68, in get_state_result
    return _get_state_result(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 389, in coroutine_wrapper
    return run_coro_as_sync(ctx_call())
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 248, in run_coro_as_sync
    return call.result()
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 312, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/dist-packages/prefect/_internal/concurrency/calls.py", line 383, in _run_async
    result = await coro
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 231, in coroutine_wrapper
    return await task
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 379, in ctx_call
    result = await async_fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/prefect/states.py", line 127, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 619, in run_context
    yield self
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 673, in run_task_sync
    engine.call_task_fn(txn)
  File "/usr/local/lib/python3.10/dist-packages/prefect/task_engine.py", line 647, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/callables.py", line 208, in call_with_parameters
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 270, in run_once
    payload = self._setup_run()
  File "/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py", line 153, in _setup_run
    raise ValueError(
ValueError: Controller has exceeded maximum iterations of 10.
02:44:35.421 | ERROR   | Flow run 'monumental-squirrel' - Finished in state Failed('Flow run encountered an exception: ValueError: Controller has exceeded maximum iterations of 10.')
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
[<ipython-input-17-8475e9fd26aa>](https://localhost:8080/#) in <cell line: 1>()
----> 1 test_flow()

58 frames
[/usr/local/lib/python3.10/dist-packages/controlflow/controllers/controller.py](https://localhost:8080/#) in _setup_run(self)
    151         """
    152         if self._iteration >= (self.max_iterations or math.inf):
--> 153             raise ValueError(
    154                 f"Controller has exceeded maximum iterations of {self.max_iterations}."
    155             )

ValueError: Controller has exceeded maximum iterations of 10.

Had I not defined a max iterations, it seems like it would have run forever.

Am I missing a step or something?

aaazzam commented 5 months ago

👋 @aimzieslol! Thanks for reporting here. Holy stack trace, haha.

Yeah it definitely looks like the loop to hydrate tasks / call it quits for your flow sputtered out. I'll take a look to reproduce thanks 🙇

mpmX commented 4 months ago

Same issue here (agent never terminates):

import controlflow as cf
from langchain_openai import ChatOpenAI

model = ChatOpenAI(base_url="http://192.168.170.42:8000/v1", model="microsoft/Phi-3-small-8k-instruct", temperature=0.1)
agent = cf.Agent(model=model)

task = cf.Task(
    objective=f"Find interesting AI usages in healthcare.",
    result_type=str,
    agents=[agent]
)

task.run()

The model is hosted locally using VLLM.

aaazzam commented 4 months ago

Hey @aimzieslol and @mpmX! Haven't been able to reproduce yet, but hope is #203 might ameliorate this.

@mpmX is this with latest release?

aimzieslol commented 4 months ago

@aaazzam Seems to be working OK. I'll mess around w/it some more. Thanks!

mpmX commented 4 months ago

@mpmX is this with latest release? Yes, latest release. 0.8.2