run-llama / llama_deploy

Deploy your agentic worfklows to production
https://docs.llamaindex.ai/en/stable/module_guides/workflow/deployment/
MIT License
1.76k stars 181 forks source link

AttributeError: 'SessionClient' object has no attribute 'run_nowait' #312

Closed 202030481266 closed 1 hour ago

202030481266 commented 3 hours ago
from llama_deploy import LlamaDeployClient, ControlPlaneConfig

client = LlamaDeployClient(ControlPlaneConfig())

session = client.create_session()
# result = session.run("my_workflow", arg1="hello_world")
# print(result)

# kick off run
task_id = session.run_nowait("streaming_workflow", arg1="hello_world")

# stream events -- the will yield a dict representing each event
for event in session.get_task_result_stream(task_id):
    print(event)

# get final result
result = session.get_task_result(task_id)
print(result)

I debug this error and trace the session class. Here is its code implementation:

class SessionClient:
    def __init__(
        self,
        control_plane_config: ControlPlaneConfig,
        session_id: str,
        timeout: float = DEFAULT_TIMEOUT,
        poll_interval: float = DEFAULT_POLL_INTERVAL,
    ):
        # TODO: add scheme to config (http, https, ..)
        self.control_plane_url = control_plane_config.url
        self.session_id = session_id
        self.timeout = timeout
        self.poll_interval = poll_interval

    def run(self, service_name: str, **run_kwargs: Any) -> str:
        """Implements the workflow-based run API for a session."""
        task_input = json.dumps(run_kwargs)
        task_def = TaskDefinition(input=task_input, agent_id=service_name)
        task_id = self.create_task(task_def)

        # wait for task to complete, up to timeout seconds
        start_time = time.time()
        while time.time() - start_time < self.timeout:
            result = self.get_task_result(task_id)
            if isinstance(result, TaskResult):
                return result.result
            time.sleep(self.poll_interval)

        raise TimeoutError(f"Task {task_id} timed out after {self.timeout} seconds")

    def create_task(self, task_def: TaskDefinition) -> str:
        """Create a new task in this session.

        Args:
            task_def (Union[str, TaskDefinition]): The task definition or input string.

        Returns:
            str: The ID of the created task.
        """
        task_def.session_id = self.session_id

        with httpx.Client(timeout=self.timeout) as client:
            response = client.post(
                f"{self.control_plane_url}/sessions/{self.session_id}/tasks",
                json=task_def.model_dump(),
            )
            return response.json()

    def get_tasks(self) -> List[TaskDefinition]:
        """Get all tasks in this session.

        Returns:
            List[TaskDefinition]: A list of task definitions in the session.
        """
        with httpx.Client(timeout=self.timeout) as client:
            response = client.get(
                f"{self.control_plane_url}/sessions/{self.session_id}/tasks"
            )
            return [TaskDefinition(**task) for task in response.json()]

    def get_current_task(self) -> Optional[TaskDefinition]:
        """Get the current (most recent) task in this session.

        Returns:
            Optional[TaskDefinition]: The current task definition, or None if the session has no tasks.
        """
        with httpx.Client(timeout=self.timeout) as client:
            response = client.get(
                f"{self.control_plane_url}/sessions/{self.session_id}/current_task"
            )
            data = response.json()
            return TaskDefinition(**data) if data else None

    def get_task_result(self, task_id: str) -> Optional[TaskResult]:
        """Get the result of a task in this session if it has one.

        Args:
            task_id (str): The ID of the task to get the result for.

        Returns:
            Optional[TaskResult]: The result of the task if it has one, otherwise None.
        """
        with httpx.Client(timeout=self.timeout) as client:
            response = client.get(
                f"{self.control_plane_url}/sessions/{self.session_id}/tasks/{task_id}/result"
            )
            data = response.json()
            return TaskResult(**data) if data else None

So I wonder it is my problem or just the bugs?

logan-markewich commented 3 hours ago

The method definitely exists https://github.com/run-llama/llama_deploy/blob/42d1265499f641ce3f0602185ee228edf8ce9649/llama_deploy/client/sync_client.py#L48

Can you try updating? pip install -U llama-deploy

If you are running in a notebook, make sure you restart it

202030481266 commented 2 hours ago

Thanks for your reply. Running pip install -U llama-deploy can only update to version 0.1.3 in my local machine. And I found that version 0.2.1 need the python 3.11 or greater. I will try to update my python version and see if this could resolve.

202030481266 commented 1 hour ago

Thanks for your reply. Running pip install -U llama-deploy can only update to version 0.1.3 in my local machine. And I found that version 0.2.1 need the python 3.11 or greater. I will try to update my python version and see if this could resolve.

OK, this issue is resovled!