run-llama / llama_deploy

Deploy your agentic workflows to production
https://docs.llamaindex.ai/en/stable/module_guides/llama_deploy/
MIT License
1.84k stars 191 forks source link

apiserver endpoint error in session.get_task_result_stream(task_id) with client #374

Open jdelacasa opened 22 hours ago

jdelacasa commented 22 hours ago

I must be doing something wrong, but I can't figure out what. When my FastAPI endpoint tries to retrieve the streaming events, the apiserver log shows a 404 Not Found:

INFO: 10.244.2.40:57518 - "POST /sessions/create HTTP/1.1" 200 OK INFO:llama_deploy.message_queues.base - Publishing message of type 'ag-rag-react-stream' with action 'ActionTypes.NEW_TASK' to topic 'llama_deploy.ag-rag-react-stream' INFO: 127.0.0.1:55704 - "POST /publish/llama_deploy.ag-rag-react-stream HTTP/1.1" 200 OK INFO: 10.244.2.40:57524 - "POST /sessions/5cfe06a0-375f-4387-a8fb-036e94957f85/tasks HTTP/1.1" 200 OK INFO: 127.0.0.1:42934 - "POST /process_message HTTP/1.1" 200 OK INFO:llama_deploy.message_queues.simple - Successfully published message 'ag-rag-react-stream' to consumer. INFO: 10.244.2.40:57530 - "GET /sessions/5cfe06a0-375f-4387-a8fb-036e94957f85/tasks/821c981c-55bb-4240-9ada-7065bdd36613/result_stream HTTP/1.1" 404 Not Found INFO: 127.0.0.1:60004 - "GET /sessions/5cfe06a0-375f-4387-a8fb-036e94957f85/state HTTP/1.1" 200 OK

example code (fastapi endpoint)

@workflow_router.post("/workflow/execute") async def workflow_execute(request: Request):

(...)

client = llama_deploy.Client(api_server_url=apiserver_url,
                            control_plane_url=control_plane_url,
                            timeout=300
                            )
session = await client.core.sessions.create()
task_id = await session.run_nowait(service_name, **request_input)

async def stream_events():
    """Relay the task's progress events as ``data: <json>`` chunks.

    Iterates ``session.get_task_result_stream(task_id)``. For every event
    containing a ``"progress"`` entry, yields one chunk that wraps the value
    in a ``choices`` / ``delta`` / ``content`` payload and then logs it.
    Events without ``"progress"`` are skipped.
    """
    async for event in session.get_task_result_stream(task_id):
        # Nothing to report for events that carry no progress information.
        if "progress" not in event:
            continue

        delta = {"content": f"Chunk {event['progress']}"}
        payload = {"choices": [{"delta": delta}]}
        yield f"data: {json.dumps(payload)}\n\n"
        logger.info(f"Progress: {event['progress']}")

return StreamingResponse(stream_events(), media_type="application/octet-stream")

============= OUTPUT :

