cheshire-cat-ai / core

Production ready AI agent framework
https://cheshirecat.ai
GNU General Public License v3.0
2.35k stars 320 forks source link

WebSocket inconsistent closing when a chat is in progress #915

Closed fatualux closed 1 month ago

fatualux commented 1 month ago

Description of the bug

SUMMARY

    RuntimeError: Unexpected ASGI message 'websocket.close', after sending 'websocket close'.

When a user closes his StrayCat while a process is still running, a runtime error is raised, and the user can no longer interact with the Cat until the Cat is restarted.

IN DEPTH

When we connect to the Cat with a given client, a new connection (StrayCat) to a WebSocket's server is created.

Let's say this user closes the connection while a process is still active (e.g. the Cat's is still answering to a query).

Inside the same session of the CheshireCat, if we connect with that user again, the Cat uses the same session; but now, that session has an object websocket in an inconsistent state.

This probably happens because the Cat's WebSocket object, a higher level implementation of the native Python WebSocket object (see Starlette framework), has a status Connected, but actually the connection is closed (and the lower level Python WebSocket object has a status Disconnected).

Steps to Reproduce

My collegue, Luca, implemented a script in order to reproduce this unexpected behaviour, systematically, getting an error message in the Cat's console.

Requirements:

In order to make this happen, we can follow these steps:

  1. Open the Cat's core directory and start the Cat:
    docker compose up
  2. Move reproduce.py in a separate folder, create a Python virtual environment and install the required dependency:
    mkdir reproducer && mv reproduce.py reproducer/
    cd reproducer && virtualenv .venv
    source .venv/bin/activate && pip install cheshire-cat-api
  1. Launch the script, giving, at first:
    python reproduce.py 5 Alice

Note: 5 is the sleep time in seconds, "Alice" is the username passed to the function.

The second time we run the script, we change the sleep time to be less than one second:

    python reproduce.py 0.5 Alice

This time, we get this error:

    RuntimeError: Unexpected ASGI message 'websocket.close', after sending 'websocket.close'.

From now on, we will no longer be able to connect to the WebSocket with the user Alice. If we repeat the process with the user Queen, we get the same result.

alice

Here's the complete traceback (from the Cat's console):

traceback.md

How the reproducer works?

Through cat_client.connect_ws(), we connect to the Cat's WebSocket server, then:

  1. The function hello() sends a message ("pippo") upon establishing a connection.
  2. print_msg(msg) prints any incoming messages.
  3. We wait a certain amount of time ('time_of_sleeping', default is 3 sec), then cat_client.close() closes the connection to the WebSocket.

Why do we get an error if we set a time_of_sleeping < 1 second?

Because the Cat is still formulating an answer, and we are closing the connection while this process is running.

Expected behavior

I expect we can systematically replicate the above described behaviour, everytime we run the reproducer script, with any user we choose, if a sleep time of less than one second is given as an argument.

Additional context

My collegue implemented a fix to solve the problem, I did a PR.

FIX 1

The first step is to edit stray_cat.py:

Basically, we add a try-except block to the original code (__send_ws_json) to handle the problem:

from websockets.exceptions import ConnectionClosedOK

    def __send_ws_json(self, data: Any):
        try:
            # Run the corutine in the main event loop in the main thread
            # and wait for the result
            asyncio.run_coroutine_threadsafe(
                self.__ws.send_json(data), loop=self.__main_loop
            ).result()

        except ConnectionClosedOK as ex:
            if ex.code == 1000:
                log.warning(ex)
                if self.__ws:
                    del self.__ws
                    self.__ws = None

It catches the ConnectionClosedOK exception, and, If the connection is closed with the code 1000 (normal closure), it logs a warning using log.warning(ex).

Then, if the ConnectionClosedOK exception is caught, it deletes the self.__ws variable and sets it to None.

In the screencast below we can observe how the fix works against reproduce.py:

fix1

FIX 2

The first fix is used to handle this specific case: a given user closes the connection while the Cat is still providing an answer.

In case of a hypothetical different flow, we would run into the same problem again.

To avoid this, we can generalize the fix, adding this try-except block to the async method get_user_stray in connection.py:

    if stray._StrayCat__ws:
    try: 
        await stray._StrayCat__ws.close()
    except RuntimeError as ex:
        log.warning(ex)
        if stray._StrayCat__ws:
            del stray._StrayCat__ws
            stray._StrayCat__ws = None

