run-llama / llama_index

LlamaIndex is a data framework for your LLM applications
https://docs.llamaindex.ai
MIT License

[Bug]: Reusing Context Object Breaks Workflow Event Streaming #16704

Closed brycecf closed 4 hours ago

brycecf commented 5 hours ago

Bug Description

With llama-index-core==0.11.7, I was able to pass an existing Context instance between runs of the same Workflow without issue.

With llama-index-core==0.11.20, a change to Context.stream_events breaks event streaming when the Workflow is run a second time with an existing Context.

Version

0.11.20

Steps to Reproduce

pip install chainlit==1.3.1 llama-index-core==0.11.20

Run the following:

import chainlit as cl
from typing import Any, Optional
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class ProgressEvent(Event):
    msg: str

class StepOne(Event):
    msg: Optional[str] = None

class MyWorkflow(Workflow):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

    @step(pass_context=True)
    async def initialize(self, ctx: Context, ev: StartEvent) -> StepOne:
        print('Initializing workflow')
        ctx.write_event_to_stream(ProgressEvent(msg="⌛ **Starting...**"))
        if "count" not in ctx.data:
            await ctx.set("count", 0)
            # ctx.data["count"] = 0
        return StepOne()

    @step(pass_context=True)
    async def research(self, ctx: Context, ev: StepOne) -> StopEvent:
        await ctx.set("count", await ctx.get("count")+1)
        ctx.write_event_to_stream(ProgressEvent(msg=f"Count event: {ctx.data['count']}"))
        ctx.write_event_to_stream(ProgressEvent(msg="✅ **Finished.**"))
        return StopEvent(result="Workflow complete.")

@cl.on_chat_start
async def start():
    await cl.Message(
        content="Welcome! Send any message to start the workflow."
    ).send()
    cl.user_session.set("workflow", MyWorkflow(timeout=60))

@cl.on_message
async def main(message: cl.Message):
    msg = cl.Message(content=f"Starting workflow: {message}...")
    await msg.send()

    workflow = cl.user_session.get("workflow", None)
    handler = workflow.run(
        ctx=cl.user_session.get("context", None)
    )

    async for ev in workflow.stream_events():
        if isinstance(ev, ProgressEvent):
            msg.content = msg.content + f"\n{ev.msg}"
            print(ev.msg)
            await msg.update()

    result = await handler

    await cl.Message(
        content=f"{result}"
    ).send()

    cl.user_session.set("context", handler.ctx)

Run the above code using this command:

chainlit run -w <PATH_TO_PYTHON_SCRIPT> --port 8841

Open the resulting chat UI and enter two separate prompts. The first will work and show the streamed events. The second will fail and provide the trace below.

Relevant Logs/Tracebacks

2024-10-25 21:28:58 - async generator raised StopIteration
Traceback (most recent call last):
  File "/Users/me/Library/Caches/pypoetry/virtualenvs/demo-n-PaIroy-py3.11/lib/python3.11/site-packages/llama_index/core/workflow/workflow.py", line 120, in stream_events
    ctx = next(iter(self._contexts))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/me/Library/Caches/pypoetry/virtualenvs/demo-n-PaIroy-py3.11/lib/python3.11/site-packages/chainlit/utils.py", line 44, in wrapper
    return await user_function(**params_values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/me/Library/Caches/pypoetry/virtualenvs/demo-n-PaIroy-py3.11/lib/python3.11/site-packages/chainlit/callbacks.py", line 118, in with_parent_id
    await func(message)
  File "/Users/me/demo/src/buggy_workflow.py", line 88, in main
    async for ev in workflow.stream_events():
RuntimeError: async generator raised StopIteration
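The RuntimeError in the trace is standard PEP 479/PEP 525 behavior: a StopIteration that escapes the body of a generator (including an async generator) is converted to RuntimeError. Here, `next(iter(self._contexts))` raises StopIteration because the workflow's context collection is empty on the second run. A minimal stdlib sketch of the same failure (no llama_index required; the empty set stands in for the drained context collection):

```python
import asyncio

# Stand-in for the workflow's context collection, empty on the second run.
_contexts: set = set()

async def stream_events():
    # Mirrors the failing line in workflow.py: grab "the" context
    # from the (empty) collection -- next() raises StopIteration.
    ctx = next(iter(_contexts))
    yield ctx

async def main():
    try:
        async for _ in stream_events():
            pass
    except RuntimeError as exc:
        # Python wraps the escaping StopIteration in a RuntimeError.
        return str(exc)

print(asyncio.run(main()))  # async generator raised StopIteration
```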
dosubot[bot] commented 5 hours ago

Reusing a Context object in llama-index-core=0.11.20 breaks workflow event streaming when running the workflow a second time because the Context object maintains state across runs, including the _streaming_queue and _broker_log. These components are not reset between runs, leading to potential conflicts or unexpected behavior when the workflow is executed again [1].

A similar issue was discussed in a solved thread, where the behavior of streaming events from a nested workflow was addressed [2].

To continue talking to Dosu, mention @dosu.

logan-markewich commented 4 hours ago

Hmm, weird. I made a much simpler version to reproduce it here: https://colab.research.google.com/drive/1O0aZLsLI1eUd2bxwdp-NBcXSHsZFxgIa?usp=sharing

logan-markewich commented 4 hours ago

Oh, I'm silly, I fixed it in the notebook above @brycecf

It's not workflow.stream_events(); it's handler.stream_events().
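In other words, each call to run() returns a handler that owns that run's event stream, so you stream from the handler, not from the shared workflow. A hypothetical stdlib sketch of this per-run pattern (Handler, Workflow, and the queue here are illustrative stand-ins, not the real llama_index implementation):

```python
import asyncio

class Handler:
    """Stand-in for a workflow handler: owns one run's event queue."""
    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def stream_events(self):
        while True:
            ev = await self._queue.get()
            if ev is None:  # sentinel: this run is finished
                return
            yield ev

class Workflow:
    """Stand-in workflow: run() hands back a fresh handler each time."""
    def run(self) -> Handler:
        handler = Handler()

        async def _produce():
            for ev in ("Starting...", "Finished."):
                await handler._queue.put(ev)
            await handler._queue.put(None)

        # The producer feeds the per-run queue; the workflow object
        # itself keeps no per-run streaming state.
        asyncio.ensure_future(_produce())
        return handler

async def main():
    wf = Workflow()
    seen = []
    for _ in range(2):           # two runs, like two chat messages
        handler = wf.run()       # fresh handler per run
        async for ev in handler.stream_events():
            seen.append(ev)
    return seen

print(asyncio.run(main()))
```

Because every run gets its own handler and queue, the second run streams cleanly instead of reaching into the workflow's (now-empty) shared state.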