INFO: 10.244.1.184:57970 - "POST /workflow/execute HTTP/1.1" 200 OK ERROR: Exception in ASGI application Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions yield File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 373, in handle_async_request resp = await self._pool.handle_async_request(req) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request raise exc from None File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request response = await connection.handle_async_request( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 101, in handle_async_request return await self._connection.handle_async_request(request) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 143, in handle_async_request raise exc File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 113, in handle_async_request ) = await self._receive_response_headers(**kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 186, in _receive_response_headers event = await self._receive_event(timeout=timeout) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 224, in _receive_event data = await self._network_stream.read( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 32, in read with map_exceptions(exc_map): File "/usr/local/lib/python3.11/contextlib.py", line 158, in exit self.gen.throw(typ, value, traceback) File 
"/usr/local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions raise to_exc(exc) from exc httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last): File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 401, in run_asgi result = await app( # type: ignore[func-returns-value] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in call return await self.app(scope, receive, send) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in call await super().call(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 113, in call await self.middleware_stack(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 187, in call raise exc File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 165, in call await self.app(scope, receive, _send) File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 62, in call await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 62, in wrapped_app raise exc File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 51, in wrapped_app await app(scope, receive, sender) File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 715, in call await self.middleware_stack(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 735, in app await route.handle(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 288, in handle await self.app(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 76, in app await wrap_app_handling_exceptions(app, request)(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", 
line 62, in wrapped_app raise exc File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 51, in wrapped_app await app(scope, receive, sender) File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 74, in app await response(scope, receive, send) File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in call async with anyio.create_task_group() as task_group: File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 597, in aexit raise exceptions[0] File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 253, in wrap await func() File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 242, in stream_response async for chunk in self.body_iterator: File "/maarifa/maarifa-api/routes/workflow.py", line 436, in stream_events async for event in session.get_task_result_stream(task_id): File "/usr/local/lib/python3.11/site-packages/llama_deploy/client/models/core.py", line 128, in get_task_result_stream async with client.stream("GET", url) as response: File "/usr/local/lib/python3.11/contextlib.py", line 210, in aenter return await anext(self.gen) ^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1617, in stream response = await self.send( ^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1661, in send response = await self._send_handling_auth( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1689, in _send_handling_auth response = await self._send_handling_redirects( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects response = await self._send_single_request(request) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1763, in _send_single_request response = 
await transport.handle_async_request(request) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 372, in handle_async_request with map_httpcore_exceptions(): File "/usr/local/lib/python3.11/contextlib.py", line 158, in exit self.gen.throw(typ, value, traceback) File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions raise mapped_exc(message) from exc httpx.ReadTimeout

logan-markewich commented 22 hours ago

@jdelacasa I think either

  1. the stream had not been created yet when the code tried to fetch it (it ran too soon). You might need to retry the call or add a small delay
  2. your workflow isn't streaming any events
jdelacasa commented 22 hours ago

Events are written to the stream throughout the workflow:

ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))

and I do receive the final result correctly once the workflow finishes executing:

async def stream_events():
    """Stream task events, then the final result, as ``data: <json>`` chunks.

    Phase 1 relays every non-``None`` event obtained from
    ``session.get_task_result_stream(task_id)``.

    Phase 2 polls ``session.get_task_result(task_id)`` every 0.3 seconds
    until it returns a result, yields that result, and then yields a closing
    chunk whose ``finish_reason`` is ``"stop"``. An exception raised while
    polling is logged and reported to the consumer as an ``{"error": ...}``
    chunk. An exception raised while relaying events is not caught here and
    propagates to the caller.

    The session is deleted on exit from either phase.
    """
    try:
        async for event in session.get_task_result_stream(task_id):
            if event is None:
                continue
            data = {
                "choices": [
                    {
                        "delta": {"content": f"Chunk {event}"}
                    }
                ]
            }
            yield f"data: {json.dumps(data)}\n\n"
            # Log the event itself: nothing above guarantees that it has a
            # "progress" key, so indexing it here could raise mid-stream.
            logger.info(f"Progress: {event}")

        try:
            while True:
                final_result = await session.get_task_result(task_id)
                if final_result is None:
                    # Result not available yet; wait briefly and poll again.
                    await asyncio.sleep(0.3)
                    continue

                data = {
                    "choices": [
                        {
                            "delta": {"content": f"Final Result: {final_result.result}"}
                        }
                    ]
                }
                yield f"data: {json.dumps(data)}\n\n"
                # Signal the end of the stream.
                data = {
                    "choices": [
                        {
                            "finish_reason": "stop"
                        }
                    ]
                }
                yield f"data: {json.dumps(data)}\n\n"
                logger.info(f"Final Result: {final_result}")
                break

        except Exception as e:
            logger.error(f"Error in result: {e}")
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    finally:
        # Wraps both phases so a failure while streaming (e.g. a read
        # timeout) does not leave the session behind.
        await client.core.sessions.delete(session.id)

return StreamingResponse(stream_events(), media_type="application/octet-stream")
jdelacasa commented 21 hours ago

It seems that if I add a sleep before each ctx.write_event_to_stream call, the streamed events are received correctly:

    await asyncio.sleep(0.3)
    ctx.write_event_to_stream(ProgressEvent(msg="ok"))