microsoft / autogen

A programming framework for agentic AI 🤖
https://microsoft.github.io/autogen/
Creative Commons Attribution 4.0 International
Manual termination triggerable from outside a running chat #4301

Closed yiren-liu closed 15 hours ago

yiren-liu commented 16 hours ago

What feature would you like to be added?

A ManualTermination class that can be controlled and triggered from outside a running chat/team.

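# Import paths as in the 0.4 preview releases current when this issue was filed;
# they may differ in other versions.
from typing import Sequence

from autogen_agentchat.base import TerminatedException, TerminationCondition
from autogen_agentchat.messages import AgentMessage, StopMessage

# Custom termination condition that allows external, manual termination.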
class ManualTermination(TerminationCondition):
    def __init__(self):
        self._terminated = False
        self.manual_termination_flag = False

    @property
    def terminated(self) -> bool:
        return self._terminated

    async def __call__(self, messages: Sequence[AgentMessage]) -> StopMessage | None:
        if self._terminated:
            raise TerminatedException("Termination condition has already been reached")
        if self.manual_termination_flag:
            self._terminated = True
            return StopMessage(content="Manual termination", source="ManualTermination")
        return None

    async def reset(self) -> None:
        self._terminated = False
        self.manual_termination_flag = False

    def set_manual_termination(self) -> None:
        self.manual_termination_flag = True

Why is this needed?

This is useful when implementing an asynchronous termination override that can be triggered from a different thread or request handler while serving a chat backend application (e.g., in FastAPI).

Hypothetical Usage:

class AgentGroupChat:
    def __init__(self, agents: List[AssistantAgent], termination_condition: TerminationCondition):
        self.agents = agents
        self.termination_flipper = ManualTermination()
        self.termination_condition = termination_condition | self.termination_flipper
        self.team = RoundRobinGroupChat(agents, termination_condition=self.termination_condition)

    async def run_stream(self, task: str):
        stream = self.team.run_stream(task=task)
        messages = []
        async for message in stream:
            messages.append(message)
            print(message)
        return messages

    def set_manual_termination(self) -> None:
        self.termination_flipper.set_manual_termination()

@baseRouter.get("/chat/testing/run_stream")
async def run_stream(request: Request):
    task = "Tell me a one-liner joke."
    agents = [
        create_agent("assistant1", "You are a helpful assistant."),
        create_agent("assistant2", "You are a helpful assistant.")
    ]

    termination_condition = MaxMessageTermination(10) | TextMentionTermination("TERMINATE")
    agent_group_chat = AgentGroupChat(agents, termination_condition)
    # save the agent_group_chat object to the session data
    set_agent_group_chat(get_session_id(request), agent_group_chat)

    res = await agent_group_chat.run_stream(task)
    return {"messages": res}

@baseRouter.get("/chat/testing/send_manual_termination")
async def send_manual_termination(request: Request):
    agent_group_chat = await get_session_agent_group_chat(request)
    if not agent_group_chat:
        raise HTTPException(status_code=404, detail="Agent group chat not found")
    agent_group_chat.set_manual_termination()
    return {"message": "Manual termination sent"}
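
The pattern proposed above can be tried without AutoGen installed. The following is a minimal sketch under stated assumptions: `FlagTermination`, `fake_team_run`, and `demo` are hypothetical stand-ins (not AutoGen classes), and the loop only imitates a team that checks its termination condition once after each message. A `threading.Event` is used for the flag so that it can be set safely from another thread, which is the scenario described in "Why is this needed?".

```python
import asyncio
import threading
from typing import List, Optional


class FlagTermination:
    """Hypothetical stand-in for a flag-based condition; not an AutoGen class."""

    def __init__(self) -> None:
        self._flag = threading.Event()  # safe to set from any thread

    def set(self) -> None:
        self._flag.set()

    async def __call__(self, messages: List[str]) -> Optional[str]:
        # Checked once per turn, after that turn's message has been recorded.
        return "Manual termination" if self._flag.is_set() else None


async def fake_team_run(condition: FlagTermination, max_turns: int = 200) -> List[str]:
    """Simulated chat loop: one message per turn, condition checked after each."""
    messages: List[str] = []
    for turn in range(max_turns):
        await asyncio.sleep(0.01)  # stands in for an agent producing a reply
        messages.append(f"message {turn}")
        stop = await condition(messages)
        if stop is not None:
            messages.append(stop)
            break
    return messages


def demo() -> List[str]:
    condition = FlagTermination()
    # Another thread (e.g. a separate request handler) sets the flag after 50 ms.
    timer = threading.Timer(0.05, condition.set)
    timer.start()
    try:
        return asyncio.run(fake_team_run(condition))
    finally:
        timer.cancel()
```

Note that in this sketch the flag is only observed at a turn boundary, so the turn in progress always finishes before the loop stops.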

victordibia commented 16 hours ago

Good ideas @yiren-liu.

There are potentially two ways to terminate an agent run externally. The first is an ExternalTermination condition, which is what you describe above, and the second is a CancellationToken.

Manual / External Termination

@ekzhu and I have chatted about this. There is a PR that was just merged today that adds ExternalTermination.


class AgentGroupChat:
    def __init__(self, agents: List[AssistantAgent], termination_condition: TerminationCondition):
        self.agents = agents
        self.termination_flipper = ExternalTermination() # <- changed
        self.termination_condition = termination_condition | self.termination_flipper
        self.team = RoundRobinGroupChat(agents, termination_condition=self.termination_condition) 

    # ...

    def set_manual_termination(self) -> None:
        self.termination_flipper.set() # <- changed

Let us know if it fits your use case.

Cancellation token

You can create a CancellationToken and pass it to your team's run() or run_stream() method.

from autogen_core.base import CancellationToken  

cancellation_token = CancellationToken()
stream = self.team.run_stream(task=task, cancellation_token=cancellation_token) 

.....

@baseRouter.get("/chat/testing/send_manual_termination")
async def send_manual_termination(request: Request):
    agent_group_chat = await get_session_agent_group_chat(request)
    if not agent_group_chat:
        raise HTTPException(status_code=404, detail="Agent group chat not found")
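    # This must be the same token that was passed to run_stream(), so it has to be
    # stored somewhere this handler can reach it (e.g., alongside the session's chat object).
    cancellation_token.cancel()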
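
One practical difference between the two approaches is when the stop takes effect: a termination condition is checked between messages, whereas cancellation aims to interrupt work that is already in flight. The sketch below illustrates only the second behavior, using plain `asyncio` task cancellation as a stand-in for the token. `slow_turn` and `demo` are hypothetical names, and nothing here is AutoGen's implementation.

```python
import asyncio
from typing import List


async def slow_turn(log: List[str]) -> None:
    """Stands in for a long-running agent turn (e.g. a slow model call)."""
    log.append("turn started")
    await asyncio.sleep(10)
    log.append("turn finished")  # not reached when the task is cancelled


async def demo() -> List[str]:
    log: List[str] = []
    task = asyncio.create_task(slow_turn(log))
    await asyncio.sleep(0.01)  # give the turn time to start
    task.cancel()  # interrupt the turn while it is still awaiting
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log
```

Running `asyncio.run(demo())` returns `["turn started", "cancelled"]`: the turn is cut off part-way instead of being allowed to complete.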

yiren-liu commented 15 hours ago

Thanks a lot! This is exactly what I needed, thank you folks!

victordibia commented 15 hours ago

Excellent!

P.S. The AutoGen Studio update uses the cancellation token to stop runs mid-flight.

https://github.com/user-attachments/assets/dc2b84b0-6302-4f07-b790-e264576bfb1a