QuivrHQ / quivr

Open-source RAG Framework for building GenAI Second Brains 🧠 Build productivity assistant (RAG) ⚡️🤖 Chat with your docs (PDF, CSV, ...) & apps using Langchain, GPT 3.5 / 4 turbo, Private, Anthropic, VertexAI, Ollama, LLMs, Groq that you can share with users ! Efficient retrieval augmented generation framework
https://quivr.com

[Bug]: check_if_is_premium_user error #2976

Open luqijun opened 1 month ago

luqijun commented 1 month ago

What happened?

I got an error when executing the Celery task check_if_is_premium_user in celery_worker.py. When I open the Supabase table editor at http://localhost:54323/project/default/editor, I can't open the subscriptions table and get the error "Application error: a client-side exception has occurred (see the browser console for more information)."

Relevant log output

Exception has occurred: APIError
{'code': 'XX000', 'details': None, 'hint': None, 'message': 'called `Result::unwrap()` on an `Err` value: InvalidPosition'}
  File "/home/Jun/Projects/Python/NLP/quivr/backend/api/quivr_api/celery_worker.py", line 165, in check_if_is_premium_user
    .execute()
     ^^^^^^^^^
postgrest.exceptions.APIError: {'code': 'XX000', 'details': None, 'hint': None, 'message': 'called `Result::unwrap()` on an `Err` value: InvalidPosition'}

linear[bot] commented 1 month ago

CORE-141 [Bug]: check_if_is_premium_user error

dosubot[bot] commented 1 month ago

@luqijun Hello there! I'm here to help you with bug fixes and answer your questions. Let's tackle this issue together! 🤖

To resolve the error "called `Result::unwrap()` on an `Err` value: InvalidPosition" when executing the Celery task check_if_is_premium_user in celery_worker.py and accessing the Supabase subscriptions table, you need to handle the errors that can occur when querying the Supabase database. Specifically, check whether each query succeeded before attempting to use its data.

Here is a modified version of the check_if_is_premium_user function with added error handling:

@celery.task(name="check_if_is_premium_user")
def check_if_is_premium_user():
    supabase = get_supabase_db()
    supabase_db = supabase.db

    paris_tz = timezone("Europe/Paris")
    current_time = datetime.now(paris_tz)
    current_time_str = current_time.strftime("%Y-%m-%d %H:%M:%S.%f")
    logger.debug(f"Current time: {current_time_str}")

    # Define the memoization period (e.g., 1 hour)
    memoization_period = timedelta(hours=1)
    memoization_cutoff = current_time - memoization_period

    try:
        # Fetch all necessary data in bulk
        subscriptions_response = (
            supabase_db.table("subscriptions")
            .select("*")
            .filter("current_period_end", "gt", current_time_str)
            .execute()
        )
        if subscriptions_response.error:
            raise Exception(subscriptions_response.error)
        subscriptions = subscriptions_response.data

        customers_response = supabase_db.table("customers").select("*").execute()
        if customers_response.error:
            raise Exception(customers_response.error)
        customers = customers_response.data

        customer_emails = [customer["email"] for customer in customers]

        # Split customer emails into batches of 20
        email_batches = [
            customer_emails[i : i + 20] for i in range(0, len(customer_emails), 20)
        ]

        users = []
        for email_batch in email_batches:
            batch_users_response = (
                supabase_db.table("users")
                .select("id, email")
                .in_("email", email_batch)
                .execute()
            )
            if batch_users_response.error:
                raise Exception(batch_users_response.error)
            users.extend(batch_users_response.data)

        product_features_response = (
            supabase_db.table("product_to_features").select("*").execute()
        )
        if product_features_response.error:
            raise Exception(product_features_response.error)
        product_features = product_features_response.data

        user_settings_response = (
            supabase_db.table("user_settings").select("*").execute()
        )
        if user_settings_response.error:
            raise Exception(user_settings_response.error)
        user_settings = user_settings_response.data

        # Create lookup dictionaries for faster access
        user_dict = {user["email"]: user["id"] for user in users}
        customer_dict = {customer["id"]: customer for customer in customers}
        product_dict = {
            product["stripe_product_id"]: product for product in product_features
        }
        settings_dict = {setting["user_id"]: setting for setting in user_settings}

        # Process subscriptions and update user settings
        premium_user_ids = set()
        settings_to_upsert = {}
        for sub in subscriptions:
            if sub["attrs"]["status"] != "active":
                continue

            customer = customer_dict.get(sub["customer"])
            if not customer:
                continue

            user_id = user_dict.get(customer["email"])
            if not user_id:
                continue
            user_id = str(user_id)  # Normalize to a string so dict lookups and set membership stay consistent

            current_settings = settings_dict.get(user_id, {})
            last_check = current_settings.get("last_stripe_check")

            # Skip if the user was checked recently
            if last_check and datetime.fromisoformat(last_check) > memoization_cutoff:
                premium_user_ids.add(user_id)
                continue

            premium_user_ids.add(user_id)

            product_id = sub["attrs"]["items"]["data"][0]["plan"]["product"]
            product = product_dict.get(product_id)
            if not product:
                logger.warning(f"No matching product found for subscription: {sub['id']}")
                continue

            settings_to_upsert[user_id] = {
                "user_id": user_id,
                "max_brains": product["max_brains"],
                "max_brain_size": product["max_brain_size"],
                "monthly_chat_credit": product["monthly_chat_credit"],
                "api_access": product["api_access"],
                "models": product["models"],
                "is_premium": True,
                "last_stripe_check": current_time_str,
            }

        # Bulk upsert premium user settings in batches of 10
        settings_list = list(settings_to_upsert.values())
        for i in range(0, len(settings_list), 10):
            batch = settings_list[i : i + 10]
            supabase_db.table("user_settings").upsert(batch).execute()

        # Delete settings for non-premium users in batches of 10
        settings_to_delete = [
            setting["user_id"]
            for setting in user_settings
            if setting["user_id"] not in premium_user_ids and setting.get("is_premium")
        ]
        for i in range(0, len(settings_to_delete), 10):
            batch = settings_to_delete[i : i + 10]
            supabase_db.table("user_settings").delete().in_("user_id", batch).execute()

        logger.info(
            f"Updated {len(settings_to_upsert)} premium users, deleted settings for {len(settings_to_delete)} non-premium users"
        )
        return True

    except Exception as e:
        logger.exception(e)
        return False

This code adds error handling around each database query so that failures are caught and logged instead of propagating unchecked, and the task returns False rather than crashing [1].
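
Note that depending on the supabase-py / postgrest-py version, a failed query raises postgrest.exceptions.APIError (as in the traceback above) instead of returning a response with an error attribute, in which case the response.error checks above would never fire. Here is a minimal sketch, assuming that exception-raising behaviour and reusing the names from the function above:

from postgrest.exceptions import APIError

try:
    subscriptions = (
        supabase_db.table("subscriptions")
        .select("*")
        .filter("current_period_end", "gt", current_time_str)
        .execute()
        .data
    )
except APIError as e:
    # The exception carries the PostgREST error payload shown in the log above
    # (e.g. code 'XX000' with the Result::unwrap message), so log it and fall back.
    logger.error(f"Failed to fetch subscriptions: {e}")
    subscriptions = []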

To continue talking to Dosu, mention @dosu.

StanGirard commented 1 month ago

Hi my friend, this is expected: we also use this code for quivr.com.

To remove Stripe and avoid these errors, please add DEACTIVATE_STRIPE=true to your .env file.
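
For illustration only, a minimal sketch of how such a flag could short-circuit the premium check, assuming it is read from the environment; Quivr's actual handling of DEACTIVATE_STRIPE may differ:

import os

@celery.task(name="check_if_is_premium_user")
def check_if_is_premium_user():
    # If Stripe is deactivated, skip the premium/subscription sync entirely.
    if os.getenv("DEACTIVATE_STRIPE", "false").lower() == "true":
        logger.info("DEACTIVATE_STRIPE is set; skipping premium user check")
        return True
    # ... rest of the task as in the snippet above ...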