langflow-ai / langflow

Langflow is a low-code app builder for RAG and multi-agent AI applications. It’s Python-based and agnostic to any model, API, or database.
http://www.langflow.org
MIT License
26.01k stars 3.49k forks source link

Best way to manage concurrency with ephemeral instances? #2705

Open h-arnold opened 1 month ago

h-arnold commented 1 month ago

Background

I'm running langflow using Google Cloud Run. This works well for my use case which involves very infrequent but bursty (100,000s of tokens per second) requests. It's connected to a postgres database hosted elsewhere.

I've created a solution which works, but it's hacky and inelegant and I'm hoping to get some suggestions on some better ideas.

The issue

My issue is that the docker image as it stands doesn't allow write access to the /app/data/.cache folder by default, which langflow requires to run. The gcs-fuse adaptor isn't POSIX compliant and doesn't provide concurrency control. This means that when more than one worker or more than one instances is running, I get lots of these sorts of errors:

'IO Error: Could not set lock on file "/app/data/.cache/"

My solution

I've created a separate Dockerfile and script which creates a ramdisk mounted at /app/data. When a container instance is started, the ramdisk is created and all writes go to RAM.

From what I can see, this solution has the following limitations:

Any ideas on how I could approach this in a better way?

dosubot[bot] commented 1 month ago

Hello, @h-arnold! I'm here to help you with any bugs, questions, or contributions you might have. Let's work together to solve this issue.

To manage concurrency with ephemeral instances of Langflow on Google Cloud Run and address the issues with the /app/data/.cache folder, you can leverage Kubernetes and Helm charts for a more robust solution. Here are the steps you can follow:

  1. Use Kubernetes for Deployment:

    • Deploy Langflow using Kubernetes, which allows you to manage multiple instances and handle concurrency more effectively.
    • Follow the steps in the Kubernetes deployment guide to set up a Kubernetes cluster and install Langflow using Helm charts.
  2. Persistent Storage:

    • Instead of using a ramdisk for the /app/data/.cache folder, configure persistent storage.
    • Use an external PostgreSQL database for data persistence. Update the values.yaml file to configure the external database.
    postgresql:
     enabled: true
     fullnameOverride: "langflow-ide-postgresql-service"
     auth:
       username: "langflow"
       password: "langflow-postgres"
       database: "langflow-db"
    
    langflow:
     backend:
       externalDatabase:
         enabled: true
         driver:
           value: "postgresql"
         host:
           value: "langflow-ide-postgresql-service"
         port:
           value: "5432"
         database:
           value: "langflow-db"
         user:
           value: "langflow"
         password:
           valueFrom:
             secretKeyRef:
               key: "password"
               name: "langflow-ide-postgresql-service"
       sqlite:
         enabled: false
  3. Scaling:

    • Scale the number of replicas for the Langflow backend and frontend services to handle increased load and improve concurrency.
    • Update the replicaCount value in the values.yaml file.
    langflow:
     backend:
       replicaCount: 3
     frontend:
       replicaCount: 3
  4. Resource Management:

    • Configure resource requests and limits to ensure that each instance has sufficient resources and to prevent resource contention.
    • Update the resources values in the values.yaml file.
    langflow:
     backend:
       resources:
         requests:
           memory: "2Gi"
           cpu: "1000m"
     frontend:
       resources:
         requests:
           memory: "1Gi"
           cpu: "1000m"

By following these steps, you can improve concurrency management and data persistence for ephemeral instances of Langflow running on Google Cloud Run [1].

To continue talking to Dosu, mention @dosu.

h-arnold commented 1 month ago

@dosu The monitoring service appears to use it's own duckdb instance rather than postgresql and that looks hard coded to me. Is that the only thing that requires write access from .cache?

dosubot[bot] commented 1 month ago

The monitoring service is the primary component that requires write access to the /app/data/.cache folder in Langflow. This is evident from the MonitorService class, which handles database operations such as adding rows, deleting messages, and updating messages, all of which involve writing to the monitor.duckdb file located in the cache directory.

However, other components also require write access to the cache directory. For instance, the utility functions in src/backend/langflow/services/cache/utils.py also perform write operations. These functions include save_binary_file, save_uploaded_file, and clear_old_cache_files, which handle saving files and managing cache files in the cache directory.

To address this in a more efficient way, you can ensure that only the necessary components have the required permissions to write to this directory. This can be done by setting appropriate file system permissions or by configuring the services to run with the necessary privileges [1][2][3][4][5].

To continue talking to Dosu, mention @dosu.

h-arnold commented 1 month ago

@dosu Based on the code you have access to, can you see a way of either modifying the code to move all of this to a database? For example changing the database the MonitorService class can use, or by saving the binary files to a document database?

