run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Question]: How to Stream Tokens of Workflow Output? #16375

Open brycecf opened 1 week ago

brycecf commented 1 week ago

Question Validation

Question

The existing streaming events functionality and examples address how to stream progress updates from the workflow itself.

But given the StopEvent, which the current examples use to return the Workflow's output, how could a Workflow stream the final node's response token by token (assuming an LLM with streaming support)?

I am looking to recreate the functionality shown here in a LangGraph example.

dosubot[bot] commented 1 week ago

To stream the final node's response in a workflow using an LLM with streaming support, you can use the astream_complete method to get an async generator of the LLM's response. This lets you stream each chunk of the response as it is generated. Here's a basic implementation:

  1. Use the astream_complete method in your workflow to get a generator for the LLM's response.
  2. Stream each piece of the response by writing it to the event stream using ctx.write_event_to_stream.
  3. Return the final response in the StopEvent.

Here's an example:

from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.openai import OpenAI

class FirstEvent(Event):
    first_output: str

class SecondEvent(Event):
    second_output: str
    response: str

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(Event(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini")
        generator = await llm.astream_complete(
            "Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
        )
        # Forward each token to the event stream as it arrives.
        async for response in generator:
            ctx.write_event_to_stream(Event(msg=response.delta))
        # After the loop, `response` holds the final accumulated response.
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(Event(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

To run the workflow and listen for events:

import asyncio

# Requires the llama-index-utils-workflow package.
from llama_index.utils.workflow import draw_all_possible_flows

async def main():
    w = MyWorkflow(timeout=30, verbose=True)
    handler = w.run(first_input="Start the workflow.")

    # Print streamed events as the workflow produces them. Guard with
    # hasattr, since not every event carries a `msg` field.
    async for ev in handler.stream_events():
        if hasattr(ev, "msg"):
            print(ev.msg)

    final_result = await handler
    print("Final result:", final_result)

    # Optional: render the workflow graph to an HTML file.
    draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")

if __name__ == "__main__":
    asyncio.run(main())

This setup allows you to stream the LLM's response as it is generated, providing a real-time experience for the user [1].
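If you want the final tokens to come back through the StopEvent itself rather than the side event stream, one possible variation is to return the un-consumed async generator as the workflow result and iterate it from the caller. This is a minimal sketch, not from the original answer; it assumes StopEvent's result field accepts an arbitrary Python object, and StreamingWorkflow / stream_final_output are hypothetical names. It reuses the imports from the example above.

class StreamingWorkflow(Workflow):
    @step
    async def generate(self, ctx: Context, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o-mini")
        # Hand the un-consumed async generator to the caller instead of
        # exhausting it inside the step.
        generator = await llm.astream_complete(ev.prompt)
        return StopEvent(result=generator)

async def stream_final_output():
    w = StreamingWorkflow(timeout=30)
    # Awaiting the handler returns the StopEvent's result, here the generator.
    result = await w.run(prompt="Tell me a short story.")
    async for response in result:
        print(response.delta, end="", flush=True)

This variation keeps the event stream free for progress updates while the final response streams through the workflow's return value.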
