microsoft / autogen

A programming framework for agentic AI 🤖
https://microsoft.github.io/autogen/

[Issue]: Is there any example of SSE for streaming? #3029

Open yidasanqian opened 3 months ago

yidasanqian commented 3 months ago

Describe the issue

Using FastAPI or Flask?

Steps to reproduce

No response

Screenshots and logs

No response

Additional Information

No response

LittleLittleCloud commented 3 months ago

python/non-python? server-side/client side?

yidasanqian commented 3 months ago

python/non-python? server-side/client side?

Python, server-side

LittleLittleCloud commented 3 months ago

@yidasanqian I currently only have a dotnet server-side example for an SSE endpoint. The gist is to set the response Content-Type header to text/event-stream and return the data in the SSE event format:

event: your-event-name
data: your-data...

https://github.com/LittleLittleCloud/Agent-ChatRoom/blob/b639c7b5e0781ada4ea5d413d1c840534d2971a9/ChatRoom/ChatRoom.Common/Controller/ChatRoomClientController.cs#L241

I also found an example of a server-side SSE implementation using FastAPI; you can see that the header is likewise set to text/event-stream and the payload uses the same data: ... format. Hope this helps a bit.

https://medium.com/@nandagopal05/server-sent-events-with-python-fastapi-f1960e0c8e4b
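
For reference, a minimal FastAPI version of the same idea might look like the sketch below (the endpoint name and payloads are placeholders, not autogen API):

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_generator():
    # Placeholder payloads; a real generator would yield agent output.
    for i in range(5):
        yield f"data: message {i}\n\n"  # SSE wire format: "data:" line + blank line
        await asyncio.sleep(1)

@app.get("/sse")
async def sse():
    # The media type tells the client to treat this as an SSE stream.
    return StreamingResponse(event_generator(), media_type="text/event-stream")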

yidasanqian commented 3 months ago

@LittleLittleCloud I have two agents, how can I stream the conversation between them through FastAPI?

assistant = AssistantAgent(
    name="assistant",
    system_message=DEFAULT_SYSTEM_MESSAGE,
    llm_config=llm_config
)

user_agent = UserProxyAgent(
    name="user_agent",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=10,
    is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
    code_execution_config={"executor": code_executor}
)

chat_res = user_agent.initiate_chat(
    recipient=assistant,
    message=message,
    summary_method="last_msg"
)

pamelafox commented 2 months ago

I do not use autogen yet, but I have personally experimented with SSE and ended up switching over to json-lines with Transfer-Encoding: chunked. I have a blog post about that approach here: https://blog.pamelafox.org/2023/08/fetching-json-over-streaming-http.html

I also have sample backends in Quart and FastAPI, using just the OpenAI SDK. Would love to see an example using AutoGen and streaming!
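
For comparison, a minimal sketch of that json-lines approach in FastAPI (placeholder payloads, no autogen involved) could be:

import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def jsonl_generator():
    # One JSON object per line; the client parses each line as it arrives.
    for i in range(5):
        yield json.dumps({"content": f"chunk {i}"}) + "\n"
        await asyncio.sleep(1)

@app.post("/chat")
async def chat():
    # With no Content-Length set, the server sends the body with
    # Transfer-Encoding: chunked automatically.
    return StreamingResponse(jsonl_generator(), media_type="application/x-ndjson")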

r4881t commented 2 weeks ago

I managed to get something working, but it's not the best implementation.

Create a custom class for IO

import asyncio
import logging
import time
from queue import Empty, Queue
from threading import Thread
from typing import Any

from autogen.io.base import IOStream

logger = logging.getLogger(__name__)

class DebugQueue(Queue):
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.put_count = 0
    self.get_count = 0

  def put(self, item, block=True, timeout: float | None = None):
    self.put_count += 1
    logger.debug(f"Putting item {self.put_count} into queue")
    super().put(item, block=block, timeout=timeout)

  def get(self, block=True, timeout=None):
    item = super().get(block=block, timeout=timeout)
    self.get_count += 1
    logger.debug(f"Getting item {self.get_count} from queue.")
    return item

class CustomIOSSE(IOStream):
  def __init__(self):
    super().__init__()

    self.queue = DebugQueue()
    self.is_done = asyncio.Event()
    self.loop = asyncio.get_running_loop()  # handler() instantiates this inside the event loop
    self.print_count = 0
    self.consumer_thread = None
    self.message_available = asyncio.Event()

  @staticmethod
  async def handler(on_connect, *args, **kwargs):
    """The handler function for the websocket server."""
    io_sse = CustomIOSSE()
    with CustomIOSSE.set_default(io_sse):
      # Start the on_connect function in a separate task
      task = asyncio.create_task(on_connect(io_sse, *args, **kwargs))

      io_sse.consumer_thread = Thread(target=io_sse.consume_queue)
      io_sse.consumer_thread.start()

      # Yield messages as they become available
      try:
        while not io_sse.is_done.is_set() or not io_sse.queue.empty():
          logger.debug("Handler loop iteration")
          try:
            await asyncio.wait_for(io_sse.message_available.wait(), timeout=0.1)
            io_sse.message_available.clear()
          except asyncio.TimeoutError:
            yield ": keep-alive\n\n"

          logger.debug("Checking queue for messages")
          while not io_sse.queue.empty():
            try:
              message = io_sse.queue.get_nowait()
              logger.debug(f"Yielding message: {message}")
              yield f"data: {message}\n\n"
              io_sse.queue.task_done()
            except Empty:
              logger.debug("Queue unexpectedly empty")
              break
      finally:
        logger.info("Handler finished, waiting for task to complete")
        await task
        io_sse.is_done.set()
        io_sse.consumer_thread.join()
        logger.info("Task and consumer thread completed")

  def consume_queue(self):
    while not self.is_done.is_set():
      if not self.queue.empty():
        logger.info("Messages available in queue")
        self.loop.call_soon_threadsafe(self.message_available.set)
      time.sleep(0.1)  # plain sleep; this runs in a worker thread, not the event loop
    logger.info("Consumer thread finished")

  def print(
    self, *objects: Any, sep: str = " ", end: str = "\n", flush: bool = False
  ) -> None:
    r"""Print data to the output stream.

    Args:
        objects (any): The data to print.
        sep (str, optional): The separator between objects. Defaults to " ".
        end (str, optional): The end of the output. Defaults to "\n".
        flush (bool, optional): Whether to flush the output. Defaults to False.
    """
    self.print_count += 1
    _xs = sep.join(map(str, objects)) + end
    logger.debug(
      f"Print called {self.print_count} times. Message: {_xs!r}"
    )
    self.queue.put(_xs)

  def input(self, prompt: str = "", *, password: bool = False) -> str:
    # IOStream.input is synchronous; SSE is one-way, so input is unsupported.
    raise NotImplementedError("input is not supported for SSE")

  def finish(self):
    """Signal that the processing is complete."""
    self.is_done.set()
    self.loop.call_soon_threadsafe(self.message_available.set)

Define a function to do the autogen work. The handler schedules it with asyncio.create_task(on_connect(io_sse, ...)), so it must be a coroutine that takes the IO stream as its first argument and calls finish() when the chat is over:

async def chat_sse(iostream: CustomIOSSE, message: str):
  # your autogen code here
  iostream.finish()  # signal the handler that no more events are coming

Then expose it through FastAPI:

from fastapi.responses import StreamingResponse

@router.post("/chat")
async def sse_endpoint(message: str):
  return StreamingResponse(
    CustomIOSSE.handler(chat_sse, message),
    media_type="text/event-stream",
  )

What it does:

  1. Sends out SSE events when the endpoint is called.

Where it lags:

  1. It is not truly real-time: the back-and-forth between the agents runs to completion first, and only then are the events flushed out in bulk, so it is not a true streaming experience.

I tried hard but was unable to get the complete streaming experience, so I am putting this code here in case someone can build on top of it.
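
One possible direction for the bulk-flush problem, sketched under the assumption that the blocking initiate_chat call is what starves the event loop: run the chat in a worker thread via asyncio.to_thread, which copies the current contextvars, so the CustomIOSSE installed by set_default in the handler should still receive the print() calls. The agent setup mirrors yidasanqian's snippet and the config values are placeholders; this is untested:

import asyncio

from autogen import AssistantAgent, UserProxyAgent

async def chat_sse(iostream: CustomIOSSE, message: str):
    # Placeholder config; fill in your real model settings.
    llm_config = {"model": "gpt-4o"}

    assistant = AssistantAgent(name="assistant", llm_config=llm_config)
    user_agent = UserProxyAgent(
        name="user_agent",
        human_input_mode="NEVER",
        code_execution_config=False,
    )

    def run_chat():
        # Runs in a worker thread; asyncio.to_thread copies the contextvars
        # context, so autogen's print() calls should still land in
        # iostream.queue via the set_default() in the handler.
        user_agent.initiate_chat(recipient=assistant, message=message)

    # Keep the event loop free to drain the queue while the chat runs.
    await asyncio.to_thread(run_chat)
    iostream.finish()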