langflow-ai / langflow

⛓️ Langflow is a visual framework for building multi-agent and RAG applications. It's open-source, Python-powered, fully customizable, model and vector store agnostic.
http://www.langflow.org
MIT License

Lock type inconsistency between set_cache and upsert methods #1936

Open liuzh1516 opened 1 month ago

liuzh1516 commented 1 month ago

Describe the bug There is an inconsistency in the lock types used between the set_cache and upsert methods, which can lead to runtime errors because asyncio.Lock and threading.Lock are not interchangeable. asyncio.Lock implements `__aenter__` and must be used with `async with lock`, whereas threading.Lock implements `__enter__` and must be used with `with lock`. In the upsert method, the line `with lock or self._lock:` only works correctly when lock is an instance of threading.Lock. When lock is an instance of asyncio.Lock, the statement fails because asyncio.Lock does not implement `__enter__`, so the context manager lookup raises an error.
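
For illustration, here is a minimal standalone sketch (not langflow code) showing why the two lock types cannot be used interchangeably:

import asyncio
import threading

async def main():
    tlock = threading.Lock()
    alock = asyncio.Lock()

    with tlock:           # fine: threading.Lock defines __enter__/__exit__
        pass

    async with alock:     # fine: asyncio.Lock defines __aenter__/__aexit__
        pass

    try:
        with alock:       # fails: asyncio.Lock is not a synchronous context manager
            pass
    except Exception as exc:  # the exact exception type varies by Python version
        print(f"'with' on an asyncio.Lock failed: {exc!r}")

asyncio.run(main())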

Browser and Version

To Reproduce Steps to reproduce the behavior:
1. Drag a Chat Input onto the canvas.
2. Click "Build" at the top right corner to validate the flow.
3. Observe the error message that appears at the bottom left corner.

Screenshots (two screenshots attached showing the error message)

dosubot[bot] commented 1 month ago

To resolve the lock type inconsistency between the set_cache and upsert methods, ensure that both methods use the same type of lock. Here’s how you can achieve consistency:

For AsyncInMemoryCache:

import asyncio
from collections import OrderedDict
from typing import Optional

# AsyncBaseCacheService and Service are langflow base classes (see the linked source files below);
# _get, _set, and _upsert are the lock-free internal helpers, omitted here.
class AsyncInMemoryCache(AsyncBaseCacheService, Service):
    def __init__(self, max_size=None, expiration_time=3600):
        self.cache = OrderedDict()
        self.lock = asyncio.Lock()  # asyncio.Lock: must be entered with "async with"
        self.max_size = max_size
        self.expiration_time = expiration_time

    async def get(self, key, lock: Optional[asyncio.Lock] = None):
        # If no external lock is supplied, guard the access with the cache's own lock;
        # otherwise the caller is assumed to already hold the supplied lock.
        if not lock:
            async with self.lock:
                return await self._get(key)
        else:
            return await self._get(key)

    async def set(self, key, value, lock: Optional[asyncio.Lock] = None):
        if not lock:
            async with self.lock:
                await self._set(key, value)
        else:
            await self._set(key, value)

    async def upsert(self, key, value, lock: Optional[asyncio.Lock] = None):
        if not lock:
            async with self.lock:
                await self._upsert(key, value)
        else:
            await self._upsert(key, value)

For ThreadingInMemoryCache:

import threading
import time
from collections import OrderedDict
from typing import Optional

# CacheService and Service are langflow base classes; _get_without_lock and delete
# are internal helpers not shown here.
class ThreadingInMemoryCache(CacheService, Service):
    def __init__(self, max_size=None, expiration_time=60 * 60):
        self._cache = OrderedDict()
        # RLock (not Lock): upsert() calls set() while already holding the lock
        self._lock = threading.RLock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    def get(self, key, lock: Optional[threading.Lock] = None):
        with lock or self._lock:
            return self._get_without_lock(key)

    def set(self, key, value, lock: Optional[threading.Lock] = None):
        with lock or self._lock:
            if key in self._cache:
                self.delete(key)  # remove the stale entry before rewriting it
            elif self.max_size and len(self._cache) >= self.max_size:
                self._cache.popitem(last=False)  # evict the oldest entry
            self._cache[key] = {"value": value, "time": time.time()}

    def upsert(self, key, value, lock: Optional[threading.Lock] = None):
        with lock or self._lock:
            existing_value = self._get_without_lock(key)
            if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
                existing_value.update(value)
                value = existing_value
            self.set(key, value)  # re-acquires self._lock, which is safe because it is an RLock
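
As a side note, the reason `self._lock` is an RLock rather than a plain Lock is that `upsert` calls `set` while already holding the lock. A small standalone sketch of that reentrancy (not langflow code):

import threading

rlock = threading.RLock()

def inner():
    with rlock:            # re-acquiring an RLock from the same thread is allowed
        return "inner done"

def outer():
    with rlock:
        return inner()     # with a plain threading.Lock this nested acquire would deadlock

print(outer())             # prints "inner done"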

Additionally, in the ChatService class, ensure that the set_cache, get_cache, and clear_cache methods consistently use asyncio.Lock:

import asyncio
from collections import defaultdict
from typing import Any, Optional

from langflow.services.base import Service
from langflow.services.deps import get_cache_service

class ChatService(Service):
    name = "chat_service"

    def __init__(self):
        # One asyncio.Lock per flow_id, so concurrent builds of the same flow are serialized
        self._cache_locks = defaultdict(asyncio.Lock)
        self.cache_service = get_cache_service()

    async def set_cache(self, flow_id: str, data: Any, lock: Optional[asyncio.Lock] = None) -> bool:
        result_dict = {
            "result": data,
            "type": type(data),
        }
        await self.cache_service.upsert(flow_id, result_dict, lock=lock or self._cache_locks[flow_id])
        return flow_id in self.cache_service  # assumes the cache service implements __contains__

    async def get_cache(self, flow_id: str, lock: Optional[asyncio.Lock] = None) -> Any:
        return await self.cache_service.get(flow_id, lock=lock or self._cache_locks[flow_id])

    async def clear_cache(self, flow_id: str, lock: Optional[asyncio.Lock] = None):
        await self.cache_service.delete(flow_id, lock=lock or self._cache_locks[flow_id])

By ensuring that each method within a class uses the same type of lock, you can avoid runtime errors due to lock type mismatches.
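
To make the async pattern concrete, here is a small self-contained sketch (independent of langflow) of the per-flow asyncio.Lock idea used in ChatService, with a defaultdict handing out one lock per flow_id:

import asyncio
from collections import OrderedDict, defaultdict

cache: OrderedDict = OrderedDict()
locks = defaultdict(asyncio.Lock)  # one asyncio.Lock per flow_id

async def upsert(flow_id: str, value: dict) -> None:
    async with locks[flow_id]:        # always "async with" for asyncio.Lock
        existing = cache.get(flow_id)
        if isinstance(existing, dict):
            existing.update(value)    # merge into the existing entry
        else:
            cache[flow_id] = value

async def main():
    await asyncio.gather(
        upsert("flow-1", {"result": "hello"}),
        upsert("flow-1", {"type": "str"}),
    )
    print(cache["flow-1"])  # {'result': 'hello', 'type': 'str'}

asyncio.run(main())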

#### Details

**Sources**
- [src/backend/base/langflow/services/cache/service.py](https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/services/cache/service.py)
- [src/backend/base/langflow/services/chat/service.py](https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/services/chat/service.py)
- [src/backend/base/langflow/services/cache/base.py](https://github.com/logspace-ai/langflow/blob/dev/src/backend/base/langflow/services/cache/base.py)
- [src/backend/langflow/services/cache/service.py](https://github.com/logspace-ai/langflow/blob/dev/src/backend/langflow/services/cache/service.py)
- [src/frontend/src/components/crashErrorComponent/index.tsx](https://github.com/langflow-ai/langflow/blob/dev/src/frontend/src/components/crashErrorComponent/index.tsx)
- [src/backend/langflow/utils/logger.py](https://github.com/logspace-ai/langflow/blob/dev/src/backend/langflow/utils/logger.py)
**About Dosu** This response is meant to be useful and save you time. It is **not meant to be a precise solution**, but rather a starting point for your own research. Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

To continue the conversation, mention @dosu.

tjroamer commented 1 month ago

I encountered the same problem. The error went away after I changed the value of LANGFLOW_CACHE_TYPE to async in the .env file. This is not documented, however: in .env.example, only memory and redis are mentioned.
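
For reference, the workaround amounts to this line in .env (the async value is what this thread reports as working; it is not listed in the official .env.example):

# .env
LANGFLOW_CACHE_TYPE=async   # works around the lock-type error described in this issue
# .env.example only documents the "memory" and "redis" values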

@dosu

Alperemrehas commented 1 month ago

> I encountered the same problem. The error went away after I changed the value of LANGFLOW_CACHE_TYPE to async in the .env file. This is not documented, however: in .env.example, only memory and redis are mentioned.
>
> @dosu

Thank you for the information. It worked for me.

songjinu commented 2 weeks ago

I use two servers. Can I still use the cache when I set LANGFLOW_CACHE_TYPE=async? How can I use RedisCache?

Alperemrehas commented 2 weeks ago

> I use two servers. Can I still use the cache when I set LANGFLOW_CACHE_TYPE=async? How can I use RedisCache?

Actually, I am not sure this works with a version deployed on a server, because lately I have been experiencing failures of the vertices build endpoint, which I suspect are connected to this cache issue. After running for several days, that specific endpoint just stops. Maybe @ogabrielluiz has an idea about this. This may be a serious bug.