langflow-ai / langflow

Langflow is a low-code app builder for RAG and multi-agent AI applications. It’s Python-based and agnostic to any model, API, or database.
http://www.langflow.org
MIT License

Streaming URL is not unique in multi-user environment #3468

Status: Open · mieslep opened this issue 3 weeks ago

mieslep commented 3 weeks ago

Bug Description

When stream=true is passed, the resulting stream_url is always the same, no matter what session_id is passed. This is possibly related to #2563: the stream is completely open, so any session can consume tokens from it.

Reproduction

  1. Invoke any flow via API with stream=true
  2. Inspect init_response["outputs"][0]["outputs"][0]["artifacts"]["stream_url"] in the response: no matter which browser or session_id is used, this value is always the same.

Expected behavior

  1. A unique stream should be created for each API call.
  2. Ideally, the caller is also "authenticated" in some way (even if only by requiring the same session_id on the request).
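One way to meet both expectations, sketched under assumptions: the server mints an unguessable ID per call, remembers which session_id created it, and refuses any consumer presenting a different session_id. StreamRegistry and the /api/v1/stream/... URL layout are hypothetical and not part of Langflow.

```python
import secrets
from typing import Dict


class StreamRegistry:
    """Hypothetical in-memory registry: one stream per API call, bound to its session_id."""

    def __init__(self) -> None:
        self._owners: Dict[str, str] = {}  # stream_id -> session_id that created it

    def create(self, flow_id: str, session_id: str) -> str:
        # A random per-call ID makes every stream URL unique, even for the same flow.
        stream_id = secrets.token_urlsafe(16)
        self._owners[stream_id] = session_id
        return f"/api/v1/stream/{flow_id}/{stream_id}"  # hypothetical URL layout

    def authorize(self, stream_id: str, session_id: str) -> None:
        # Minimal "authentication": the consumer must present the session_id used on the POST.
        owner = self._owners.get(stream_id)
        if owner is None:
            raise KeyError(f"unknown or already consumed stream {stream_id!r}")
        if owner != session_id:
            raise PermissionError("session_id does not match the stream's owner")
        del self._owners[stream_id]  # single use: later consumers are rejected


if __name__ == "__main__":
    reg = StreamRegistry()
    url = reg.create("my-flow", "alice")
    reg.authorize(url.rsplit("/", 1)[1], "alice")  # succeeds once
```

Making streams single-use also closes the window where a second client could attach to a stream that is already being consumed.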

Who can help?

No response

Operating System

Windows 11

Langflow Version

1.0.14

Python Version

3.12

Screenshot

No response

Flow File

No response

dosubot[bot] commented 3 weeks ago

The stream_url is not unique in a multi-user environment when stream=true is passed because the initiateSession method constructs the endpoint URL using the flowId and the stream parameter, but it does not include any user-specific information or unique session identifiers. This can lead to the same stream_url being generated for different users or sessions.

To ensure a unique stream and caller authentication for each API call, you can modify the initiateSession method to include a unique session identifier (e.g., a UUID) and user-specific information in the endpoint URL or request body. Additionally, you should ensure that the server-side implementation can handle and differentiate these unique identifiers.

Here is an example of how you can modify the initiateSession method to include a unique session identifier:

import { v4 as uuidv4 } from 'uuid';

// Method of the API client class; this.post is the client's POST helper
async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
    const sessionId = uuidv4(); // Generate a unique session ID for this call
    const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
    return this.post(endpoint, { input_value: inputValue, tweaks });
}

In this example, a unique session ID is generated using the uuid library and included as a query parameter in the endpoint URL, with the aim of giving each API call its own stream_url.

Additionally, you should ensure that the server-side implementation can handle the session_id parameter and generate a unique stream_url for each session [1].
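For Python clients, the same idea can be sketched with the standard library. Unlike the JavaScript above, this puts session_id in the JSON body rather than the query string; that the /run endpoint reads it from the body is an assumption to verify against your Langflow version.

```python
import json
import urllib.request
import uuid
from typing import Optional


def build_run_request(
    base_url: str,
    flow_id: str,
    input_value: str,
    stream: bool = False,
    session_id: Optional[str] = None,
    tweaks: Optional[dict] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST to /api/v1/run/{flow_id} with a fresh session_id per call."""
    body = {
        "input_value": input_value,
        "tweaks": tweaks or {},
        # Assumption: the run endpoint reads session_id from the JSON body.
        "session_id": session_id or str(uuid.uuid4()),
    }
    url = f"{base_url}/api/v1/run/{flow_id}?stream={str(stream).lower()}"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Send it with urllib.request.urlopen(req). As the reply below points out, on 1.0.14 a distinct session_id alone does not change the returned stream_url; the server has to use it.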

To continue talking to Dosu, mention @dosu.

mieslep commented 3 weeks ago

@dosu passing in different values for session_id does not change the stream_url in any way, nor is it required to pass in session_id on the GET to consume stream tokens.

dosubot[bot] commented 3 weeks ago

The issue arises because the backend does not currently use the session_id to generate a unique stream_url. To address this, you need to ensure that the backend generates a unique stream_url for each session.

Here's how you can modify the backend to handle unique session identifiers and generate unique stream_urls:

  1. Modify the build_vertex_stream function to use session_id: Ensure that the session_id is used to generate a unique stream URL.
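from typing import TYPE_CHECKING, Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from loguru import logger

# Langflow-internal helpers; module paths assumed from langflow/api/v1/chat.py in 1.0.x
from langflow.api.utils import parse_exception
from langflow.api.v1.schemas import StreamData
from langflow.services.deps import get_chat_service, get_session_service

if TYPE_CHECKING:
    from langflow.graph.vertex.types import ChatVertex
    from langflow.services.chat.service import ChatService
    from langflow.services.session.service import SessionService

router = APIRouter(tags=["Chat"])


@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
    flow_id: str,
    vertex_id: str,
    session_id: Optional[str] = None,  # query parameter; required by the check below
    chat_service: "ChatService" = Depends(get_chat_service),
    session_service: "SessionService" = Depends(get_session_service),
):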
    try:
        async def stream_vertex():
            try:
                if not session_id:
                    raise ValueError("Session ID is required for streaming.")

                session_data = await session_service.load_session(session_id, flow_id=flow_id)
                graph, artifacts = session_data if session_data else (None, None)
                if not graph:
                    raise ValueError(f"No graph found for {flow_id}.")

                vertex: "ChatVertex" = graph.get_vertex(vertex_id)
                if not hasattr(vertex, "stream"):
                    raise ValueError(f"Vertex {vertex_id} does not support streaming")

                if isinstance(vertex._built_result, str) and vertex._built_result:
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                elif not vertex.frozen or not vertex._built:
                    logger.debug(f"Streaming vertex {vertex_id}")
                    stream_data = StreamData(
                        event="message",
                        data={"message": f"Streaming vertex {vertex_id}"},
                    )
                    yield str(stream_data)
                    async for chunk in vertex.stream():
                        stream_data = StreamData(
                            event="message",
                            data={"chunk": chunk},
                        )
                        yield str(stream_data)
                elif vertex.result is not None:
                    stream_data = StreamData(
                        event="message",
                        data={"chunk": vertex._built_result},
                    )
                    yield str(stream_data)
                else:
                    raise ValueError(f"No result found for vertex {vertex_id}")

            except Exception as exc:
                logger.exception(f"Error building vertex: {exc}")
                exc_message = parse_exception(exc)
                if exc_message == "The message must be an iterator or an async iterator.":
                    exc_message = "This stream has already been closed."
                yield str(StreamData(event="error", data={"error": exc_message}))
            finally:
                logger.debug("Closing stream")
                yield str(StreamData(event="close", data={"message": "Stream closed"}))

        return StreamingResponse(stream_vertex(), media_type="text/event-stream")
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Error building vertex") from exc
  2. Ensure the frontend includes session_id in the request: modify the initiateSession method to include a unique session_id.
import { v4 as uuidv4 } from 'uuid';

// Method of the API client class; this.post is the client's POST helper
async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
    const sessionId = uuidv4(); // Generate a unique session ID for this call
    const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
    return this.post(endpoint, { input_value: inputValue, tweaks });
}
  3. Handle the session_id in the backend: ensure that the backend uses the session_id to generate a unique stream_url.

By making these changes, each session will have a unique stream_url, ensuring that streams are not shared across different sessions [1][2].
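These steps boil down to looking up the built graph by session as well as by flow. The class below is a hypothetical stand-in for that cache (Langflow's real cache and session service are not shown) and illustrates why a key of flow_id alone lets the last run overwrite earlier ones.

```python
from typing import Any, Dict, Tuple


class GraphCache:
    """Hypothetical cache of built graphs, illustrating why the lookup key matters."""

    def __init__(self, key_by_session: bool) -> None:
        self.key_by_session = key_by_session
        self._store: Dict[Tuple[str, ...], Any] = {}

    def _key(self, flow_id: str, session_id: str) -> Tuple[str, ...]:
        # Keying by flow_id alone makes every session of a flow share one slot.
        return (flow_id, session_id) if self.key_by_session else (flow_id,)

    def put(self, flow_id: str, session_id: str, graph: Any) -> None:
        self._store[self._key(flow_id, session_id)] = graph

    def get(self, flow_id: str, session_id: str) -> Any:
        return self._store.get(self._key(flow_id, session_id))


if __name__ == "__main__":
    shared, per_session = GraphCache(False), GraphCache(True)
    for cache in (shared, per_session):
        cache.put("flow", "s1", "graph-1")
        cache.put("flow", "s2", "graph-2")
    print(shared.get("flow", "s1"), per_session.get("flow", "s1"))  # graph-2 graph-1
```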


therockerline commented 2 weeks ago

Hello, is there any update on this issue? It would be nice if calling the flow with the stream=true parameter returned the response directly as a stream, avoiding the separate request to the stream_url afterwards. That way, the session logic already implemented in the flow could be used directly.
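Streaming from the run call itself could look roughly like this sketch: an async generator turns the flow's tokens into Server-Sent Events, which a FastAPI handler could return via StreamingResponse(..., media_type="text/event-stream"). The message/close event names mirror the StreamData events in the handler above, but the exact wire format and the fake_llm token source are assumptions.

```python
import asyncio
import json
from collections.abc import AsyncIterator
from typing import List


async def sse_events(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Format tokens as Server-Sent Events for the body of the run response itself."""
    async for token in tokens:
        yield f"event: message\ndata: {json.dumps({'chunk': token})}\n\n"
    yield f"event: close\ndata: {json.dumps({'message': 'Stream closed'})}\n\n"


async def fake_llm() -> AsyncIterator[str]:
    # Hypothetical token source standing in for the flow's output component.
    for token in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)
        yield token


async def collect() -> List[str]:
    return [event async for event in sse_events(fake_llm())]


if __name__ == "__main__":
    for event in asyncio.run(collect()):
        print(event, end="")
```

Because the tokens travel on the same HTTP response that carried the request, there is no shared URL for another session to attach to.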

guojingfeng1994 commented 1 week ago

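I have the same problem. In web applications it is very common for each request to get its own independent processing thread and scope to improve concurrency. When I use streaming output, the returned stream URL is always the same, so when I send two conversation requests at the same time, the stream URL always outputs the data of the last request instead of returning each request's own answer. I don't know how to solve this; serializing the requests would make processing far too inefficient for a web application. If anyone knows how to solve this, please let me know. Thanks.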
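The symptom in this comment, where concurrent requests share one stream and read the last request's output, goes away once tokens are routed per session. A minimal asyncio sketch, assuming a hypothetical SessionStreams channel keyed by session_id (not a Langflow API): two simulated runs interleave, yet each consumer receives only its own answer.

```python
import asyncio
from collections import defaultdict
from typing import Optional, Tuple


class SessionStreams:
    """Hypothetical per-session token channels: each session_id gets its own queue."""

    def __init__(self) -> None:
        self._queues = defaultdict(asyncio.Queue)

    async def publish(self, session_id: str, token: Optional[str]) -> None:
        # None marks the end of that session's answer.
        await self._queues[session_id].put(token)

    async def consume(self, session_id: str) -> str:
        queue = self._queues[session_id]
        parts = []
        while (token := await queue.get()) is not None:
            parts.append(token)
        del self._queues[session_id]  # free the channel once the answer is complete
        return " ".join(parts)


async def answer(streams: SessionStreams, session_id: str, text: str) -> None:
    # Simulated flow run: emits one word at a time, yielding to other sessions in between.
    for word in text.split():
        await streams.publish(session_id, word)
        await asyncio.sleep(0)
    await streams.publish(session_id, None)


async def main() -> Tuple[str, str]:
    streams = SessionStreams()
    a, b, *_ = await asyncio.gather(
        streams.consume("session-a"),
        streams.consume("session-b"),
        answer(streams, "session-a", "first answer"),
        answer(streams, "session-b", "second answer"),
    )
    return a, b


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('first answer', 'second answer')
```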