dosubot[bot] commented 1 month ago

To modify the MonitorService class to move all write operations from the /app/data/.cache folder to a database, you can replace the DuckDB operations with operations for a document database like MongoDB. Below is an example of how you can modify the MonitorService class to use MongoDB instead of DuckDB:

  1. Install the pymongo package if you haven't already:

    pip install pymongo
  2. Modify the MonitorService class to use MongoDB:

from datetime import datetime
from typing import TYPE_CHECKING, Optional, Union

from loguru import logger
from pymongo import MongoClient
from pymongo.collection import Collection

from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel

if TYPE_CHECKING:
    from langflow.services.settings.manager import SettingsService

class MonitorService(Service):
    name = "monitor_service"

    def __init__(self, settings_service: "SettingsService"):
        self.settings_service = settings_service
        self.client = MongoClient("mongodb://localhost:27017/")
        self.db = self.client["monitor_service"]
        self.table_map = {
            "transactions": self.db["transactions"],
            "messages": self.db["messages"],
            "vertex_builds": self.db["vertex_builds"],
        }

        try:
            self.ensure_tables_exist()
        except Exception as e:
            logger.exception(f"Error initializing monitor service: {e}")

    def ensure_tables_exist(self):
        # MongoDB creates collections automatically when you first insert a document
        pass

    def add_row(
        self,
        table_name: str,
        data: Union[dict, TransactionModel, MessageModel, VertexBuildModel],
    ):
        collection: Collection = self.table_map.get(table_name)
        if collection is None:
            raise ValueError(f"Unknown table name: {table_name}")

        if isinstance(data, dict):
            collection.insert_one(data)
        else:
            collection.insert_one(data.dict())

    def load_table_as_dataframe(self, table_name):
        collection: Collection = self.table_map.get(table_name)
        if collection is None:
            raise ValueError(f"Unknown table name: {table_name}")

        import pandas as pd
        return pd.DataFrame(list(collection.find()))

    @staticmethod
    def get_timestamp():
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def get_vertex_builds(
        self,
        flow_id: Optional[str] = None,
        vertex_id: Optional[str] = None,
        valid: Optional[bool] = None,
        order_by: Optional[str] = "timestamp",
    ):
        query = {}
        if flow_id:
            query["flow_id"] = flow_id
        if vertex_id:
            query["_id"] = vertex_id
        if valid is not None:
            query["valid"] = valid

        collection: Collection = self.table_map["vertex_builds"]
        cursor = collection.find(query).sort(order_by)
        return list(cursor)

    def delete_vertex_builds(self, flow_id: Optional[str] = None):
        query = {}
        if flow_id:
            query["flow_id"] = flow_id

        collection: Collection = self.table_map["vertex_builds"]
        collection.delete_many(query)

    def delete_messages(self, session_id: str):
        query = {"session_id": session_id}

        collection: Collection = self.table_map["messages"]
        collection.delete_many(query)

    def add_message(self, message: MessageModel):
        self.add_row("messages", message)

    def get_messages(
        self,
        sender: Optional[str] = None,
        sender_name: Optional[str] = None,
        session_id: Optional[str] = None,
        order_by: Optional[str] = "timestamp",
        order: Optional[str] = "DESC",
        limit: Optional[int] = None,
    ):
        query = {}
        if sender:
            query["sender"] = sender
        if sender_name:
            query["sender_name"] = sender_name
        if session_id:
            query["session_id"] = session_id

        collection: Collection = self.table_map["messages"]
        cursor = collection.find(query).sort(order_by, -1 if order == "DESC" else 1)
        if limit is not None:
            cursor = cursor.limit(limit)
        return list(cursor)

    def get_transactions(
        self,
        source: Optional[str] = None,
        target: Optional[str] = None,
        status: Optional[str] = None,
        order_by: Optional[str] = "timestamp",
    ):
        query = {}
        if source:
            query["source"] = source
        if target:
            query["target"] = target
        if status:
            query["status"] = status

        collection: Collection = self.table_map["transactions"]
        cursor = collection.find(query).sort(order_by)
        return list(cursor)

This code replaces the DuckDB operations with MongoDB operations, ensuring that all write operations are moved to a MongoDB database [1][2].

To continue talking to Dosu, mention @dosu.

carlosrcoelho commented 6 days ago

@h-arnold

Do you need any assistance with this case? If not, please let us know if this issue can be closed.

codenprogressive commented 6 days ago

I had a similar chalenge. So I'm deploying Langflow in a google Cloud Run Service. For the langflow app cache directory, I mounted volume (gcs bucket) to /root/.cache/langflow. I'm using single worker for now! Should I expect errors when increasing the number of workers?