PrefectHQ / ControlFlow

🦾 Take control of your AI agents
https://controlflow.ai
Apache License 2.0
593 stars 40 forks source link

Docs? How to run tasks in parallel and mark tasks that were done to be incomplete and/or loop executions #240

Open MarkEdmondson1234 opened 2 months ago

MarkEdmondson1234 commented 2 months ago

Enhancement Description

I've looked through the docs but its not clear to me if its possible to assign tasks to run in parrallel, or if its an anti-pattern to try to mark tasks that are complete as not complete anymore to get the behaviour I'm looking for. I'm looking for a self-improvement loop where the same tasks are completed by agents but with each iteration with a new follow up question derived from the conversation so far.

Use Case

I have this flow:

import controlflow as cf
from typing import Annotated, List, Optional
from pydantic import BaseModel, Field

from agents import (
    code_executor,
    web_researcher,
    database_researcher,
    compere
)

class FastThoughts(BaseModel):
    inital_response: str
    can_answer: bool

@cf.flow()
def fast_n_slow_flow(question, vector_name="control_flow", chat_history=[]):

    agents = ["code_executor", "web_researcher", "database_researcher"]
    agents_obj = [code_executor, web_researcher, database_researcher]

    task_master = cf.Task(
        """
Frame the question from the user in language that all your co-assistants can understand and ask which agents on your team will be able to answer it.
They should reply with thier first initial reactions, but will start a slower more considered response that should come later.
""",
        agents=[compere],
        result_type=str, 
    )
    task_master.run()
    print(task_master.result)

    slow_thought_agents = []

    for agent in agents_obj:
        fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[agent],
            context=dict(user_question=task_master),
            result_type=FastThoughts
        )
        fast_answer = fast_thoughts.run()
        print(fast_answer)

        if fast_answer.can_answer:
            print(f"Adding {agent.name} to slow thoughts")
            slow_thought_agents.append(agent)

    who_speaks_next = cf.Task(
        "From the conversation so far and the assistants recent reactions, decide who should speak next",
        context=dict(assistant_reactions=fast_thoughts),
        agents=[compere],
        result_type=agents
    )
    who_speaks_next.run()
    print(who_speaks_next.result) # only result available if you do not supply a result_type

    slow_thoughts = []
    for agent in slow_thought_agents:
        try:
            slow_thought = cf.Task(
                "You indicated you could contribute more to the conversation with your skills - please now answer the question fully using all tools at your disposal.",
                agents=[agent]
            )
        except ValueError:
            print("{agent.name} could not help with a slow thought")
            continue

        slow_thought.run()
        print(slow_thought.result)
        slow_thoughts.append(slow_thought)

    summary = cf.Task(
        """
        Compile all the answers so far and summarise the conversation so far.  Highlight what each team member has brought to the discussion.
        Consider the initial question and the answers so far, reframe a new question that will help tease out any unexplored areas.
        """,
        agents = [compere],
        context=dict(fast_thoughts=fast_thoughts, slow_thoughts=slow_thoughts, who_speaks_next=who_speaks_next)
    )
    summary.run()
    print(summary.result)

    shall_we_end = cf.Task(
        "If you believe the conversation has reached its natural conclusion, mark this task complete, otherwise keep the conversation going",
        agents=[compere],
        context=dict(summary=summary),
        result=bool)

    shall_we_end.run()
    print(shall_we_end)

    if shall_we_end.result:
        return slow_thoughts, summary.result

it works, but I'd like for it to loop back and use the follow up question the compere adds at the last step, but it is not clear to me how to do this. I'd also like to avoid the for loops or to make them async or parrallel somehow, since they don't depend on each other.

Proposed Implementation

Maybe the below is already possible :) but how I'd theoretically like it is below.

I'd prefer instead of this for loop:

    for agent in agents_obj:
        fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[agent],
            context=dict(user_question=task_master),
            result_type=FastThoughts
        )
        fast_answer = fast_thoughts.run()
        print(fast_answer)

        if fast_answer.can_answer:
            print(f"Adding {agent.name} to slow thoughts")
            slow_thought_agents.append(agent)

I just add the agents and then have the results in a list:

# this task is done in parrallel
fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[code_executor, web_researcher, database_researcher], # 
            context=dict(user_question=task_master),
            result_type=[FastThoughts] # a list of the output, one per passed in agent
        )

fast_answers = fast_thoughts.run()

for answer in fast_answers:
      if answer.can_answer:
              print(f"Adding {agent.name} to slow thoughts")
              slow_thought_agents.append(agent)

Then I am also unclear how to mark tasks as incomplete or if this is an anti-pattern, but something like

fast_thoughts.set_incomplete()

or perhaps create a new task with subtasks for the next loop?