GoogleCloudPlatform / alloydb-python-connector

A Python library for connecting securely to your AlloyDB instances.
Apache License 2.0

(pg8000.exceptions.InterfaceError) on cloud run for AlloyDB connections #322

Closed priyamshah112 closed 1 month ago

priyamshah112 commented 1 month ago

Bug Description

On Cloud Run, with all of the networking steps followed, the connector is able to connect to the database for 10-15 minutes and the API returns 200 OK responses. After that it throws the error below, which results in 500 Internal Server Error responses on Cloud Run.

Example code (or command)

No response

Stacktrace

sqlalchemy.exc.InterfaceError: (pg8000.exceptions.InterfaceError) connection is closed

at ._send_message ( /usr/local/lib/python3.10/site-packages/pg8000/core.py:767 )
at .send_QUERY ( /usr/local/lib/python3.10/site-packages/pg8000/core.py:679 )
at .execute_simple ( /usr/local/lib/python3.10/site-packages/pg8000/core.py:684 )
at .execute ( /usr/local/lib/python3.10/site-packages/pg8000/dbapi.py:468 )
at .do_execute ( /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py:921 )
at ._exec_single_context ( /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1965 )
at ._handle_dbapi_exception ( /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2339 )
at ._exec_single_context ( /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1984 )
at ._execute_context ( /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1844 )
at ._execute_clauseelement ( /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1635 )
at ._execute_on_connection ( /usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:483 )
at .execute ( /usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1412 )
at .orm_execute_statement ( /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/context.py:293 )
at ._execute_internal ( /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py:2141 )
at .execute ( /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py:2246 )
at ._iter ( /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2846 )
at .one ( /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2797 )
at .scalar ( /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:2824 )
at .count ( /usr/local/lib/python3.10/site-packages/sqlalchemy/orm/query.py:3131 )
at .get_postal_codes ( /app/app/crud.py:17 )
at .read_postal_codes ( /app/app/routers/postal_code.py:25 )
at .run ( /usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:859 )
at .run_sync_in_worker_thread ( /usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2177 )
at .run_sync ( /usr/local/lib/python3.10/site-packages/anyio/to_thread.py:56 )
at .run_in_threadpool ( /usr/local/lib/python3.10/site-packages/starlette/concurrency.py:42 )
at .run_endpoint_function ( /usr/local/lib/python3.10/site-packages/fastapi/routing.py:193 )
at .app ( /usr/local/lib/python3.10/site-packages/fastapi/routing.py:278 )
at .app ( /usr/local/lib/python3.10/site-packages/starlette/routing.py:72 )
at .wrapped_app ( /usr/local/lib/python3.10/site-packages/starlette/_exception_handler.py:53 )
at .wrapped_app ( /usr/local/lib/python3.10/site-packages/starlette/_exception_handler.py:64 )
at .app ( /usr/local/lib/python3.10/site-packages/starlette/routing.py:77 )
at .handle ( /usr/local/lib/python3.10/site-packages/starlette/routing.py:297 )
at .app ( /usr/local/lib/python3.10/site-packages/starlette/routing.py:776 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/starlette/routing.py:756 )
at .wrapped_app ( /usr/local/lib/python3.10/site-packages/starlette/_exception_handler.py:53 )
at .wrapped_app ( /usr/local/lib/python3.10/site-packages/starlette/_exception_handler.py:64 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/starlette/middleware/exceptions.py:65 )
at .simple_response ( /usr/local/lib/python3.10/site-packages/starlette/middleware/cors.py:148 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/starlette/middleware/cors.py:93 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py:164 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/starlette/middleware/errors.py:186 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/starlette/applications.py:123 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/fastapi/applications.py:1054 )
at .__call__ ( /usr/local/lib/python3.10/site-packages/uvicorn/middleware/proxy_headers.py:70 )
at .run_asgi ( /usr/local/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py:399 )

Steps to reproduce?

  1. ?
  2. ?
  3. ? ...

Environment

  1. OS type and version: Windows or Ubuntu
  2. Python version: 3.10
  3. AlloyDB Python Connector version: latest

Additional Details

No response

priyamshah112 commented 1 month ago

I am trying the following to see if it is a solution: https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/pull/1093

It fails with:

TypeError: connect() got an unexpected keyword argument 'refresh_strategy'

from google.cloud.alloydb.connector import Connector, IPTypes

def get_engine():
    connector = Connector()
    DB_USER = os.getenv("POSTGRES_USER")
    DB_PASS = os.getenv("POSTGRES_PASSWORD")
    INSTANCE_CONNECTION_NAME = os.getenv("DATABASE_URL")
    DB_NAME = os.getenv("POSTGRES_DB")
    connection = connector.connect(
        INSTANCE_CONNECTION_NAME,
        "pg8000",
        user=DB_USER,
        password=DB_PASS,
        db=DB_NAME,
        ip_type=IPTypes.PUBLIC,
        refresh_strategy="lazy",
    )
    engine = create_engine(
        "postgresql+pg8000://",
        creator=lambda: connection,
        pool_size=10,
        max_overflow=1,
        pool_timeout=30,  # Timeout for getting a connection from the pool
        pool_recycle=1080,  # Recycle connections after this many seconds
        pool_pre_ping=True,
    )
    return engine

enocom commented 1 month ago

Yes, this looks like a case of needing lazy refresh. We'll be porting https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/pull/1093 here soon.
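
For context, a minimal sketch of what that would look like once ported, assuming the AlloyDB connector mirrors the Cloud SQL connector's API, where refresh_strategy is a Connector() constructor argument rather than a connect() keyword (which is why the connect() call above raises the TypeError):

from google.cloud.alloydb.connector import Connector, IPTypes

# Assumed future API: lazy refresh is configured on the Connector itself,
# so certificates are only refreshed when a connection is requested.
connector = Connector(refresh_strategy="lazy")

def getconn():
    return connector.connect(
        "<INSTANCE_URI>",
        "pg8000",
        user=DB_USER,
        password=DB_PASS,
        db=DB_NAME,
        ip_type=IPTypes.PUBLIC,
    )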

In the meantime, you have two workarounds:

  1. Connect directly using either Direct VPC Egress or the Serverless VPC Access Connector (a direct-connection sketch follows after this list).
  2. Enable Always Allocated CPU to avoid these errors.
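
A minimal sketch of workaround 1, assuming the instance's private IP is reachable from Cloud Run over Direct VPC Egress or a Serverless VPC Access connector; the ALLOYDB_PRIVATE_IP environment variable and the port are illustrative:

import os
import sqlalchemy

# Connect straight to the instance's private IP with pg8000,
# bypassing the connector's background certificate refresh entirely.
engine = sqlalchemy.create_engine(
    sqlalchemy.engine.url.URL.create(
        drivername="postgresql+pg8000",
        username=os.getenv("POSTGRES_USER"),
        password=os.getenv("POSTGRES_PASSWORD"),
        host=os.getenv("ALLOYDB_PRIVATE_IP"),  # e.g. 10.x.x.x, assumed env var
        port=5432,
        database=os.getenv("POSTGRES_DB"),
    ),
    pool_pre_ping=True,
)
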
enocom commented 1 month ago

Tracking issue is here: https://github.com/GoogleCloudPlatform/alloydb-python-connector/issues/298

priyamshah112 commented 1 month ago

It failed with both steps.

Step 1: Connect directly using either Direct VPC Egress or the Serverless VPC Access Connector.

[screenshot]

Step 2: Enable Always Allocated CPU to avoid these errors.

[screenshot]

enocom commented 1 month ago

In that case, please show me how you're connecting in Python.

priyamshah112 commented 1 month ago

I have a database.py file with the following code:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from .config import settings
from dotenv import load_dotenv
from google.cloud.alloydb.connector import Connector, IPTypes
import pg8000
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='call-center-analysis.json'

load_dotenv()

def get_engine():
    connector = Connector()
    DB_USER = os.getenv("POSTGRES_USER")
    DB_PASS = os.getenv("POSTGRES_PASSWORD")
    INSTANCE_CONNECTION_NAME = os.getenv("DATABASE_URL")
    DB_NAME = os.getenv("POSTGRES_DB")
    connection = connector.connect(
        INSTANCE_CONNECTION_NAME,
        "pg8000",
        user=DB_USER,
        password=DB_PASS,
        db=DB_NAME,
        ip_type=IPTypes.PUBLIC,
    )
    engine = create_engine(
        "postgresql+pg8000://",
        creator=lambda: connection,
        pool_size=10,
        max_overflow=1,
        pool_timeout=30,  # Timeout for getting a connection from the pool
        pool_recycle=1080,  # Recycle connections after this many seconds   
        pool_pre_ping=True,       
    )
    return engine

engine = get_engine()
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

In my API routes I am using get_db:

from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from typing import Optional
from app.database import get_db
from app.crud import get_cities
from app.auth import get_current_user
from app.schemas import CityResponse
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

router = APIRouter()

@router.get("/cities", response_model=CityResponse)
def read_cities(
    db: Session = Depends(get_db),
    page: int = 1,
    size: int = 10,
    query: Optional[str] = Query(None, min_length=1, max_length=5),
    current_user: dict = Depends(get_current_user)
):
    skip = (page - 1) * size
    response = get_cities(db, skip=skip, limit=size, query=query)

    # Log the response
    logger.info(f"Response: {response}")

    return response
enocom commented 1 month ago

The argument passed to creator should create the connections on demand (instead of ahead of time).

Try something like this instead:

import sqlalchemy
from google.cloud.alloydb.connector import Connector, IPTypes
import pg8000

def get_engine(connector: Connector) -> sqlalchemy.engine.base.Engine:
    def getconn() -> pg8000.dbapi.Connection:
        return connector.connect(
            "<INSTANCE_URI>",
            "pg8000",
            user=DB_USER,
            password=DB_PASS,
            db=DB_NAME,
            ip_type=IPTypes.PUBLIC,
        )

    return sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )

You'll want to save a reference to the Connector to close it when your app shuts down too.
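
A minimal sketch of that cleanup, assuming a FastAPI app; the lifespan wiring and the get_engine(connector) call are illustrative, not part of the connector's API:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from google.cloud.alloydb.connector import Connector

connector = Connector()
engine = get_engine(connector)  # get_engine from the snippet above

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield
    # Release pooled connections and stop the connector's background
    # resources when the application shuts down.
    engine.dispose()
    connector.close()

app = FastAPI(lifespan=lifespan)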

priyamshah112 commented 1 month ago

With my previous code, it failed with the same behavior on Compute Engine. I have updated it following your reference code, and below is the logger output. So far it is working great on Compute Engine with the change. Thank you for your quick response.

INFO:sqlalchemy.engine.Engine:select pg_catalog.version()
INFO:sqlalchemy.engine.Engine:[raw sql] ()
INFO:sqlalchemy.engine.Engine:select current_schema()
INFO:sqlalchemy.engine.Engine:[raw sql] ()
INFO:sqlalchemy.engine.Engine:show standard_conforming_strings
INFO:sqlalchemy.engine.Engine:[raw sql] ()
INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT count(*) AS count_1 
FROM (SELECT DISTINCT tm_service.postal_code AS tm_service_postal_code 
FROM tm_service 
WHERE tm_service.postal_code IS NOT NULL) AS anon_1
INFO:sqlalchemy.engine.Engine:[generated in 0.00009s] ()
INFO:sqlalchemy.engine.Engine:SELECT DISTINCT tm_service.postal_code AS tm_service_postal_code 
FROM tm_service 
WHERE tm_service.postal_code IS NOT NULL 
 LIMIT %s OFFSET %s
INFO:sqlalchemy.engine.Engine:[generated in 0.00011s] (10, 0)
INFO:     127.0.0.1:38266 - "GET /api/v1/zip_codes HTTP/1.1" 200 OK
INFO:sqlalchemy.engine.Engine:ROLLBACK
priyamshah112 commented 1 month ago

Closing as resolved.

enocom commented 1 month ago

Glad to hear it. We'll be shipping lazy refresh soon as well, so if you see problems in Cloud Run, we can discuss in #298.