run-llama / llama_deploy

Deploy your agentic workflows to production
https://docs.llamaindex.ai/en/stable/module_guides/llama_deploy/
MIT License

Does Llama Deploy have a JavaScript SDK for interacting with deployed systems? #380

Closed ifreeman6 closed 2 days ago

ifreeman6 commented 3 days ago

I have the following need: I want to interact with the deployed services from a React frontend. However, I found that currently only the Python SDK supports client interaction; on the JavaScript side, I can only use raw fetch calls. Will a JavaScript SDK be provided in the future to facilitate frontend interactions? Thanks very much.

logan-markewich commented 3 days ago

Currently there is no JavaScript/TypeScript SDK. However, that doesn't stop you from using llama-deploy! Feel free to use the API itself, exposed by the API server.

You can see the Swagger docs embedded in the docs page (or you can visit the API server's URL at the /docs endpoint): https://docs.llamaindex.ai/en/stable/module_guides/llama_deploy/20_core_components/#api-server
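
For example, here is a minimal sketch of invoking the run endpoint from TypeScript with plain fetch. The port, deployment name, and input shape below are assumptions for a local default setup; check the Swagger docs for the exact schema.

```ts
// Sketch: run a task on a deployed workflow through the API server's REST
// interface. The base URL, deployment name, and input shape are assumptions
// for a local default setup; verify them against the /docs Swagger page.
async function runTask(query: string): Promise<unknown> {
  const response = await fetch(
    "http://localhost:4501/deployments/MyDeployment/tasks/run",
    {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      // The workflow's kwargs travel as a JSON-encoded string in `input`.
      body: JSON.stringify({ input: JSON.stringify({ query }) }),
    },
  );
  if (!response.ok) {
    throw new Error(`API server returned ${response.status}`);
  }
  return response.json();
}
```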

logan-markewich commented 3 days ago

Maintaining a JavaScript implementation is a bit out of scope for us currently, but it could happen in the future. I definitely welcome any help on this aspect!

ifreeman6 commented 3 days ago

Hi Logan. I have tried different methods, but I still don't know how to show the progression of intermediate stream events in my React frontend. For example:

  1. I write progress events to the stream via `ctx` in my own workflow:

```python
ctx.write_event_to_stream(
    Event(
        metadata={
            "progress": {
                "status": "completed",
                "message": "Response generated successfully",
                "timestamp": datetime.now().isoformat(),
            }
        }
    )
)
```
  2. I have successfully deployed this workflow:

```python
import os

# StorageContext and load_index_from_storage come from llama_index;
# CitationQueryEngineWorkflow is defined elsewhere in this module.
from llama_index.core import StorageContext, load_index_from_storage


def build_rag_with_citations_workflow() -> CitationQueryEngineWorkflow:
    print("Loading existing index...")
    current_dir = os.path.dirname(os.path.abspath(__file__))
    persist_dir = os.path.join(
        current_dir, "data", "vector_persist", "paul_graham", "vector_index"
    )
    storage_context = StorageContext.from_defaults(persist_dir=persist_dir)
    index = load_index_from_storage(storage_context)

    return CitationQueryEngineWorkflow(
        index=index,
        timeout=120,
        verbose=True,
    )
```
  3. This is my deployment.yml file:

```yaml
name: MyDeployment

control-plane:
  port: 8000

default-service: agentic_workflow

default-service: rag_with_citations_workflow

services:
  rag_with_citations_workflow:
    name: RAG with Citations Workflow
    source:
      type: local
      name: ./workflows
    path: workflows:rag_with_citations_w
```

  4. This is my Next.js front-end file:
``` tsx
"use client";

import type { ReactNode } from "react";
import {
  AssistantRuntimeProvider,
  TextContentPart,
  useLocalRuntime,
  type ChatModelAdapter,
} from "@assistant-ui/react";
import { useProgressStore } from "./components/WorkflowProgress";

const MyModelAdapter: ChatModelAdapter = {
  async *run({ messages, abortSignal, config }) {

    const progressStore = useProgressStore.getState();
    progressStore.clearSteps();

    // console.log(messages);
    const last_message = messages[messages.length - 1].content[0];
    const last_message_text = (last_message as TextContentPart).text;
    // console.log(last_message_text);

    const response = await fetch(
      `${process.env.APISERVER_URL || 'http://localhost:4501'}/deployments/${process.env.DEPLOYMENT_NAME || 'MyDeployment'}/tasks/run`,
      {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          // input: JSON.stringify({ user_input: last_message_text }),
          input: JSON.stringify({ query: last_message_text }),
        }),
        signal: abortSignal,
      }
    );
    console.log(response);
    const data = await response.json();
    console.log(data);

    yield {
      content: [{ type: "text", text: data }],
    };
  },
};

export function MyRuntimeProvider({
  children,
}: Readonly<{
  children: ReactNode;
}>) {
  const runtime = useLocalRuntime(MyModelAdapter);

  return (
    <AssistantRuntimeProvider runtime={runtime}>
      {children}
    </AssistantRuntimeProvider>
  );
}
```

I don't know how to get the `ctx` stream events from the API side. Could you please provide guidance on how to properly set up and handle streaming events from the API endpoint?

Thanks very much!

ifreeman6 commented 3 days ago

I can conveniently print the progress and result with the Python SDK as follows, but I don't know how to accomplish the same task using JavaScript.

```python
import httpx
import asyncio
from datetime import datetime
from typing import Any, Dict
from colorama import init, Fore, Style
from llama_deploy.client import AsyncLlamaDeployClient
from llama_deploy.control_plane import ControlPlaneConfig

init()  # Initialize colorama

class ConsoleFormatter:
    """Format console output"""

    @staticmethod
    def header(text: str) -> None:
        print(f"\n{Fore.CYAN}{'='*50}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{text.center(50)}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}{'='*50}{Style.RESET_ALL}\n")

    @staticmethod
    def section(text: str) -> None:
        print(f"\n{Fore.BLUE}{'-'*50}{Style.RESET_ALL}")
        print(f"{Fore.BLUE}{text}{Style.RESET_ALL}")
        print(f"{Fore.BLUE}{'-'*50}{Style.RESET_ALL}\n")

    @staticmethod
    def progress(progress: Dict[str, Any]) -> None:
        status = progress.get("status", "unknown")
        message = progress.get("message", "")
        timestamp = progress.get("timestamp", datetime.now().isoformat())
        timestamp = datetime.fromisoformat(timestamp).strftime("%H:%M:%S")

        color = {
            "running": Fore.YELLOW,
            "completed": Fore.GREEN,
            "error": Fore.RED
        }.get(status, Fore.WHITE)

        status_str = f"[{status.upper():^8}]"
        print(f"{color}{timestamp} {status_str} {message}{Style.RESET_ALL}")

    @staticmethod
    def result(text: str) -> None:
        print(f"\n{Fore.GREEN}Output:{Style.RESET_ALL}")
        print(f"{text}\n")

    @staticmethod
    def error(text: str) -> None:
        print(f"{Fore.RED}Error: {text}{Style.RESET_ALL}")

async def stream_events():
    console = ConsoleFormatter()
    console.header("RAG with Citations Workflow")

    client = AsyncLlamaDeployClient(
        control_plane_config=ControlPlaneConfig(),
        timeout=120
    )

    session = None  # initialize so the finally block can check it safely
    try:
        # Create session
        session = await client.create_session()
        console.section(f"Session created (ID: {session.session_id})")

        # Start task
        query = "tell me about YC."
        task_id = await session.run_nowait(
            "rag_with_citations_workflow", 
            query=query
        )
        console.section(f"Task started (ID: {task_id})")
        print(f"Query: {Fore.YELLOW}{query}{Style.RESET_ALL}\n")

        # Process stream events
        max_retries = 3  # Retry count
        retry_delay = 20  # Retry interval
        processed_timestamps = set()  # Track processed events

        for attempt in range(max_retries):
            try:
                async for event in session.get_task_result_stream(task_id):
                    if isinstance(event, dict):
                        event_data = event.get('_data', event)

                        # Process progress events from metadata
                        if isinstance(event_data.get("metadata"), dict):
                            progress = event_data["metadata"].get("progress")
                            if isinstance(progress, dict):
                                # Use timestamp as unique identifier
                                timestamp = progress.get("timestamp")
                                if timestamp and timestamp not in processed_timestamps:
                                    processed_timestamps.add(timestamp)
                                    console.progress(progress)

                        # Process result events
                        result = event_data.get("result")
                        if result is not None:
                            if isinstance(result, str):
                                console.result(result)
                            elif isinstance(result, dict) and "response" in result:
                                console.result(result["response"])
                break  # Exit retry loop after processing all events
            except httpx.ReadTimeout:
                if attempt < max_retries - 1:
                    console.error(f"Stream processing timeout (attempt {attempt + 1}/{max_retries}), waiting {retry_delay} seconds before retrying...")
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 1.5
                else:
                    console.error("Stream processing timed out; continuing to the final result...")
            except Exception as e:
                console.error(f"Stream processing error: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 1.5

        # Get final result
        # console.section("Getting final result")
        # await asyncio.sleep(5)  # Wait to ensure task completion

        # final_result = await session.get_task_result(task_id)
        # if final_result:
        #     if isinstance(final_result, dict) and "response" in final_result:
        #         console.result(final_result["response"])
        #     else:
        #         console.result(str(final_result))
        # else:
        #     console.error("No final result received")

    except Exception as e:
        console.error(f"Unexpected error: {str(e)}")

    finally:
        try:
            if session:
                await client.delete_session(session.session_id)
                console.section("Session cleaned up")
        except Exception as e:
            console.error(f"Cleanup error: {str(e)}")

if __name__ == "__main__":
    asyncio.run(stream_events())
```

logan-markewich commented 3 days ago

The streaming api is here: https://github.com/run-llama/llama_deploy/blob/8c3476e0ad5008d1c0e42b67ab2d5cbd61134681/llama_deploy/client/models/apiserver.py#L84

Should be straightforward to translate that to JavaScript (or easy to ask Claude to do it 😁)
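
As a starting point, here is a hedged TypeScript sketch of that translation. It assumes the events endpoint emits newline-delimited JSON and accepts the session ID as a query parameter; verify both against the linked apiserver.py.

```ts
// Sketch of the Python streaming client translated to TypeScript. Assumes
// the events endpoint streams newline-delimited JSON and takes session_id
// as a query parameter; verify both against the linked apiserver.py.
async function* streamTaskEvents(
  apiServerUrl: string,
  deploymentName: string,
  taskId: string,
  sessionId: string,
): AsyncGenerator<unknown> {
  const url =
    `${apiServerUrl}/deployments/${deploymentName}` +
    `/tasks/${taskId}/events?session_id=${sessionId}`;
  const response = await fetch(url);
  if (!response.body) throw new Error("Response has no body to stream");

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // Each complete line should be one JSON-encoded event.
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";
    for (const line of lines) {
      if (line.trim()) yield JSON.parse(line);
    }
  }
}
```

Each yielded event can then be inspected for the `metadata.progress` payload the workflow writes, mirroring the Python loop above.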

ifreeman6 commented 3 days ago

> The streaming api is here: https://github.com/run-llama/llama_deploy/blob/8c3476e0ad5008d1c0e42b67ab2d5cbd61134681/llama_deploy/client/models/apiserver.py#L84
>
> Should be straightforward to translate that to JavaScript (or easy to ask Claude to do it 😁)

Thanks for your timely response.

```python
events_url = f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"
```

I have another issue:

I don't know how to obtain the task_id (self.id) and session_id in the front end at the time I send the POST request:

```
`${process.env.APISERVER_URL || 'http://localhost:4501'}/deployments/${process.env.DEPLOYMENT_NAME || 'MyDeployment'}/tasks/run`,
f"{self.client.api_server_url}/deployments/{self.deployment_id}/tasks/{self.id}/events"
```


Thanks very much
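
For reference, one way to obtain the IDs is to start the task without blocking and read them from the response. Here is a sketch, assuming the API server exposes a non-blocking tasks/create endpoint whose response carries task_id and session_id fields (the endpoint name and field names are assumptions; confirm them in the Swagger docs at /docs).

```ts
// Sketch: start a task without blocking, then build the events URL from
// the IDs in the response. The /tasks/create path and the task_id /
// session_id fields are assumptions; confirm them in the Swagger docs.
async function startTask(query: string) {
  const base = "http://localhost:4501/deployments/MyDeployment";
  const response = await fetch(`${base}/tasks/create`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ input: JSON.stringify({ query }) }),
  });
  const task = await response.json();
  // Rebuild the streaming URL from the Python client with these IDs.
  const eventsUrl =
    `${base}/tasks/${task.task_id}/events?session_id=${task.session_id}`;
  return { task, eventsUrl };
}
```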

ifreeman6 commented 3 days ago

I have solved my issue. Thanks very much.