WSH032 / fastapi-proxy-lib

HTTP/WebSocket proxy for starlette/FastAPI
https://wsh032.github.io/fastapi-proxy-lib/
Apache License 2.0

[RFC]: add `callback` api to `WebSocketProxy` #40

Open WSH032 opened 3 months ago

WSH032 commented 3 months ago

Have you discussed it?

Discussed in https://github.com/WSH032/fastapi-proxy-lib/discussions/39

Originally posted by **IvesAwadi** July 15, 2024 Thank you for the library, it's amazing so far but for some reason I can't figure out how to log incoming data from the WebSocket and also read its responses.

Describe your feature request

Add a callback function API to ReverseWebSocketProxy so that we can use the callback function to modify the messages received and sent by the client.

Is your feature request related to a problem? Please describe

Thank you for the response. I want to use this to modify incoming data and also modify the responses. I have a client that sends WebSocket data, and I want to proxy the connection so that I can read the incoming data and its response before the response is returned to the original client. It should work like the ReverseHttpProxy system, but for WebSocket.

Describe the solution you'd like

Something like:

import time
from typing import AsyncIterator, Awaitable, Callable

async def server_to_client_callback(
    receiver: AsyncIterator[str], sender: Callable[[str], Awaitable[None]]
):
    """Receive from the target (base_url) server and send to the client."""
    async for recv in receiver:  # Receive the message
        print(f"Received: {recv}")
        resp = f"Modified: {recv}"  # Modify the message
        await sender(resp)  # Send the message
        print(time.time())  # Do something after sending the message

@app.websocket("/{path:path}")
async def _(websocket: WebSocket):
    return await proxy.proxy(
        websocket=websocket,
        server_to_client_callback=server_to_client_callback,
    )
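To make the proposed shape concrete, here is a minimal sketch that drives a callback of this form without any proxy at all. `fake_receiver`, `fake_sender`, and `run_demo` are hypothetical stand-ins invented for this illustration; they are not part of fastapi-proxy-lib, and the real proxy may feed the callback differently.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


async def server_to_client_callback(
    receiver: AsyncIterator[str], sender: Callable[[str], Awaitable[None]]
) -> None:
    """Same shape as the proposed callback: read, modify, forward."""
    async for recv in receiver:
        await sender(f"Modified: {recv}")


async def fake_receiver(messages: List[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for messages arriving from the target server."""
    for message in messages:
        yield message


async def run_demo() -> List[str]:
    """Run the callback against the fakes and return what it sent."""
    sent: List[str] = []

    async def fake_sender(message: str) -> None:
        # Hypothetical stand-in for sending a message to the client.
        sent.append(message)

    await server_to_client_callback(fake_receiver(["ping", "pong"]), fake_sender)
    return sent


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Because the callback owns the loop, it decides when to send and can keep working after each send, which is the flexibility this first API is meant to offer.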

Describe alternatives you've considered

Or would this API be better? It would offer higher performance but is lower-level: the callback handles one message at a time, so it could not run follow-up work after a message is sent, nor could it ensure the order in which messages are received.

async def server_to_client_callback(recv: str):
    """Receive from the target(base_url) server and send to the client."""
    print(f"Received: {recv}")
    resp = f"Modified: {recv}"  # Modify the message
    return resp  # Send the message
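As a rough illustration of why this variant is lower-level, the sketch below shows one way a proxy-side loop could apply a per-message callback. `pump`, `fake_source`, and `fake_send` are hypothetical names made up for this example and are not the library's implementation.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


async def server_to_client_callback(recv: str) -> str:
    """Per-message callback in the shape of the alternative proposal."""
    return f"Modified: {recv}"


async def pump(
    source: AsyncIterator[str],
    callback: Callable[[str], Awaitable[str]],
    send: Callable[[str], Awaitable[None]],
) -> None:
    """Hypothetical proxy-side loop: the proxy, not the callback, sends."""
    async for message in source:
        # The callback has already returned by the time `send` runs,
        # so it has no place to put "after sending" logic.
        await send(await callback(message))


async def run_demo() -> List[str]:
    """Run the loop against fakes and return what was sent."""
    sent: List[str] = []

    async def fake_source() -> AsyncIterator[str]:
        for message in ("ping", "pong"):
            yield message

    async def fake_send(message: str) -> None:
        sent.append(message)

    await pump(fake_source(), server_to_client_callback, fake_send)
    return sent


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```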

Additional context

For the first API, it is necessary to consider exceptions and disconnections:

If an exception occurs in the proxy, what should the callback receive? Conversely, if an exception occurs in the callback, what should the proxy send to the client?

WSH032 commented 3 months ago

> For the first API, it is necessary to consider exceptions and disconnections:
>
> If an exception occurs in the proxy, what should the callback receive? Conversely, if an exception occurs in the callback, what should the proxy send to the client?

We use a closable pipeline to implement this functionality. The diagram is as follows:

flowchart LR;
    Client -->|A| CallbackReceive;
    CallbackReceive -->|B| CallbackSend;
    CallbackSend -->|C| TargetServer;

A and C are AnyIO memory object streams (closable pipelines).
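The flow in the diagram can be sketched with the standard library alone. `ClosablePipe` below is a hypothetical, simplified stand-in built on `asyncio.Queue`; it is not AnyIO's memory object stream, and its close semantics (iteration simply ends, sending after close raises) are an assumption chosen to keep the example short.

```python
import asyncio
from typing import List


class ClosedPipeError(Exception):
    """Raised when sending into a pipe that has been closed."""


class ClosablePipe:
    """Hypothetical stdlib stand-in for a closable in-memory pipe."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    async def send(self, item: str) -> None:
        if self._closed:
            raise ClosedPipeError("pipe is closed")
        await self._queue.put(item)

    def close(self) -> None:
        if not self._closed:
            self._closed = True
            self._queue.put_nowait(None)  # sentinel wakes a waiting reader

    def __aiter__(self) -> "ClosablePipe":
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is None:
            self._queue.put_nowait(None)  # keep later reads from blocking
            raise StopAsyncIteration
        return item


async def callback(pipe_a: ClosablePipe, pipe_c: ClosablePipe) -> None:
    """Segment B: read from A, modify, write to C, then close C."""
    try:
        async for message in pipe_a:
            await pipe_c.send(f"Modified: {message}")
    finally:
        pipe_c.close()


async def run_pipeline(client_messages: List[str]) -> List[str]:
    """Client -> A -> callback -> C -> target server, all in memory."""
    pipe_a, pipe_c = ClosablePipe(), ClosablePipe()
    delivered: List[str] = []

    async def client_side() -> None:
        for message in client_messages:
            await pipe_a.send(message)
        pipe_a.close()

    async def target_side() -> None:
        async for message in pipe_c:
            delivered.append(message)

    await asyncio.gather(client_side(), callback(pipe_a, pipe_c), target_side())
    return delivered


if __name__ == "__main__":
    print(asyncio.run(run_pipeline(["a", "b"])))
```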

For Exceptions

If any of the pipelines (A, B, or C) encounters an exception, the proxy will automatically close the WebSocket connections with both the client and the target server. The close code is currently hardcoded to 1011, but this might change in the future, so you should not rely on it.
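The rule above can be illustrated with a small stdlib sketch: run the segments concurrently and, if any of them raises, close both connections with 1011. `FakeWebSocket`, `run_segments`, and the two sample segments are hypothetical names for this example only; the library's actual task handling may differ.

```python
import asyncio
from typing import Coroutine, List, Optional, Tuple

INTERNAL_ERROR = 1011  # WebSocket close code for an unexpected server condition


class FakeWebSocket:
    """Hypothetical stand-in that only records the close code it was given."""

    def __init__(self) -> None:
        self.close_code: Optional[int] = None

    async def close(self, code: int) -> None:
        self.close_code = code


async def run_segments(
    segments: List[Coroutine], client: FakeWebSocket, target: FakeWebSocket
) -> None:
    """Run all segments; on the first failure, stop the rest and close both ends."""
    tasks = [asyncio.ensure_future(segment) for segment in segments]
    try:
        await asyncio.gather(*tasks)
    except Exception:
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to finish before closing the sockets.
        await asyncio.gather(*tasks, return_exceptions=True)
        await client.close(INTERNAL_ERROR)
        await target.close(INTERNAL_ERROR)


async def healthy_segment() -> None:
    await asyncio.sleep(3600)  # pretends to relay messages for a long time


async def failing_segment() -> None:
    raise RuntimeError("segment B failed")


async def run_demo() -> Tuple[Optional[int], Optional[int]]:
    client, target = FakeWebSocket(), FakeWebSocket()
    await run_segments(
        [healthy_segment(), failing_segment(), healthy_segment()], client, target
    )
    return client.close_code, target.close_code


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```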

> If an exception occurs in the proxy, what do you want the callback to receive?

The proxy will close the sending end of A and the receiving end of C, so the callback will also receive an exception.

Note, however, that in the current implementation the exceptions received by the callback are raised by AnyIO. This might change in the future, so you should not rely on it.

> If an exception occurs in the callback, what should the proxy send to the client?

In any case, the callback needs to ensure that it closes the receiving end of A and the sending end of C so that the proxy knows the callback has become invalid. In this situation, the proxy will automatically close the WebSocket connections with 1011.

> the callback needs to ensure that it closes the receiving end of A and the sending end of C

To make this more user-friendly, we will provide a context manager. The API will be as follows:

import time
from contextlib import AbstractContextManager
from typing import Tuple

from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

async def server_to_client_callback(
    pipe_builder: AbstractContextManager[
        Tuple[MemoryObjectReceiveStream[str], MemoryObjectSendStream[str]]
    ],
):
    # The ContextManager will automatically close both `receiver` and `sender`
    with pipe_builder as (receiver, sender):
        async for recv in receiver:
            print(f"Received: {recv}")
            resp = f"Modified: {recv}"
            await sender.send(resp)
            print(time.time())  # Do something after sending the message
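For illustration, a context manager with this closing behavior could look roughly like the sketch below. `FakeStream` and `make_pipe_builder` are hypothetical names for this example, and the real `pipe_builder` would wrap AnyIO streams rather than these fakes.

```python
from contextlib import contextmanager
from typing import Iterator, Tuple


class FakeStream:
    """Hypothetical stand-in for one end of a pipe; only tracks closure."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True


@contextmanager
def make_pipe_builder(
    receiver: FakeStream, sender: FakeStream
) -> Iterator[Tuple[FakeStream, FakeStream]]:
    """Yield both ends and close them on exit, even if the body raises."""
    try:
        yield receiver, sender
    finally:
        receiver.close()
        sender.close()


def demo_close_on_error() -> Tuple[bool, bool]:
    """Simulate a callback that fails inside the `with` block."""
    receiver, sender = FakeStream("A-receive"), FakeStream("C-send")
    try:
        with make_pipe_builder(receiver, sender):
            raise RuntimeError("callback failed")
    except RuntimeError:
        pass
    return receiver.closed, sender.closed


if __name__ == "__main__":
    print(demo_close_on_error())
```

Putting the cleanup in `finally` means the callback author does not have to remember to close either end, which matches the stated goal of making the API more user-friendly.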

For Disconnections

If the proxy receives a disconnection request

The proxy will close the pipeline, and in this case, the callback will receive an exception.

[!CAUTION] This is the same as when an exception occurs in the proxy, which means the callback cannot distinguish between a normal disconnection and an exception occurring in the proxy.

If the callback wants to disconnect

This is not possible for now. The callback cannot disconnect normally; it can only close the pipeline. In this case, the proxy will treat it as an exception.

[!TIP] If anyone has such a requirement, please consider submitting a feature request.

WSH032 commented 3 months ago

CC @IvesAwadi

What do you think?

IvesAwadi commented 3 months ago

> CC @IvesAwadi
>
> What do you think?

Looks good to me.