ajndkr / lanarky

The web framework for building LLM microservices
https://lanarky.ajndkr.com/
MIT License

bug: Expected response header Content-Type to contain 'text/event-stream', got 'text/plain' #186

Open amitjoy opened 5 months ago

amitjoy commented 5 months ago

Scenario

When I run the following client, httpx_sse raises an exception saying that the Content-Type is 'text/plain' instead of 'text/event-stream':

import json

import click

from lanarky.clients import StreamingClient

@click.command()
@click.option("--input", required=True)
@click.option("--stream", is_flag=True)
def main(input: str, stream: bool):
    client = StreamingClient("http://localhost:8001")
    for event in client.stream_response(
        method="POST",
        path="/chat?session_id=123",
        json={"input": input},
    ):
        print(f"{event.event}: {json.loads(event.data)['token']}", end="", flush=True)

if __name__ == "__main__":
    main()

The following versions and dependencies are used:

Actual result

httpx_sse._exceptions.SSEError: Expected response header Content-Type to contain 'text/event-stream', got 'text/plain'

Expected result

The client should be able to send the streaming request properly

amitjoy commented 5 months ago

Traceback (most recent call last):
  File "/Users/amit/telly/telly-backend/backend/test.py", line 22, in <module>
    main()
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/amit/telly/telly-backend/backend/test.py", line 13, in main
    for event in client.stream_response(
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/lanarky/clients.py", line 41, in stream_response
    for sse in event_source.iter_sse():
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 28, in iter_sse
    self._check_content_type()
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/httpx_sse/_api.py", line 18, in _check_content_type
    raise SSEError(
httpx_sse._exceptions.SSEError: Expected response header Content-Type to contain 'text/event-stream', got 'text/plain'

amitjoy commented 5 months ago

The router looks as follows:

router = LangchainAPIRouter()

@router.post(path="/chat",
             name="Chat Endpoint",
             description="The main endpoint to send a chat query to the foundation model",
             summary="Endpoint to chat with the bot",
             tags=["chat"])
async def chat(session_agent: SessionAgent = Depends(lambda: di[SessionAgent])) -> ConversationalRetrievalChain:
    chatbot = session_agent.chatbot("123")
    return chatbot.chain

amitjoy commented 5 months ago

It seems the primary error was suppressed: "Attempted to access streaming response content, without having called read()."

amitjoy commented 5 months ago

It was raised by httpx.

ajndkr commented 5 months ago

hi @amitjoy, thanks for submitting this bug report!

I will look into this soon. In the meantime, if you are able to solve the issue yourself, please go ahead with a pull request.

amitjoy commented 5 months ago

@ajndkr Thanks a lot for your quick response. I strongly believe that the main issue lies in "Attempted to access streaming response content, without having called read()." and we have to figure out what needs to be done there.

ajndkr commented 5 months ago

@amitjoy, in the meantime, can you check whether the endpoint works via curl or Postman? lanarky.clients is a supplementary module. It doesn't affect core functionality.
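When checking the endpoint outside lanarky.clients, it may help to look at the status code before the Content-Type. The sketch below uses a hypothetical helper, diagnose_sse_response, which is not part of lanarky or httpx_sse. It reports an error status first, on the assumption that an error response is usually not an event stream, so the Content-Type mismatch would be a symptom rather than the cause:

```python
from typing import Optional


def diagnose_sse_response(status_code: int, content_type: Optional[str]) -> str:
    """Explain why an SSE client might reject a response (hypothetical helper)."""
    # Drop parameters such as "; charset=utf-8" and normalise the case.
    media_type = (content_type or "").split(";")[0].strip().lower()
    if status_code >= 400:
        # Error responses are usually plain text or JSON, not an event stream.
        return f"server error {status_code}: check the server logs first"
    if media_type != "text/event-stream":
        return f"unexpected Content-Type {media_type!r} on a {status_code} response"
    return "ok: response looks like an event stream"


print(diagnose_sse_response(500, "text/plain; charset=utf-8"))
print(diagnose_sse_response(200, "text/event-stream"))
```

The status code and Content-Type header can be read from the output of curl -i or from the Postman response panel and passed to the helper by hand.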

amitjoy commented 5 months ago

I added a breakpoint in the POST endpoint and invoked it from Postman. The breakpoint is hit as expected.

amitjoy commented 5 months ago

I have the following two POST endpoints: one using the Lanarky router and one without streaming.

@router.post(path="/chat",
             name="Chat Endpoint",
             description="The main endpoint to send a chat query to the foundation model",
             summary="Endpoint to chat with the bot",
             tags=["chat"])
async def chat(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
               session_agent: SessionAgent = Depends(lambda: di[SessionAgent])) -> ConversationalRetrievalChain:
    chatbot = session_agent.chatbot(session_id)
    return chatbot.chain

@router.post(path="/chat-no-stream",
             name="Chat Endpoint",
             description="The backup endpoint to send a chat query to the foundation model",
             summary="Endpoint to chat with the bot",
             tags=["chat"])
async def chat2(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
                question: Annotated[str, Body(title="The input question", min_length=1)],
                session_agent: SessionAgent = Depends(lambda: di[SessionAgent])):
    chatbot = session_agent.chatbot(session_id)
    return chatbot.chain({"question": question})

The one without streaming works perfectly, but when I invoke the streaming endpoint using cURL, I encounter the following exception:

2024-03-14 15:09:23.283 | DEBUG    | agent.chatbot.agent:chain:60 - Initializing conversational RAG chain
2024-03-14 15:09:23.285 | INFO     | logging:callHandlers:1706 - 127.0.0.1:56782 - "POST /chat?session_id=123 HTTP/1.1" 500
2024-03-14 15:09:23.285 | ERROR    | logging:callHandlers:1706 - Exception in ASGI application
Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 2235, in <module>
    main()
    └ <function main at 0x105a65940>
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 2217, in main
    globals = debugger.run(setup['file'], None, None, is_module)
              │        │   │                          └ False
              │        │   └ {'port': 56757, 'vm_type': None, 'client': '127.0.0.1', 'server': False, 'DEBUG_RECORD_SOCKET_READS': False, 'multiproc': Fal...
              │        └ <function PyDB.run at 0x105b2cc20>
              └ <__main__.PyDB object at 0x10410a890>
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1527, in run
    return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
           │    │     │          │               │            │     │        └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
           │    │     │          │               │            │     └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
           │    │     │          │               │            └ '/Users/amit/telly/telly-backend/backend/main.py'
           │    │     │          │               └ None
           │    │     │          └ ''
           │    │     └ False
           │    └ <function PyDB._exec at 0x105b2ccc0>
           └ <__main__.PyDB object at 0x10410a890>
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1534, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
    │             │        │     │        └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
    │             │        │     └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
    │             │        └ '/Users/amit/telly/telly-backend/backend/main.py'
    │             └ <function execfile at 0x105133560>
    └ <module '_pydev_bundle.pydev_imports' from '/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_bund...
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
                 │              │              │     └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
                 │              │              └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
                 │              └ '/Users/amit/telly/telly-backend/backend/main.py'
                 └ 'import os\nimport time\nfrom threading import Thread\n\nimport schedule\nimport uvicorn\nfrom fastapi import FastAPI\nfrom f...
  File "/Users/amit/telly/telly-backend/backend/main.py", line 70, in <module>
    run_uvicorn_loguru(
    └ <function run_uvicorn_loguru at 0x106acbd80>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn_loguru_integration.py", line 28, in run_uvicorn_loguru
    server.run()
    │      └ <function Server.run at 0x105cc6020>
    └ <uvicorn.server.Server object at 0x11b00a510>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn/server.py", line 62, in run
    return asyncio.run(self.serve(sockets=sockets))
           │       │   │    │             └ None
           │       │   │    └ <function Server.serve at 0x105cc60c0>
           │       │   └ <uvicorn.server.Server object at 0x11b00a510>
           │       └ <function run at 0x1059e7d80>
           └ <module 'asyncio' from '/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio...
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           │      │   └ <coroutine object Server.serve at 0x11afc7de0>
           │      └ <function Runner.run at 0x105a11260>
           └ <asyncio.runners.Runner object at 0x11afd5b10>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-1' coro=<Server.serve() running at /Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site...
           │    │     └ <function BaseEventLoop.run_until_complete at 0x105a0ee80>
           │    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x11afd5b10>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 641, in run_until_complete
    self.run_forever()
    │    └ <function BaseEventLoop.run_forever at 0x105a0ede0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 608, in run_forever
    self._run_once()
    │    └ <function BaseEventLoop._run_once at 0x105a10c20>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 1936, in _run_once
    handle._run()
    │      └ <function Handle._run at 0x1059a5440>
    └ <Handle Task.task_wakeup(<Future finis... 0x11b20dfd0>>)>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
    │    │            │    │           │    └ <member '_args' of 'Handle' objects>
    │    │            │    │           └ <Handle Task.task_wakeup(<Future finis... 0x11b20dfd0>>)>
    │    │            │    └ <member '_callback' of 'Handle' objects>
    │    │            └ <Handle Task.task_wakeup(<Future finis... 0x11b20dfd0>>)>
    │    └ <member '_context' of 'Handle' objects>
    └ <Handle Task.task_wakeup(<Future finis... 0x11b20dfd0>>)>
> File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
                   └ <uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware object at 0x11b0abb90>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 69, in __call__
    return await self.app(scope, receive, send)
                 │    │   │      │        └ <bound method RequestResponseCycle.send of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
                 │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
                 │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
                 │    └ <fastapi.applications.FastAPI object at 0x11af6b410>
                 └ <uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware object at 0x11b0abb90>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
                           │      │        └ <bound method RequestResponseCycle.send of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
                           │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
                           └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
          │    │                │      │        └ <bound method RequestResponseCycle.send of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │    │                │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │    │                └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <starlette.middleware.errors.ServerErrorMiddleware object at 0x11b1aa150>
          └ <fastapi.applications.FastAPI object at 0x11af6b410>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
          │    │   │      │        └ <function ServerErrorMiddleware.__call__.<locals>._send at 0x11b3faca0>
          │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <starlette.middleware.cors.CORSMiddleware object at 0x11b1a9a90>
          └ <starlette.middleware.errors.ServerErrorMiddleware object at 0x11b1aa150>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
    await self.app(scope, receive, send)
          │    │   │      │        └ <function ServerErrorMiddleware.__call__.<locals>._send at 0x11b3faca0>
          │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <starlette.middleware.exceptions.ExceptionMiddleware object at 0x11b1a9890>
          └ <starlette.middleware.cors.CORSMiddleware object at 0x11b1a9a90>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
          │                            │    │    │     │      │        └ <function ServerErrorMiddleware.__call__.<locals>._send at 0x11b3faca0>
          │                            │    │    │     │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │                            │    │    │     └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │                            │    │    └ <starlette.requests.Request object at 0x11b423390>
          │                            │    └ <fastapi.routing.APIRouter object at 0x106ba1c50>
          │                            └ <starlette.middleware.exceptions.ExceptionMiddleware object at 0x11b1a9890>
          └ <function wrap_app_handling_exceptions at 0x106a079c0>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
          │   │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11b3fa520>
          │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          └ <fastapi.routing.APIRouter object at 0x106ba1c50>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
    await self.middleware_stack(scope, receive, send)
          │    │                │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11b3fa520>
          │    │                │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │    │                └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <bound method Router.app of <fastapi.routing.APIRouter object at 0x106ba1c50>>
          └ <fastapi.routing.APIRouter object at 0x106ba1c50>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
    await route.handle(scope, receive, send)
          │     │      │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11b3fa520>
          │     │      │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │     │      └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │     └ <function Route.handle at 0x106a214e0>
          └ LangchainAPIRoute(path='/chat', name='Chat Endpoint', methods=['POST'])
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
    await self.app(scope, receive, send)
          │    │   │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11b3fa520>
          │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <function request_response.<locals>.app at 0x11afc8180>
          └ LangchainAPIRoute(path='/chat', name='Chat Endpoint', methods=['POST'])
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
          │                            │    │        │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11b3fa520>
          │                            │    │        │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │                            │    │        └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │                            │    └ <starlette.requests.Request object at 0x11b40ad50>
          │                            └ <function request_response.<locals>.app.<locals>.app at 0x11b3fa340>
          └ <function wrap_app_handling_exceptions at 0x106a079c0>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
          │   │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11b444fe0>
          │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11b423bd0>>
          │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          └ <function request_response.<locals>.app.<locals>.app at 0x11b3fa340>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 72, in app
    response = await func(request)
                     │    └ <starlette.requests.Request object at 0x11b40ad50>
                     └ <function get_request_handler.<locals>.app at 0x11afc8220>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 296, in app
    content = await serialize_response(
                    └ <function serialize_response at 0x106a07060>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 145, in serialize_response
    value, errors_ = field.validate(response_content, {}, loc=("response",))
                     │     │        └ ConversationalRetrievalChain(memory=ConversationSummaryBufferMemory(llm=ChatVertexAI(verbose=True, client=<vertexai.language_...
                     │     └ <function ModelField.validate at 0x1062179c0>
                     └ ModelField(field_info=FieldInfo(annotation=ConversationalRetrievalChain, required=True), name='Response_Chat_Endpoint_chat_po...
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/_compat.py", line 125, in validate
    self._type_adapter.validate_python(value, from_attributes=True),
    │    │             │               └ ConversationalRetrievalChain(memory=ConversationSummaryBufferMemory(llm=ChatVertexAI(verbose=True, client=<vertexai.language_...
    │    │             └ <function TypeAdapter.validate_python at 0x106217240>
    │    └ <pydantic.type_adapter.TypeAdapter object at 0x11afbe510>
    └ ModelField(field_info=FieldInfo(annotation=ConversationalRetrievalChain, required=True), name='Response_Chat_Endpoint_chat_po...
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/pydantic/type_adapter.py", line 258, in validate_python
    return self.validator.validate_python(__object, strict=strict, from_attributes=from_attributes, context=context)
           │    │         │                                │                       │                        └ None
           │    │         │                                │                       └ True
           │    │         │                                └ None
           │    │         └ <method 'validate_python' of 'pydantic_core._pydantic_core.SchemaValidator' objects>
           │    └ SchemaValidator(title="function-plain[validate()]", validator=FunctionPlain(
           │          FunctionPlainValidator {
           │              func: Py(
           │       ...
           └ <pydantic.type_adapter.TypeAdapter object at 0x11afbe510>
TypeError: BaseModel.validate() takes 2 positional arguments but 3 were given
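
The final TypeError has the shape Python produces when a classmethod declared as validate(cls, value) is called with one extra positional argument. Which caller passes the extra argument here is not established in this thread. The sketch below only reproduces the message in plain Python with a hypothetical Chain class, and does not use pydantic or langchain:

```python
class Chain:
    """Hypothetical stand-in for a model with a one-argument validate classmethod."""

    @classmethod
    def validate(cls, value):
        return value


def call_with_extra_argument():
    # Passing (value, info) supplies one more positional argument than
    # validate(cls, value) accepts; cls counts as the first argument.
    try:
        Chain.validate({"question": "hi"}, None)
    except TypeError as exc:
        return str(exc)
    return None


message = call_with_extra_argument()
print(message)
```

The count in the message includes the implicit cls, which is why a call with two explicit arguments is reported as three.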

amitjoy commented 5 months ago

I updated the streaming endpoint to the following:

@router.post(path="/chat",
             name="Chat Endpoint",
             description="The main endpoint to send a chat query to the foundation model",
             summary="Endpoint to chat with the bot",
             tags=["chat"])
async def chat(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
               question: Annotated[str, Body(title="The input question", min_length=1)],
               session_agent: SessionAgent = Depends(lambda: di[SessionAgent])) -> ConversationalRetrievalChain:
    chatbot = session_agent.chatbot(session_id)
    return chatbot.chain({"question": question})

When I executed curl -X POST -N "http://0.0.0.0:8001/chat?session_id=123" -d "Tell me about Remote OSGi Management via MQTT", the chain was invoked, but pydantic raised an exception while parsing the LLM result:

2024-03-14 15:23:26.513 | ERROR    | logging:callHandlers:1706 - Exception in ASGI application
Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 2235, in <module>
    main()
    └ <function main at 0x107c5d940>
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 2217, in main
    globals = debugger.run(setup['file'], None, None, is_module)
              │        │   │                          └ False
              │        │   └ {'port': 56915, 'vm_type': None, 'client': '127.0.0.1', 'server': False, 'DEBUG_RECORD_SOCKET_READS': False, 'multiproc': Fal...
              │        └ <function PyDB.run at 0x107d24c20>
              └ <__main__.PyDB object at 0x106326890>
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1527, in run
    return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
           │    │     │          │               │            │     │        └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
           │    │     │          │               │            │     └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
           │    │     │          │               │            └ '/Users/amit/telly/telly-backend/backend/main.py'
           │    │     │          │               └ None
           │    │     │          └ ''
           │    │     └ False
           │    └ <function PyDB._exec at 0x107d24cc0>
           └ <__main__.PyDB object at 0x106326890>
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1534, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
    │             │        │     │        └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
    │             │        │     └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
    │             │        └ '/Users/amit/telly/telly-backend/backend/main.py'
    │             └ <function execfile at 0x10732b560>
    └ <module '_pydev_bundle.pydev_imports' from '/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_bund...
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
                 │              │              │     └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
                 │              │              └ {'__name__': '__main__', '__doc__': None, '__package__': '', '__loader__': <_frozen_importlib_external.SourceFileLoader objec...
                 │              └ '/Users/amit/telly/telly-backend/backend/main.py'
                 └ 'import os\nimport time\nfrom threading import Thread\n\nimport schedule\nimport uvicorn\nfrom fastapi import FastAPI\nfrom f...
  File "/Users/amit/telly/telly-backend/backend/main.py", line 70, in <module>
    run_uvicorn_loguru(
    └ <function run_uvicorn_loguru at 0x108cc3d80>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn_loguru_integration.py", line 28, in run_uvicorn_loguru
    server.run()
    │      └ <function Server.run at 0x107ebe020>
    └ <uvicorn.server.Server object at 0x11d241f90>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn/server.py", line 62, in run
    return asyncio.run(self.serve(sockets=sockets))
           │       │   │    │             └ None
           │       │   │    └ <function Server.serve at 0x107ebe0c0>
           │       │   └ <uvicorn.server.Server object at 0x11d241f90>
           │       └ <function run at 0x107bdfd80>
           └ <module 'asyncio' from '/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio...
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           │      │   └ <coroutine object Server.serve at 0x11d1f3ef0>
           │      └ <function Runner.run at 0x107c09260>
           └ <asyncio.runners.Runner object at 0x11d06c910>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           │    │     │                  └ <Task pending name='Task-1' coro=<Server.serve() running at /Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site...
           │    │     └ <function BaseEventLoop.run_until_complete at 0x107c06e80>
           │    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
           └ <asyncio.runners.Runner object at 0x11d06c910>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 641, in run_until_complete
    self.run_forever()
    │    └ <function BaseEventLoop.run_forever at 0x107c06de0>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 608, in run_forever
    self._run_once()
    │    └ <function BaseEventLoop._run_once at 0x107c08c20>
    └ <_UnixSelectorEventLoop running=True closed=False debug=False>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 1936, in _run_once
    handle._run()
    │      └ <function Handle._run at 0x107b9d440>
    └ <Handle Task.task_wakeup(<Future finis... 0x11d43ae50>>)>
  File "/usr/local/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
    │    │            │    │           │    └ <member '_args' of 'Handle' objects>
    │    │            │    │           └ <Handle Task.task_wakeup(<Future finis... 0x11d43ae50>>)>
    │    │            │    └ <member '_callback' of 'Handle' objects>
    │    │            └ <Handle Task.task_wakeup(<Future finis... 0x11d43ae50>>)>
    │    └ <member '_context' of 'Handle' objects>
    └ <Handle Task.task_wakeup(<Future finis... 0x11d43ae50>>)>
> File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
                   └ <uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware object at 0x11d2e0350>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 69, in __call__
    return await self.app(scope, receive, send)
                 │    │   │      │        └ <bound method RequestResponseCycle.send of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
                 │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
                 │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
                 │    └ <fastapi.applications.FastAPI object at 0x1089eb450>
                 └ <uvicorn.middleware.proxy_headers.ProxyHeadersMiddleware object at 0x11d2e0350>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
                           │      │        └ <bound method RequestResponseCycle.send of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
                           │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
                           └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
          │    │                │      │        └ <bound method RequestResponseCycle.send of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │    │                │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │    │                └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <starlette.middleware.errors.ServerErrorMiddleware object at 0x11d06e790>
          └ <fastapi.applications.FastAPI object at 0x1089eb450>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
          │    │   │      │        └ <function ServerErrorMiddleware.__call__.<locals>._send at 0x11d858220>
          │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <starlette.middleware.cors.CORSMiddleware object at 0x11d3da110>
          └ <starlette.middleware.errors.ServerErrorMiddleware object at 0x11d06e790>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/cors.py", line 85, in __call__
    await self.app(scope, receive, send)
          │    │   │      │        └ <function ServerErrorMiddleware.__call__.<locals>._send at 0x11d858220>
          │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <starlette.middleware.exceptions.ExceptionMiddleware object at 0x11d3d9f90>
          └ <starlette.middleware.cors.CORSMiddleware object at 0x11d3da110>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
          │                            │    │    │     │      │        └ <function ServerErrorMiddleware.__call__.<locals>._send at 0x11d858220>
          │                            │    │    │     │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │                            │    │    │     └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │                            │    │    └ <starlette.requests.Request object at 0x11d7db2d0>
          │                            │    └ <fastapi.routing.APIRouter object at 0x11d1ce990>
          │                            └ <starlette.middleware.exceptions.ExceptionMiddleware object at 0x11d3d9f90>
          └ <function wrap_app_handling_exceptions at 0x108bfb9c0>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
          │   │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11d8582c0>
          │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          └ <fastapi.routing.APIRouter object at 0x11d1ce990>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
    await self.middleware_stack(scope, receive, send)
          │    │                │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11d8582c0>
          │    │                │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │    │                └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <bound method Router.app of <fastapi.routing.APIRouter object at 0x11d1ce990>>
          └ <fastapi.routing.APIRouter object at 0x11d1ce990>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
    await route.handle(scope, receive, send)
          │     │      │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11d8582c0>
          │     │      │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │     │      └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │     └ <function Route.handle at 0x108c194e0>
          └ LangchainAPIRoute(path='/chat', name='Chat Endpoint', methods=['POST'])
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
    await self.app(scope, receive, send)
          │    │   │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11d8582c0>
          │    │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │    │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │    └ <function request_response.<locals>.app at 0x11d187c40>
          └ LangchainAPIRoute(path='/chat', name='Chat Endpoint', methods=['POST'])
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
          │                            │    │        │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11d8582c0>
          │                            │    │        │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │                            │    │        └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          │                            │    └ <starlette.requests.Request object at 0x11d6a0b90>
          │                            └ <function request_response.<locals>.app.<locals>.app at 0x11d858360>
          └ <function wrap_app_handling_exceptions at 0x108bfb9c0>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
          │   │      │        └ <function wrap_app_handling_exceptions.<locals>.wrapped_app.<locals>.sender at 0x11d8584a0>
          │   │      └ <bound method RequestResponseCycle.receive of <uvicorn.protocols.http.h11_impl.RequestResponseCycle object at 0x11d7dacd0>>
          │   └ {'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.4'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8001), 'cl...
          └ <function request_response.<locals>.app.<locals>.app at 0x11d858360>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/starlette/routing.py", line 72, in app
    response = await func(request)
                     │    └ <starlette.requests.Request object at 0x11d6a0b90>
                     └ <function get_request_handler.<locals>.app at 0x11d186340>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 296, in app
    content = await serialize_response(
                    └ <function serialize_response at 0x108bfb060>
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 145, in serialize_response
    value, errors_ = field.validate(response_content, {}, loc=("response",))
                     │     │        └ {'question': 'Tell me about Remote OSGi Management via MQTT', 'chat_history': 'Human: Tell me about Remote OSGi Management vi...
                     │     └ <function ModelField.validate at 0x10831b9c0>
                     └ ModelField(field_info=FieldInfo(annotation=ConversationalRetrievalChain, required=True), name='Response_Chat_Endpoint_chat_po...
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/fastapi/_compat.py", line 125, in validate
    self._type_adapter.validate_python(value, from_attributes=True),
    │    │             │               └ {'question': 'Tell me about Remote OSGi Management via MQTT', 'chat_history': 'Human: Tell me about Remote OSGi Management vi...
    │    │             └ <function TypeAdapter.validate_python at 0x10831b240>
    │    └ <pydantic.type_adapter.TypeAdapter object at 0x11d1eac50>
    └ ModelField(field_info=FieldInfo(annotation=ConversationalRetrievalChain, required=True), name='Response_Chat_Endpoint_chat_po...
  File "/Users/amit/telly/telly-backend/backend/.venv/lib/python3.11/site-packages/pydantic/type_adapter.py", line 258, in validate_python
    return self.validator.validate_python(__object, strict=strict, from_attributes=from_attributes, context=context)
           │    │         │                                │                       │                        └ None
           │    │         │                                │                       └ True
           │    │         │                                └ None
           │    │         └ <method 'validate_python' of 'pydantic_core._pydantic_core.SchemaValidator' objects>
           │    └ SchemaValidator(title="function-plain[validate()]", validator=FunctionPlain(
           │          FunctionPlainValidator {
           │              func: Py(
           │       ...
           └ <pydantic.type_adapter.TypeAdapter object at 0x11d1eac50>
TypeError: BaseModel.validate() takes 2 positional arguments but 3 were given

ajndkr commented 5 months ago

Before implementing the API logic, one important thing to ensure is that your langchain implementation works for streaming. You can test this with the StreamingStdOutCallbackHandler callback handler. To stream output, you need to pass streaming=True when defining the chain's model.

Since this chain implementation appears to be complex, I recommend using the low-level StreamingResponse class. You can find the docs and example here: https://lanarky.ajndkr.com/learn/adapters/langchain/fastapi/

amitjoy commented 5 months ago

I have now only one endpoint -

@router.post(path="/ask",
             name="Chat Endpoint",
             description="The main endpoint to ask a question to the foundation model",
             summary="Endpoint to ask question",
             tags=["chat", "ask", "chat"])
async def ask(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
              question: Annotated[str, Body(title="The input question", min_length=1)],
              session_agent: SessionAgent = Depends(lambda: di[SessionAgent])) -> ConversationalRetrievalChain:
    chatbot = session_agent.chatbot(session_id)
    return chatbot.chain({"question": question})

whenever I am getting rid of the return type ConversationalRetrievalChain, it works as if streaming is disabled, and I receive the output from LLM. It encounters the aforementioned error whenever pydantic detects the return type to be ConversationalRetrievalChain. Yes, the chain has been configured to use streaming.

        self._model = ChatVertexAI(credentials=gcp_auth.credentials,
                                   model_name=settings.vertex.model.name,
                                   verbose=settings.vertex.model.verbose,
                                   streaming=settings.vertex.model.streaming,
                                   temperature=settings.vertex.model.temperature,
                                   max_output_tokens=settings.vertex.model.max_output_tokens)

    def chain(self) -> BaseConversationalRetrievalChain:
        logger.debug("Initializing conversational RAG chain")
        return ConversationalRetrievalChain.from_llm(llm=self.vertex.model,
                                                     memory=self.memory,
                                                     get_chat_history=lambda h: h,
                                                     return_source_documents=True,
                                                     return_generated_question=True,
                                                     retriever=self.kb_agent.retriever,
                                                     verbose=self.settings.vertex.rag.verbose,
                                                     chain_type=self.settings.vertex.rag.type,
                                                     combine_docs_chain_kwargs={"prompt": self.prompt},
                                                     max_tokens_limit=self.settings.vertex.rag.max_token_limit)

and streaming is set to true. I am using the Vertex AI model chat-bison, which as far as I know supports streaming too. I will test with FastAPI's StreamingResponse to validate my assumption though. Will keep you updated.
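
A note on the TypeError in the traceback above: recent FastAPI versions treat a handler's return annotation as its response model, which would explain why the error only shows up when the `-> ConversationalRetrievalChain` annotation is present. The sketch below uses only the standard library to show how a framework can discover that annotation. `FakeChain`, both handlers, and `inferred_response_model` are hypothetical stand-ins, not FastAPI or lanarky code.

```python
import inspect


class FakeChain:
    """Hypothetical stand-in for ConversationalRetrievalChain."""


async def ask_annotated(question: str) -> FakeChain:
    ...


async def ask_plain(question: str):
    ...


def inferred_response_model(handler):
    """Return the handler's return annotation, or None if it has none."""
    annotation = inspect.signature(handler).return_annotation
    return None if annotation is inspect.Signature.empty else annotation


# The annotated handler exposes a type that a framework may try to validate against.
print(inferred_response_model(ask_annotated))
# The plain handler exposes nothing, so no response validation is triggered.
print(inferred_response_model(ask_plain))
```

With FastAPI itself, dropping the annotation or passing `response_model=None` to the route decorator should skip that validation step. Whether `LangchainAPIRouter` forwards this option unchanged is an assumption worth checking.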

ajndkr commented 5 months ago

I see! something to note, ChatVertexAI doesn't support async streaming. Not sure if langchain fixed it recently. as lanarky hits the async methods, there's a good chance streaming fails due to this. let me know how it goes!

amitjoy commented 5 months ago

I finally came up with the following endpoint:

@router.post(path="/ask",
             name="Chat Endpoint",
             description="The main endpoint to ask a question to the foundation model",
             summary="Endpoint to ask question",
             tags=["chat", "ask", "chat"])
async def ask(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
              question: Annotated[str, Body(title="The input question", min_length=1)],
              session_agent: SessionAgent = Depends(lambda: di[SessionAgent])):
    chatbot = session_agent.chatbot(session_id)
    return StreamingResponse(chain=chatbot.chain,
                             config={
                                 "inputs": {"question": question},
                                 "callbacks": [TokenStreamingCallbackHandler(output_key="text")]
                             })

whenever I invoke cURL to consume the SSE:

I receive the question and the answer in two different events (as shown below)

➜  ~ curl --http2 -X POST -N "http://0.0.0.0:8001/ask?session_id=123" -d "Tell me about Remote OSGi Management via MQTT" -H "Accept:text/event-stream"
event: completion
data: {"token":" What is Remote OSGi Management via MQTT?"}

event: completion
data: {"token":" Remote OSGi Management via MQTT allows for the remote management of certain aspects of an OSGi container through the MQTT protocol. This includes remote deployment of application bundles, remote start and stop of services, and remote read and update of service configurations."}

I still don't get every token from the main answer though. I am wondering if it is actually supported. According to langchain docs, VertexAI supports streaming but not async streaming.

If it is not supported, I can simply fall back to returning the complete JSON in the API response, instead of using SSE. wdyt?
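
For checking how many events actually arrive, the stream format shown in the cURL output can be parsed with a few lines of standard-library Python. This is a minimal sketch, not the parser used by lanarky's StreamingClient or httpx-sse; `iter_sse_events`, `_field_value`, and the sample lines are made up for illustration.

```python
import json


def _field_value(line, name):
    """Return the value of an SSE field line, dropping one leading space."""
    value = line[len(name) + 1:]
    return value[1:] if value.startswith(" ") else value


def iter_sse_events(lines):
    """Yield (event, data) pairs from an iterable of SSE text lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches the event collected so far.
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):
            # Comment line, e.g. a keep-alive ping: nothing to dispatch.
            continue
        elif line.startswith("event:"):
            event = _field_value(line, "event")
        elif line.startswith("data:"):
            data.append(_field_value(line, "data"))


# Illustrative input only; real input would be the lines of the HTTP response body.
sample = [
    "event: completion",
    'data: {"token":" What is"}',
    "",
    ": ping - 2024-03-23 23:49:32.158441",
    "event: completion",
    'data: {"token":" MQTT?"}',
    "",
]

answer = "".join(
    json.loads(data)["token"]
    for event, data in iter_sse_events(sample)
    if event == "completion"
)
print(answer)
```

Counting the yielded events makes it easy to tell token-level streaming (many small events) from a single event that carries the whole answer.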

ajndkr commented 5 months ago

@amitjoy can you try once with the sync_mode feature? Here, we execute the __call__ method of the chain inside an asyncio loop. I expect this to work around the async streaming issue of vertexai. Example code:

@app.post("/chat")
async def chat(request: ChatInput, chain: ConversationChain = Depends(chain_factory)):
    return StreamingResponse(
        chain=chain,
        config={
            "inputs": request.model_dump(),
            "callbacks": [
                TokenStreamingCallbackHandler(output_key=chain.output_key),
            ],
        },
        run_mode="sync",
    )

code reference: https://github.com/ajndkr/lanarky/blob/64215d95f474e05484888be616b5c071fa31f298/lanarky/adapters/langchain/responses.py#L79-L85
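
The general idea of running a blocking call from async code while still forwarding tokens as they appear can be sketched with the standard library alone. This is not lanarky's implementation (the code reference above is the authoritative source); `blocking_chain`, `on_token`, and `stream_tokens` are hypothetical stand-ins for a synchronous chain call, its token callback, and the streaming wrapper.

```python
import asyncio


def blocking_chain(question, on_token):
    """Hypothetical synchronous chain that reports tokens via a callback."""
    for token in ["Remote", " OSGi", " management"]:
        on_token(token)


async def stream_tokens(question):
    """Run the blocking chain in a worker thread and yield its tokens."""
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the stream

    def on_token(token):
        # Called from the worker thread, so hand off to the loop thread-safely.
        loop.call_soon_threadsafe(queue.put_nowait, token)

    def run():
        try:
            blocking_chain(question, on_token)
        finally:
            # Always signal completion, even if the chain raises.
            loop.call_soon_threadsafe(queue.put_nowait, done)

    future = loop.run_in_executor(None, run)
    while (item := await queue.get()) is not done:
        yield item
    await future  # re-raise any exception from the worker thread


async def main():
    return [token async for token in stream_tokens("hi")]


tokens = asyncio.run(main())
print(tokens)
```

The `finally` block matters in this design: if the end-of-stream signal is never sent, the consumer waits forever and the event stream never closes.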

amitjoy commented 5 months ago

@ajndkr I tried the following:

Scenario 1 (run_mode="sync"):

from typing import Annotated, Any

from fastapi import Depends, Query, Body
from kink import di
from lanarky.adapters.langchain.callbacks import TokenStreamingCallbackHandler, SourceDocumentsStreamingCallbackHandler, \
    get_token_data
from lanarky.adapters.langchain.responses import StreamingResponse
from lanarky.adapters.langchain.routing import LangchainAPIRouter
from lanarky.events import Events

from agent.session.agent import SessionAgent

router = LangchainAPIRouter()

@router.post(path="/ask",
             name="Chat Endpoint",
             description="The main endpoint to ask a question to the foundation model",
             summary="Endpoint to ask question",
             tags=["chat", "ask", "chatbot"])
async def ask(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
              question: Annotated[str, Body(title="The input question", min_length=1)],
              session_agent: SessionAgent = Depends(lambda: di[SessionAgent])):
    chatbot = session_agent.chatbot(session_id)
    return StreamingResponse(chain=chatbot.chain,
                             config={
                                 "inputs": {"question": question},
                                 "callbacks": [VertexStreamingCallbackHandler(output_key="text"),
                                               SourceDocumentsStreamingCallbackHandler()]
                             },
                             run_mode="sync")

class VertexStreamingCallbackHandler(TokenStreamingCallbackHandler):

    async def on_chain_end(
            self, outputs: dict[str, Any], **kwargs: dict[str, Any]) -> None:
        if self.llm_cache_used or not self.streaming:
            if self.output_key in outputs:
                message = self._construct_message(
                    data=get_token_data(outputs[self.output_key], self.mode),
                    event=Events.COMPLETION)
                await self.send(message)

Output:

Screenshot 2024-03-19 at 12 59 09

Warnings:

[13:00:57] WARNING  Error in callback coroutine: NotImplementedError('VertexStreamingCallbackHandler does not implement on_chat_model_start')  manager.py:323
           WARNING  Error in callback coroutine: NotImplementedError('SourceDocumentsStreamingCallbackHandler does not implement on_chat_model_start')  manager.py:323

Scenario 2 (run_mode=async):

from typing import Annotated, Any

from fastapi import Depends, Query, Body
from kink import di
from lanarky.adapters.langchain.callbacks import TokenStreamingCallbackHandler, SourceDocumentsStreamingCallbackHandler, \
    get_token_data
from lanarky.adapters.langchain.responses import StreamingResponse
from lanarky.adapters.langchain.routing import LangchainAPIRouter
from lanarky.events import Events

from agent.session.agent import SessionAgent

router = LangchainAPIRouter()

@router.post(path="/ask",
             name="Chat Endpoint",
             description="The main endpoint to ask a question to the foundation model",
             summary="Endpoint to ask question",
             tags=["chat", "ask", "chatbot"])
async def ask(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
              question: Annotated[str, Body(title="The input question", min_length=1)],
              session_agent: SessionAgent = Depends(lambda: di[SessionAgent])):
    chatbot = session_agent.chatbot(session_id)
    return StreamingResponse(chain=chatbot.chain,
                             config={
                                 "inputs": {"question": question},
                                 "callbacks": [VertexStreamingCallbackHandler(output_key="text"),
                                               SourceDocumentsStreamingCallbackHandler()]
                             })

class VertexStreamingCallbackHandler(TokenStreamingCallbackHandler):

    async def on_chain_end(
            self, outputs: dict[str, Any], **kwargs: dict[str, Any]) -> None:
        if self.llm_cache_used or not self.streaming:
            if self.output_key in outputs:
                message = self._construct_message(
                    data=get_token_data(outputs[self.output_key], self.mode),
                    event=Events.COMPLETION)
                await self.send(message)

Output:

Screenshot 2024-03-19 at 13 00 10

Observation:

While running the chain in SYNC mode, the answer is now generated in tokens, however, due to warnings that the custom VertexStreamingCallbackHandler and SourceDocumentsStreamingCallbackHandler don't implement the necessary methods, the event-stream didn't finish. That's why you can see in the output that I had to kill the process manually using CTRL+C. However, while running in ASYNC mode, the event-stream finished properly after returning the source documents.

ajndkr commented 5 months ago

While running the chain in SYNC mode, the answer is now generated in tokens, however, due to warnings that the custom VertexStreamingCallbackHandler and SourceDocumentsStreamingCallbackHandler don't implement the necessary methods, the event-stream didn't finish.

I see! this is something I can look into. It will take some time from my end but I'll try to resolve this issue by end of this week.

amitjoy commented 5 months ago

@ajndkr Thanks a lot for your continuous assistance! Looking forward to your further analysis 👍

amitjoy commented 5 months ago

@ajndkr Any update on this?

ajndkr commented 5 months ago

@amitjoy hi! i've published a new version: pip install lanarky==0.8.6

here's the demo app I built using ChatVertexAI and it appears to solve the above issue:

from fastapi import Depends
from langchain.chains.conversation.base import ConversationChain
from langchain_community.chat_models.vertexai import ChatVertexAI
from pydantic import BaseModel

from lanarky import Lanarky
from lanarky.adapters.langchain.callbacks import TokenStreamingCallbackHandler
from lanarky.adapters.langchain.responses import StreamingResponse

app = Lanarky()

class ChatInput(BaseModel):
    input: str

def chain_factory() -> ConversationChain:
    return ConversationChain(
        llm=ChatVertexAI(streaming=True),
        verbose=True,
    )

@app.post("/chat")
async def chat(request: ChatInput, chain: ConversationChain = Depends(chain_factory)):
    return StreamingResponse(
        chain=chain,
        config={
            "inputs": request.model_dump(),
            "callbacks": [
                TokenStreamingCallbackHandler(output_key=chain.output_key),
            ],
        },
        run_mode="sync",
    )

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app)

lmk if this solves the issue on your end.

amitjoy commented 5 months ago

I tried two different versions of output key:

1. with text

from typing import Annotated

from fastapi import Depends, Query, Body
from kink import di
from lanarky.adapters.langchain.callbacks import TokenStreamingCallbackHandler
from lanarky.adapters.langchain.responses import StreamingResponse, ChainRunMode
from lanarky.adapters.langchain.routing import LangchainAPIRouter

from agent.session.agent import SessionAgent

router = LangchainAPIRouter()

@router.post(path="/ask",
             name="Chat Endpoint",
             description="The main endpoint to ask a question to the foundation model",
             summary="Endpoint to ask question",
             tags=["chat", "ask", "chatbot"])
async def ask(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
              question: Annotated[str, Body(title="The input question", min_length=1)],
              session_agent: SessionAgent = Depends(lambda: di[SessionAgent])):
    chatbot = session_agent.chatbot(session_id)
    return StreamingResponse(chain=chatbot.chain,
                             run_mode=ChainRunMode.SYNC,
                             config={
                                 "inputs": {"question": question},
                                 "callbacks": [
                                     TokenStreamingCallbackHandler(output_key="text")
                                 ]
                             })

In this version, the chain doesn't end at all, which is why the client keeps receiving pings continuously even though the answer has been generated.

Output:

POST http://0.0.0.0:8001/ask?session_id=TestSession

HTTP/1.1 200 OK
date: Sat, 23 Mar 2024 23:49:16 GMT
server: uvicorn
cache-control: no-cache
connection: keep-alive
x-accel-buffering: no
content-type: text/event-stream; charset=utf-8
Transfer-Encoding: chunked

Response code: 200 (OK); Time: 13ms (13 ms)

event: completion
data: {"token":" Bndtools is an Eclipse plugin that provides an integrated development environment (IDE) for developing OSGi"}

event: completion
data: {"token":" bundles. It includes a graphical user interface (GUI) for creating, editing, and"}

event: completion
data: {"token":" debugging bundles, as well as a command-line interface (CLI) for"}

event: completion
data: {"token":" automating tasks. Bndtools also includes a number of features that make it easier to develop OS"}

event: completion
data: {"token":"Gi applications, such as a dependency management tool, a build system, and a"}

event: completion
data: {"token":" testing framework."}

: ping - 2024-03-23 23:49:32.158441

2. with answer

from typing import Annotated

from fastapi import Depends, Query, Body
from kink import di
from lanarky.adapters.langchain.callbacks import TokenStreamingCallbackHandler
from lanarky.adapters.langchain.responses import StreamingResponse, ChainRunMode
from lanarky.adapters.langchain.routing import LangchainAPIRouter

from agent.session.agent import SessionAgent

router = LangchainAPIRouter()

@router.post(path="/ask",
             name="Chat Endpoint",
             description="The main endpoint to ask a question to the foundation model",
             summary="Endpoint to ask question",
             tags=["chat", "ask", "chatbot"])
async def ask(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
              question: Annotated[str, Body(title="The input question", min_length=1)],
              session_agent: SessionAgent = Depends(lambda: di[SessionAgent])):
    chatbot = session_agent.chatbot(session_id)
    return StreamingResponse(chain=chatbot.chain,
                             run_mode=ChainRunMode.SYNC,
                             config={
                                 "inputs": {"question": question},
                                 "callbacks": [
                                     TokenStreamingCallbackHandler(output_key=chatbot.chain.output_key)
                                 ]
                             })

similar behaviour - chain didn't finish at all :(

Output:

POST http://0.0.0.0:8001/ask?session_id=TestSession

HTTP/1.1 200 OK
date: Sat, 23 Mar 2024 23:52:00 GMT
server: uvicorn
cache-control: no-cache
connection: keep-alive
x-accel-buffering: no
content-type: text/event-stream; charset=utf-8
Transfer-Encoding: chunked

Response code: 200 (OK); Time: 19ms (19 ms)

event: completion
data: {"token":" Bndtools is an Eclipse plugin that provides support for developing OSGi bundles. It"}

event: completion
data: {"token":" includes a workspace model, a build system, and a debugger. Bndtools can be used to create"}

event: completion
data: {"token":", edit, and debug OSGi bundles, as well as to manage"}

event: completion
data: {"token":" OSGi frameworks."}

event: completion
data: {"token":""}

: ping - 2024-03-23 23:52:16.859272

amitjoy commented 5 months ago

Also note that if I run the code above in ASYNC mode (ChainRunMode.ASYNC), the chain finishes successfully, but the tokens aren't streamed as the model generates them.

ajndkr commented 5 months ago

@amitjoy why are you not using the VertexStreamingCallbackHandler class you created? with 0.8.6, the fix should apply to your new class as well.

ajndkr commented 5 months ago

Also note that if I run the code above in ASYNC mode (ChainRunMode.ASYNC), the chain finishes successfully, but the tokens aren't streamed as the model generates them.

This is expected behaviour, since ChatVertexAI doesn't support async streaming.

amitjoy commented 5 months ago

@ajndkr I tried that too (the following code)

from typing import Annotated, Any

from fastapi import Depends, Query, Body
from kink import di
from lanarky.adapters.langchain.callbacks import TokenStreamingCallbackHandler, get_token_data
from lanarky.adapters.langchain.responses import StreamingResponse, ChainRunMode
from lanarky.adapters.langchain.routing import LangchainAPIRouter
from lanarky.events import Events

from agent.session.agent import SessionAgent

router = LangchainAPIRouter()

@router.post(path="/ask",
             name="Chat Endpoint",
             description="The main endpoint to ask a question to the foundation model",
             summary="Endpoint to ask question",
             tags=["chat", "ask", "chatbot"])
async def ask(session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
              question: Annotated[str, Body(title="The input question", min_length=1)],
              session_agent: SessionAgent = Depends(lambda: di[SessionAgent])):
    chatbot = session_agent.chatbot(session_id)
    return StreamingResponse(chain=chatbot.chain,
                             run_mode=ChainRunMode.SYNC,
                             config={
                                 "inputs": {"question": question},
                                 "callbacks": [VertexStreamingCallbackHandler(output_key="text")]
                             })

class VertexStreamingCallbackHandler(TokenStreamingCallbackHandler):

    async def on_chain_end(
            self, outputs: dict[str, Any], **kwargs: dict[str, Any]) -> None:
        if self.llm_cache_used or not self.streaming:
            if self.output_key in outputs:
                message = self._construct_message(
                    data=get_token_data(outputs[self.output_key], self.mode),
                    event=Events.COMPLETION)
                await self.send(message)

It also results in an unfinished chain, so the stream stays open and keeps pinging indefinitely.

amitjoy commented 5 months ago

Just an interesting observation: when I send the first HTTP request (using the Lanarky code above) to Vertex AI (Chat Bison) to answer a question, the chain finishes successfully, but the next question's chain never finishes, even though it has been answered. So, from the second question onwards, the end callbacks (on_llm_end, on_chain_end) are never called.

amitjoy commented 5 months ago

I have also added on_llm_error and on_retriever_error to check whether any error occurred, but these aren't invoked at all.

Another interesting observation: this happens only when the answer is split across multiple COMPLETION events. If the answer is generated in a single COMPLETION event, the chain finishes properly. For longer answers, which are split across multiple COMPLETION events, the stream keeps pinging indefinitely after the answer is complete, because the client never receives a finished event. That's possibly because, in this scenario only, none of the on_X_error or on_X_end callbacks get executed.
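
The symptom described above can be checked against a captured response body. The following is a minimal sketch, not part of Lanarky: `summarize_sse` and `StreamSummary` are hypothetical helper names, and the terminating event name (`end`) is an assumption that should be adjusted to whatever the server actually emits.

```python
from dataclasses import dataclass


@dataclass
class StreamSummary:
    completions: int = 0
    pings: int = 0
    ended: bool = False


def summarize_sse(body: str, end_event: str = "end") -> StreamSummary:
    """Count completion events and keep-alive pings in a captured SSE body.

    `end_event` is an assumed name for the terminating event.
    """
    summary = StreamSummary()
    for line in body.splitlines():
        if line.startswith(":"):
            # Lines starting with a colon are SSE comments, used here as pings.
            summary.pings += 1
        elif line.startswith("event:"):
            name = line[len("event:"):].strip()
            if name == "completion":
                summary.completions += 1
            elif name == end_event:
                summary.ended = True
    return summary
```

A body with several `completion` events, at least one ping, and `ended == False` matches the hanging behaviour reported in this thread.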

amitjoy commented 5 months ago

If we run in ASYNC mode, it can generate multiple COMPLETION events too, but a single event contains the whole answer. For example, if I ask the chatbot the question "What is taught in this book":

It first generates a COMPLETION event that contains the rephrased question. The next COMPLETION event contains the complete answer.

For SYNC,

It first generates a COMPLETION event that contains the rephrased question, followed by several COMPLETION events that answer the question. And then it just never finishes.

I think there is something wrong with SYNC mode. I'm not sure where to point the finger, though.

ajndkr commented 5 months ago

when I send the first HTTP request (using the Lanarky code above) to Vertex AI (Chat Bison) to answer a question, the chain finishes successfully, but the next question's chain never finishes, even though it has been answered.

Oh! This is unusual. One possible reason is that the two incoming requests share the same session_agent, which might be causing this issue.

I would also try to reproduce these experiments with ChatOpenAI. It's possible that the issue is related to langchain.

If the issue persists, I encourage you to fork this repository and set up lanarky locally. A starting point to debug further can be here: https://github.com/ajndkr/lanarky/blob/1c2e4dc5fa47b1ed53197d43b0c8e9260e395022/lanarky/adapters/langchain/responses.py#L79-L85

This issue is now very specific to your use-case and would require more time for me to debug and fix. It's best if you investigate this further and open a pull request if you manage to find a solution.

I will keep this issue open and can assign it to you if you like.

amitjoy commented 5 months ago

@ajndkr Thanks a lot for your continuous support. Actually, the correct behaviour is as mentioned here: https://github.com/ajndkr/lanarky/issues/186#issuecomment-2017437810

amitjoy commented 5 months ago

@ajndkr You were right. I debugged it and found that, somehow, the following asyncio code never finishes:

outputs = await loop.run_in_executor(
    None, partial(self.chain, **self.config)
)

I tried with the Vertex AI Gemini Pro foundation model as well and it behaves the same way :(
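
To confirm that the executor call is what hangs, one option is to wrap it in a timeout while debugging. The sketch below is self-contained and does not use Lanarky: `run_with_timeout` and `slow_chain` are hypothetical names, and `slow_chain` merely simulates a blocking chain call. Note that the timeout only abandons the await; the worker thread itself keeps running.

```python
import asyncio
import time
from functools import partial


async def run_with_timeout(func, timeout: float, **kwargs):
    """Run a blocking callable in the default executor; return None on timeout."""
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, partial(func, **kwargs))
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        # The thread cannot be interrupted; we only stop waiting for it.
        return None


def slow_chain(question: str, delay: float) -> dict:
    """Hypothetical blocking call that stands in for a synchronous chain."""
    time.sleep(delay)
    return {"text": f"answer to {question}"}
```

A `None` result after a generous timeout would point at the blocking call itself never returning, rather than at the streaming callbacks.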