run-llama / llama_deploy

Deploy your agentic worfklows to production
https://docs.llamaindex.ai/en/stable/module_guides/workflow/deployment/
MIT License
1.71k stars 173 forks source link

Detect nested workflows #209

Closed logan-markewich closed 2 weeks ago

logan-markewich commented 2 weeks ago

This PR adds the ability to detect and deploy nested workflows

It works by highjacking the ServiceManager that is inside each workflow. Before injecting the workflow into a step, it optionally checks if that workflow already exists as a service, and then swaps in a new workflow that just makes calls to the remote workflow.

This means users need zero code changes to their existing workflows, they just have to deploy everything, and reap the benefits :)


One concern I had was that the nested workflow always creates (and deletes) a new session. Tbh I wanted to reuse the existing session, but couldn't find a way to insert it. Maybe a future PR :)


Script1 (deploying core services)

from llama_agents import deploy_core, ControlPlaneConfig, SimpleMessageQueueConfig

async def main():
    await deploy_core(
        ControlPlaneConfig(),
        SimpleMessageQueueConfig(),
    )

if __name__ == "__main__":
    import asyncio

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Script2 (creating and deploying workflows)

import asyncio
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step

class InnerWorkflow(Workflow):

    @step()
    async def run_step(self, ev: StartEvent) -> StopEvent:
        arg1 = ev.get("arg1")
        if not arg1:
            raise ValueError("arg1 is required.")

        return StopEvent(result=str(arg1) + "_result")

class OuterWorkflow(Workflow):

    @step()
    async def run_step(self, ev: StartEvent, inner: InnerWorkflow) -> StopEvent:
        arg1 = ev.get("arg1")
        if not arg1:
            raise ValueError("arg1 is required.")

        arg1 = await inner.run(arg1=arg1)

        return StopEvent(result=str(arg1) + "_result")

inner = InnerWorkflow()
outer = OuterWorkflow()
outer.add_workflows(inner=InnerWorkflow())

from llama_agents import deploy_workflow, ControlPlaneConfig, WorkflowServiceConfig

async def main():
    inner_task = asyncio.create_task(
        deploy_workflow(
            inner,
            WorkflowServiceConfig(host="127.0.0.1", port=8003, service_name="inner"),
            ControlPlaneConfig(),
        )
    )

    outer_task = asyncio.create_task(
        deploy_workflow(
            outer,
            WorkflowServiceConfig(host="127.0.0.1", port=8002, service_name="outer"),
            ControlPlaneConfig(),
        )
    )

    await asyncio.gather(inner_task, outer_task)

if __name__ == "__main__":
    import asyncio

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Script3 (using the client)

from llama_agents import LlamaAgentsClient, ControlPlaneConfig

client = LlamaAgentsClient(ControlPlaneConfig())
session = client.create_session()
session.run("outer", arg1="hello_world")
> 'hello_world_result_result'