run-llama / llama_deploy

Deploy your agentic worfklows to production
https://docs.llamaindex.ai/en/stable/module_guides/workflow/deployment/
MIT License
1.76k stars 181 forks source link

All Events in Result Stream Arrive At Once Instead of When Ready #308

Open eyao27 opened 1 day ago

eyao27 commented 1 day ago

I am using Llama deploy for an application where I want to send a client updates as a workflow is running. I'm trying to do this with the get_task_result_stream function in an AsyncLlamaDeployClient.

In the E2E test, the workflow has Steps 1 through 3, which produce the events with the numbers 0.3, 0.6 and 0.9. I added a "await asyncio.sleep(2)" after each of the events ctx.write_event_to_stream(ProgressEvent(progress=0.3))...etc.

See below.

class StreamingWorkflow(Workflow):
    @step()
    async def run_step_1(self, ctx: Context, ev: StartEvent) -> Step1:
        arg1 = ev.get("arg1")
        if not arg1:
            raise ValueError("arg1 is required.")
        ctx.write_event_to_stream(ProgressEvent(progress=0.3))
        await asyncio.sleep(2)
        return Step1(arg1=str(arg1) + "_result")

    @step()
    async def run_step_2(self, ctx: Context, ev: Step1) -> Step2:
        arg1 = ev.arg1
        if not arg1:
            raise ValueError("arg1 is required.")
         ctx.write_event_to_stream(ProgressEvent(progress=0.6))
        await asyncio.sleep(2)
        return Step2(arg1=str(arg1) + "_result")

    @step()
    async def run_step_3(self, ctx: Context, ev: Step2) -> StopEvent:
        arg1 = ev.arg1
        if not arg1:
            raise ValueError("arg1 is required.")
        ctx.write_event_to_stream(ProgressEvent(progress=0.9))
        await asyncio.sleep(2)
        return StopEvent(result=str(arg1) + "_result")

In test_run_client.py, I modified it so that the event's progress will be printed out. I'm noticing that these three events (0.3, 0.6, 0.9) are arriving at the same time after 6 seconds. The desired behavior is that each event arrives separately after 2 seconds.

  async for event in session.get_task_result_stream(task_id):
      if "progress" in event:
          logger.info(event["progress"])

Any insight into how to solve this would be greatly appreciated!

masci commented 23 hours ago

Hi @eyao27 thanks for reporting!

I was able to reproduce but couldn't find the problem at first glance, let me dig into this and I'll report back!