This fix ensure that, if we get a RuntimeError from WebSocket, we get a warning, and the stray, that once again is an inconsistent state, is cleaned up (same strategy we used for the first fix).

To avoid using a private attribute, we could extract this and implement a new method in the StrayCat class:

    async def close_connection(self):
        if self.__ws:
            try:
                await self.__ws.close()
            except RuntimeError as ex:
                log.warning(ex)
                if self.__ws:
                    del self.__ws
                    self.__ws = None
        if user.id in strays.keys():
            stray = strays[user.id]
            await stray.close_connection()

Now, we have to run the reproducer:

fix2

pieroit commented 1 month ago

Awesome thanks for the care in details. Will check ASAP

fatualux commented 1 month ago

Thanks to you, for your project, support and for the time you will take for reviewing the PR

Il Mer 18 Set 2024, 20:42 Piero Savastano @.***> ha scritto:

Awesome thanks for the care in details. Will check ASAP

— Reply to this email directly, view it on GitHub https://github.com/cheshire-cat-ai/core/issues/915#issuecomment-2359168310, or unsubscribe https://github.com/notifications/unsubscribe-auth/AIPQJXF2KHE4AFB5S3WSH6TZXHCSVAVCNFSM6AAAAABOK3VQGGVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDGNJZGE3DQMZRGA . You are receiving this because you authored the thread.Message ID: @.***>

sambarza commented 1 month ago

I can reproduce systematically with admin portal too:

  1. ask something from the admin chat with the admin user
  2. close the tab during cat response
  3. open the admin portal again
  4. you get Something went wrong while connecting to the server in the portal and RuntimeError: Cannot call "send" once a close message has been sent. in the console

Workaround is to restart the Cat

sambarza commented 1 month ago

Normal process: when the user closes the tab, we get the WebSocketDisconnect exception and we reset the ws connection:

@router.websocket("/ws")
@router.websocket("/ws/{user_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    stray=Depends(WebSocketAuth(AuthResource.CONVERSATION, AuthPermission.WRITE)),
):
    """
    Endpoint to handle incoming WebSocket connections by user id, process messages, and check for messages.
    """

    # Add the new WebSocket connection to the manager.
    await websocket.accept()
    try:
        # Process messages
        await receive_message(websocket, stray)
    except WebSocketDisconnect:             <----------------------
        # Handle the event where the user disconnects their WebSocket.
        stray._StrayCat__ws = None 
        log.info("WebSocket connection closed")
    # finally:
    #     del strays[user_id]

Issue: when the user closes the tab while the Cat answering, the method __send_ws_json raises a "connection closed" exception. While we handle this exception, we still attempt to call send_error():

    def run(self, user_message_json):
        try:
            cat_message = self.loop.run_until_complete(self.__call__(user_message_json))
            # send message back to client
            self.send_chat_message(cat_message)
        except Exception as e:
            # Log any unexpected errors
            log.error(e)
            traceback.print_exc()
            # Send error as websocket message
            self.send_error(e)   <--------------

send_error() tries to send the error through the websocket and it raises a new exception that is not handled. This leads to an exception at ASGI level, which causes the loss of the normal WebSocketDisconnect exception, and the connection is not properly reset.

Exception:

cheshire_cat_core  | ERROR:    Exception in ASGI application
cheshire_cat_core  | Traceback (most recent call last):
cheshire_cat_core  |   File "/usr/local/lib/python3.10/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 238, in run_asgi
cheshire_cat_core  |     result = await self.app(self.scope, self.asgi_receive, self.asgi_send)

I see the PR #914, the FIX 1 it's a good starting point, thank you @fatualux! The exception handling in the send_ws_json method allows us to catch the WebSocketDisconnect exception, and the connection is properly reset by the normal process. Therefore, resetting the connection in `send_ws_jsonis not necessary. However handling the exception in__send_ws_jsonhide the handling of the exception already done inrun` method.

I would prefer to keep the exception handling in the run method, so I propose adding only a try block around the send_error() call.

P.S. __send_ws_json is called also to send tokens, the exception is already handled somehow by langchain:

image

Log message written by langchain:

cheshire_cat_core  | Error in NewTokenHandler.on_llm_new_token callback: ConnectionClosedOK(Close(code=1001, reason=''), Close(code=1001, reason=''), True)
fatualux commented 1 month ago

OK, thanks for the suggestion. I will redo the PR branching off "develop" as @pieroit asked, and put the try-except block in the run method instead of handling the exception in __send_ws_json, as you suggest.