langflow-ai / langflow

Langflow is a low-code app builder for RAG and multi-agent AI applications. It’s Python-based and agnostic to any model, API, or database.
http://www.langflow.org
MIT License
30.61k stars, 3.84k forks

API Stream leak #2563

Open rafael-junio opened 3 months ago

rafael-junio commented 3 months ago

Describe the bug

When using the API to stream a chat output, the stream URL endpoint is openly accessible. As a result, the message can leak if two devices call the same endpoint.

To Reproduce

Steps to reproduce the behavior:

  1. Use the basic flow to make a call using the playground
  2. Go to any browser or device and access: http:///api/v1/build///stream, e.g. https://langflow.brasil.com/api/v1/build/90deb8be-4e8c-422a-b35f-45042a2c49c9/ChatOutput-EzD73/stream
  3. Both devices have access to the same output; the URL provided has no authentication and no randomness.

Additional context

With the current Langflow implementation, it is impossible to build a multi-user application that uses the API with streaming.
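To make the report concrete, here is a minimal sketch of why the output can leak. It assumes the stream URL always has the shape seen in the example URL in step 2 (flow ID, then component ID, then `/stream`); `buildStreamUrl` is a hypothetical helper for illustration, not part of Langflow.

```javascript
// Hypothetical helper: rebuilds the stream URL from the two IDs visible in
// the example URL. The path layout is an assumption taken from that example.
function buildStreamUrl(baseUrl, flowId, componentId) {
  return `${baseUrl}/api/v1/build/${flowId}/${componentId}/stream`;
}

const baseUrl = "https://langflow.brasil.com";
const flowId = "90deb8be-4e8c-422a-b35f-45042a2c49c9";
const componentId = "ChatOutput-EzD73";

// Two different devices that know the same flow and component IDs end up
// with exactly the same URL: nothing in it is tied to a user or a session.
const deviceA = buildStreamUrl(baseUrl, flowId, componentId);
const deviceB = buildStreamUrl(baseUrl, flowId, componentId);

console.log(deviceA === deviceB); // true
```

Because the URL depends only on those two IDs, any client that learns them can read the same stream unless the server checks something else as well.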

dosubot[bot] commented 3 months ago

Hello @rafael-junio! I'm here to help you with any bugs, questions, or contributions. Let's tackle this issue together!

To secure the API stream endpoint and prevent unauthorized access and message leaks when two devices call the same endpoint, you can implement the following measures:

  1. Authentication: Ensure that the API requires authentication by using an API key or other authentication mechanisms. Langflow supports API key functionality, which can be included in the request header or query parameter to authenticate API calls.

  2. Session Management: Use unique session tokens for each device to ensure that each device has its own session. This can be done by generating a unique session ID when initiating a session.

  3. Secure Communication: Ensure that all communication with the API is done over HTTPS to prevent eavesdropping.

  4. Token Renewal: Implement token renewal to handle expired tokens and maintain secure sessions.
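Measure 2 can be illustrated on its own before the full client. The following is a minimal server-side sketch, assuming a Node.js backend; `issueStreamToken`, `redeemStreamToken`, and the in-memory `streamTokens` map are hypothetical names, not Langflow APIs. The idea is that the stream is only served to a caller that presents an unguessable token issued for its session.

```javascript
const crypto = require("node:crypto");

// Hypothetical in-memory store mapping stream tokens to the session that
// owns them. A real deployment would likely need shared storage and expiry.
const streamTokens = new Map();

// Issue an unguessable token (256 random bits, hex-encoded) for one session.
function issueStreamToken(sessionId) {
  const token = crypto.randomBytes(32).toString("hex");
  streamTokens.set(token, sessionId);
  return token;
}

// Redeem a token: returns the owning session ID once, then invalidates the
// token. Unknown or already-used tokens yield null.
function redeemStreamToken(token) {
  const sessionId = streamTokens.get(token);
  if (sessionId === undefined) return null;
  streamTokens.delete(token);
  return sessionId;
}

const tokenA = issueStreamToken("session-a");
const tokenB = issueStreamToken("session-b");
console.log(redeemStreamToken(tokenA)); // "session-a"
console.log(redeemStreamToken(tokenA)); // null, the token was already used
```

Making the token single-use is a design choice in this sketch: a second device replaying the same URL gets nothing, but a client that reconnects would need a fresh token.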

Here is an updated version of the provided code with these security measures:

import axios from "axios";
import { Cookies } from "react-cookie";

class LangflowClient {
    constructor(baseURL, apiKey) {
        this.baseURL = baseURL;
        this.apiKey = apiKey;
        this.api = axios.create({ baseURL: this.baseURL });
        this.cookies = new Cookies();

        // Add request interceptor to include the access token
        this.api.interceptors.request.use(
            (config) => {
                const accessToken = this.cookies.get("access_token_lf");
                if (accessToken) {
                    config.headers["Authorization"] = `Bearer ${accessToken}`;
                }
                if (this.apiKey) {
                    config.headers["x-api-key"] = this.apiKey;
                }
                return config;
            },
            (error) => Promise.reject(error)
        );

        // Add response interceptor to handle token renewal
        this.api.interceptors.response.use(
            (response) => response,
            async (error) => {
                if (error.response?.status === 401 || error.response?.status === 403) {
                    const newToken = await this.renewAccessToken();
                    if (newToken) {
                        error.config.headers["Authorization"] = `Bearer ${newToken}`;
                        return axios.request(error.config);
                    }
                }
                return Promise.reject(error);
            }
        );
    }

    async renewAccessToken() {
        try {
            const response = await axios.post(`${this.baseURL}/renew-token`, {
                refreshToken: this.cookies.get("refresh_token_lf"),
            });
            const { accessToken } = response.data;
            this.cookies.set("access_token_lf", accessToken);
            return accessToken;
        } catch (error) {
            console.error("Token renewal failed:", error);
            return null;
        }
    }

    async post(endpoint, body, headers = {"Content-Type": "application/json"}) {
        const url = `${this.baseURL}${endpoint}`;
        try {
            const response = await this.api.post(url, body, { headers });
            return response.data;
        } catch (error) {
            console.error('Request Error:', error);
            throw error;
        }
    }

    async initiateSession(flowId, inputValue, stream = false, tweaks = {}) {
        const sessionId = this.generateSessionId();
        const endpoint = `/api/v1/run/${flowId}?stream=${stream}&session_id=${sessionId}`;
        return this.post(endpoint, { input_value: inputValue, tweaks: tweaks });
    }

    generateSessionId() {
        // Use the Web Crypto API rather than Math.random(), which is not
        // cryptographically secure. Requires a secure context (HTTPS) in browsers.
        return crypto.randomUUID();
    }

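    // Note: EventSource cannot attach custom headers, so the axios
    // interceptors above do not apply to this request.
    handleStream(streamUrl, onUpdate, onClose, onError) {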
        const eventSource = new EventSource(streamUrl);

        eventSource.onmessage = event => {
            const data = JSON.parse(event.data);
            onUpdate(data);
        };

        eventSource.onerror = event => {
            console.error('Stream Error:', event);
            onError(event);
            eventSource.close();
        };

        eventSource.addEventListener("close", () => {
            onClose('Stream closed');
            eventSource.close();
        });

        return eventSource;
    }

    async runFlow(flowIdOrName, inputValue, tweaks, stream = false, onUpdate, onClose, onError) {
        try {
            const initResponse = await this.initiateSession(flowIdOrName, inputValue, stream, tweaks);
            console.log('Init Response:', initResponse);
            if (stream && initResponse && initResponse.outputs && initResponse.outputs[0].outputs[0].artifacts.stream_url) {
                const streamUrl = initResponse.outputs[0].outputs[0].artifacts.stream_url;
                console.log(`Streaming from: ${streamUrl}`);
                this.handleStream(streamUrl, onUpdate, onClose, onError);
            }
            return initResponse;
        } catch (error) {
            console.error('Error running flow:', error);
            onError('Error initiating session');
        }
    }
}

async function main() {
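    // Placeholders: replace with your own flow ID (or endpoint name) and API key.
    const flowIdOrName = 'your-flow-id-or-endpoint-name';
    const inputValue = 'User message';
    const stream = false;
    // Template literal so the current page's origin is used as the base URL.
    const langflowClient = new LangflowClient(`${window.location.protocol}//${window.location.host}`,
        'your-api-key'); // pass null if authentication is disabled
    const tweaks = {}; // optional per-component overrides
    const response = await langflowClient.runFlow(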
        flowIdOrName,
        inputValue,
        tweaks,
        stream,
        (data) => console.log("Received:", data.chunk), // onUpdate
        (message) => console.log("Stream Closed:", message), // onClose
        (error) => console.log("Stream Error:", error) // onError
    );
    if (!stream) {
        const flowOutputs = response.outputs[0];
        const firstComponentOutputs = flowOutputs.outputs[0];
        const output = firstComponentOutputs.outputs.message;

        console.log("Final Output:", output.message.text);
    }
}

main();

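In this updated code, a request interceptor attaches the access token and the API key to every request, a response interceptor attempts to renew the token after a 401 or 403 response, and each run is started with its own session ID.

Additionally, the ApiInterceptor from the langflow project ensures that access tokens are added to requests and handles token renewal and error management [1][2][3].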
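One caveat about the client above: the browser `EventSource` used in `handleStream` cannot send custom headers, so neither the `Authorization` header nor `x-api-key` reaches the stream endpoint. A possible workaround, sketched below, is to read the stream with `fetch` instead. This is only a sketch under assumptions: `parseSseBuffer` and `streamWithAuth` are hypothetical names, events are assumed to be separated by a blank line with `\n` line endings, and the server is assumed to check the `x-api-key` header on the stream endpoint, which the issue suggests it currently does not.

```javascript
// Split buffered server-sent-event text into complete events.
// Returns the `data:` payloads of complete events plus the unfinished tail.
function parseSseBuffer(buffer) {
  const parts = buffer.split("\n\n");
  const rest = parts.pop(); // the last piece may be an incomplete event
  const payloads = [];
  for (const part of parts) {
    const data = part
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, "")) // drop one leading space
      .join("\n");
    if (data) payloads.push(data);
  }
  return { payloads, rest };
}

// Read the stream with fetch so that credentials travel in headers.
// Assumes each payload is JSON, as in handleStream above.
async function streamWithAuth(streamUrl, apiKey, onUpdate) {
  const response = await fetch(streamUrl, {
    headers: { "x-api-key": apiKey, Accept: "text/event-stream" },
  });
  if (!response.ok) {
    throw new Error(`Stream request failed: ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const { payloads, rest } = parseSseBuffer(buffer);
    buffer = rest;
    payloads.forEach((payload) => onUpdate(JSON.parse(payload)));
  }
}

// The parser can be exercised without a server: one complete event is
// returned, and the partial second event stays in `rest` for the next read.
const demo = parseSseBuffer('data: {"chunk":"Hel"}\n\ndata: {"chu');
console.log(demo.payloads.length, demo.rest);
```

Unlike `EventSource`, this approach does not reconnect automatically, so a caller that needs reconnection would have to add it.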


anovazzi1 commented 1 week ago

This issue will be fixed by the new streaming that is coming soon.