GoogleCloudPlatform / cloud-sql-python-connector

A Python library for connecting securely to your Cloud SQL instances.
Apache License 2.0

Update docs to showcase cleanup of `Connector` object #914

Open lauraseidler opened 9 months ago

lauraseidler commented 9 months ago

Bug Description

We use the connector with IAM Auth + Cloud SQL for Postgres. It generally works okay, but we occasionally see errors when our application server shuts down that look like this:

ERROR:google.cloud.sql.connector.instance:['XXX']: An error occurred while performing refresh. Scheduling another refresh attempt immediately

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/instance.py", line 376, in _refresh_task
    refresh_data = await refresh_task
  File "/usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/instance.py", line 311, in _perform_refresh
    metadata = await metadata_task
  File "/usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/refresh_utils.py", line 106, in _get_metadata
    resp = await client_session.get(url, headers=headers, raise_for_status=True)
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client.py", line 586, in _request
    await resp.start(conn)
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client_reqrep.py", line 905, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.10/site-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe

ERROR:asyncio:Task exception was never retrieved

future: <Task finished name='Task-507' coro=<Instance._schedule_refresh.<locals>._refresh_task() done, defined at /usr/local/lib/python3.10/site-packages/google/cloud/sql/connector/instance.py:365> exception=ClientOSError(32, 'Broken pipe')>

The exception itself may vary - mostly aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe, but we've also seen aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected.

To me, this looks like an async task is not checked for exceptions, even though at least the one in the referenced line looks okay: https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/blob/406b383728b9aabda77d6fd4326ff7ce8d955557/google/cloud/sql/connector/instance.py#L376

Since these errors are only logged when the application shuts down, it's hard to tell what is causing the connection issues, and whether they cause real problems or the connection is re-established successfully.

Example code (or command)

No response

Stacktrace

No response

Steps to reproduce?

  1. Use the Python Connector as documented at https://cloud.google.com/sql/docs/postgres/iam-logins#log-in-with-automatic
  2. Run SQL queries for a while (we're doing it within Cloud Run, and it's only reproducible under load, not locally)
  3. Shut down application

Environment

  1. OS type and version: Debian 11.7 (python:3.10.11-slim docker image)
  2. Python version: 3.10.11
  3. Cloud SQL Python Connector version: 1.4.3

Additional Details

No response

jackwotherspoon commented 9 months ago

Hi @lauraseidler, thanks for opening an issue on the Cloud SQL Python Connector šŸ˜„

Let me see if I am understanding the issue correctly...

we are occasionally seeing errors on shut down of our application server

So all the queries are running successfully without any errors, and you are only seeing the errors surfaced when the application shuts down? That would hint at the Connector not cleaning up its resources gracefully, hence the tasks not being cancelled or awaited. There are two ways to explicitly clean up resources and make sure the Connector exits gracefully.

  1. You can call connector.close() yourself on shutdown.

Details in our README:

(screenshot of the relevant README section)
  2. You can use the Connector as a context manager so that it cleans itself up.

Details in our README:

# initialize Cloud SQL Python Connector as context manager
with Connector() as connector:
    # ... use connector.connect(...) inside this block ...
    # resources are cleaned up automatically on exit
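Both options come down to making sure `Connector.close()` is actually reached on shutdown. A minimal runnable sketch of the explicit-close pattern (option 1); `DemoConnector` here is a hypothetical stand-in for `google.cloud.sql.connector.Connector`, used only so the pattern can run without GCP credentials:

```python
class DemoConnector:
    """Hypothetical stand-in for google.cloud.sql.connector.Connector."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        # the real Connector.close() cancels the background certificate
        # refresh tasks and closes the connector's aiohttp session
        self.closed = True


def serve_and_shutdown() -> DemoConnector:
    connector = DemoConnector()
    try:
        # ... create a connection pool via connector.connect(...)
        # and serve requests here ...
        pass
    finally:
        # explicitly close the Connector on shutdown so refresh tasks
        # are not left to fail noisily later
        connector.close()
    return connector
```

Putting the `close()` call in a `finally` block (or in your framework's shutdown hook) ensures cleanup happens even when the server exits because of an error.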

When you say "our application shuts down", can you give me a bit more detail about what that looks like for your use case? Do you just mean when Cloud Run scales down? This will help me try to reproduce the issue you are seeing.

Let me know if this helps and if it does I can update the sample or documentation as needed. šŸ˜„

lauraseidler commented 9 months ago

Hi @jackwotherspoon,

I think we're currently not explicitly (or implicitly) closing the connector object, only the connection itself. I hadn't read the README that far (my bad), as the first part looked identical to the GCP documentation, and the GCP docs don't mention this part, so we never included it. This does sound like it could be the issue, so I will try it out and report back, thanks!

When you say "our application shuts down", can you give me a bit more detail about what that looks like for your use case? Do you just mean when Cloud Run scales down? This will help me try to reproduce the issue you are seeing.

Yes, when Cloud Run scales down. This is especially noticeable when we roll out a new version and the old version has been running for a while, and errors have "accumulated" over multiple instances that get scaled down in rapid succession.

jackwotherspoon commented 9 months ago

Hi @lauraseidler, let me know if closing the Connector resolves the error messages you were previously seeing. šŸ‘

as the first part looked identical to the GCP documentation

If closing the Connector does resolve the issue, I will update this issue to track updating the code sample used in the docs to include closing the Connector, so that other users don't run into this in the future šŸ˜„

NickNaskida commented 9 months ago

@jackwotherspoon Hello, I am having similar issues with my FastAPI application.

Task exception was never retrieved
future: <Task finished name='Task-6523' coro=<Instance._schedule_refresh.<locals>._refresh_task() done, defined at /usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py:378> exception=TimeoutError()>

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 389, in _refresh_task
    refresh_data = await refresh_task
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 313, in _perform_refresh
    metadata = await metadata_task
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 103, in _get_metadata
    resp = await client_session.get(url, headers=headers, raise_for_status=True)
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 467, in _request
    with timer:
  File "/usr/local/lib/python3.11/site-packages/aiohttp/helpers.py", line 721, in __exit__
    raise asyncio.TimeoutError from None
TimeoutError

and

[2023-12-01 07:07:10,627] ERROR logger=asyncio module=base_events func=default_exception_handler() L1771 message=Task exception was never retrieved
future: <Task finished name='Task-7093' coro=<Instance._schedule_refresh.<locals>._refresh_task() done, defined at /usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py:378> exception=RetryError(<Future at 0x3e8b0c830a10 state=finished raised SSLError>)>

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 389, in _refresh_task
    refresh_data = await refresh_task
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/instance.py", line 313, in _perform_refresh
    metadata = await metadata_task
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 103, in _get_metadata
    resp = await client_session.get(url, headers=headers, raise_for_status=True)
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 467, in _request
    with timer:
  File "/usr/local/lib/python3.11/site-packages/aiohttp/helpers.py", line 721, in __exit__
    raise asyncio.TimeoutError from None
TimeoutError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 467, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1096, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 642, in connect
    sock_and_verified = _ssl_wrap_socket_and_match_hostname(
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 782, in _ssl_wrap_socket_and_match_hostname
    ssl_sock = ssl_wrap_socket(
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/ssl_.py", line 470, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls, server_hostname)
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/ssl_.py", line 514, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
  File "/usr/local/lib/python3.11/ssl.py", line 517, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/local/lib/python3.11/ssl.py", line 1108, in _create
    self.do_handshake()
  File "/usr/local/lib/python3.11/ssl.py", line 1379, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLEOFError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1006)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 790, in urlopen
    response = self._make_request(
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 491, in _make_request
    raise new_e
urllib3.exceptions.SSLError: [SSL: UNEXPECTED_EOF_WHILE_READING] EOF occurred in violation of protocol (_ssl.c:1006)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 844, in urlopen
[traceback truncated]

and

[2023-12-01 05:56:47,122] ERROR logger=asyncio module=base_events func=default_exception_handler() L1771 message=Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x3e8b0d31dbd0>

Here is my code for db connector and session:

from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio.session import AsyncSession

import asyncpg
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.sql.connector import Connector, create_async_connector

from src.core.config import settings

async def init_connection_pool(connector: Connector) -> AsyncEngine:
    async def getconn() -> asyncpg.Connection:
        conn: asyncpg.Connection = await connector.connect_async(
            settings.POSTGRES_CONN_NAME,
            "asyncpg",
            user=settings.POSTGRES_USER,
            password=settings.POSTGRES_PASSWORD,
            db=settings.POSTGRES_DB,
        )
        return conn

    pool = create_async_engine(
        "postgresql+asyncpg://",
        async_creator=getconn,
        pool_size=20,
        max_overflow=10,
        pool_timeout=10,
        pool_recycle=1200,
    )
    return pool

async def get_session():
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()

    # initialize connection pool
    engine = await init_connection_pool(connector)

    async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

I am running this app on Cloud Run and receive a lot of these errors. Is there any way to get rid of them?

jackwotherspoon commented 9 months ago

@NickNaskida Yes it seems you are running into the same issue where the Connector is not cleaning itself up properly.

To close the Connector properly when configured for an async driver you can do one of two things.

  1. Call connector.close_async() (when instantiating Connector via create_async_connector)

    async def get_session():
        # initialize Connector object for connections to Cloud SQL
        connector = await create_async_connector()

        # initialize connection pool
        engine = await init_connection_pool(connector)

        async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
        async with async_session() as session:
            try:
                yield session
            finally:
                await session.close()

        # explicitly close connector
        await connector.close_async()
  2. Use the Connector as an async context manager for implicit clean-up

    async def get_session():
        # initialize Connector as async context manager
        loop = asyncio.get_running_loop()
        async with Connector(loop=loop) as connector:
            # initialize connection pool
            engine = await init_connection_pool(connector)

            async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
            async with async_session() as session:
                try:
                    yield session
                finally:
                    await session.close()

jackwotherspoon commented 9 months ago

I've updated this issue to now reflect updating the code samples to properly document cleaning up the Connector object. Please provide additional details if the above comments turn out not to resolve the underlying issues.

NickNaskida commented 9 months ago

@jackwotherspoon thanks for the quick response. I applied your changes yesterday and some of the issues were resolved; however, this error was logged today:

[2023-12-02 08:00:11,804] ERROR logger=asyncio module=base_events func=default_exception_handler() L1771 message=Task exception was never retrieved
future: <Task finished name='Task-2881' coro=<_get_ephemeral() done, defined at /usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py:128> exception=ClientOSError(1, '[SSL: APPLICATION_DATA_AFTER_CLOSE_NOTIFY] application data after close notify (_ssl.c:2706)')>

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/google/cloud/sql/connector/refresh_utils.py", line 201, in _get_ephemeral
    resp = await client_session.post(
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client.py", line 560, in _request
    await resp.start(conn)
  File "/usr/local/lib/python3.11/site-packages/aiohttp/client_reqrep.py", line 899, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.11/site-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [Errno 1] [SSL: APPLICATION_DATA_AFTER_CLOSE_NOTIFY] application data after close notify (_ssl.c:2706)

My aiohttp version is 3.8.5. Let me know if you need any more details


UPDATE: I upgraded aiohttp to version 3.9.1 and the issue seems to have gone away. I will report back if it reappears in the next few days.

NickNaskida commented 9 months ago

Hey @jackwotherspoon, so the issue that I posted above still exists even after updating aiohttp. I use the first approach that you suggested above (connector.close_async())

I think this issue is also somehow related to too many idle connections on my database, a problem I haven't managed to solve yet. Because of it, I usually get this error: remaining connection slots are reserved for non-replication superuser connections.

my engine config

pool = create_async_engine(
    "postgresql+asyncpg://",
    async_creator=getconn,
    pool_size=5,
    max_overflow=10,
    pool_timeout=10,
    pool_recycle=1200,
)
return pool
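As a rough, hedged calculation (the request count below is an assumed figure, not from this thread): each SQLAlchemy engine can open up to `pool_size + max_overflow` connections, so recreating the engine on every request multiplies that limit instead of sharing one pool:

```python
# pool limits from the engine config above
pool_size = 5
max_overflow = 10
max_conns_per_engine = pool_size + max_overflow  # 15

# if get_session() builds a fresh engine per request, each in-flight
# request can hold its own pool; assume 30 concurrent requests
concurrent_requests = 30
worst_case_connections = concurrent_requests * max_conns_per_engine  # 450
```

A worst case in the hundreds of connections easily exhausts a Cloud SQL instance's connection slots, which matches the "remaining connection slots" error.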

I believe this happens because the get_session function is called on every request in my app, so the engine is recreated every time:

async def get_session():
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()

    # initialize connection pool
    engine = await init_connection_pool(connector)

    async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

    # explicitly close connector
    await connector.close_async()

jackwotherspoon commented 9 months ago

I usually get this error: remaining connection slots are reserved for non-replication superuser connections

@NickNaskida The error you are seeing is indeed most likely due to hitting the max number of connections allowed by Cloud SQL. This is normally caused by confusion around connection pooling and it not being properly configured, as you pointed out. It is most likely no longer related to the clean-up of the Connector object, so I recommend opening a separate issue on this repo to discuss further if need be. This will allow others to find a solution and not get confused šŸ˜„

I believe this happens because the get_session function is called on every request in my app, and because of it engine is recreated every time.

You are correct; sorry, I should have caught that previously, but my FastAPI knowledge is limited. If you are calling get_session on every request and creating a new engine each time, then you aren't getting the actual benefits of connection pooling. You want to cache your connection pool engine and share it across requests/sessions so that connections can be re-used and pooled properly. Your current approach is a 1:1 request-to-engine mapping, which is why you are hitting the max number of connections, as the error shows. To fix this, move the Connector and engine initialization out of get_session, for example into global variables.

A couple of tips for Cloud Run to optimize performance with the Cloud SQL Connectors: lazy init the database engine (past issue on this) and set your Cloud Run service to "always on" (picture below), as the connector runs background tasks that can be affected negatively by cold starts.

(screenshot of the Cloud Run "CPU always allocated" setting)

The lazy init strategy works really well with FastAPI's lifespan event šŸ¤ž:

from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

from google.cloud.sql.connector import create_async_connector

# global engine variable to be shared across sessions
engine = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global engine
    # initialize Connector object for connections to Cloud SQL
    connector = await create_async_connector()
    # init the engine
    engine = await init_connection_pool(connector)
    yield
    # clean up the Cloud SQL Connector
    await connector.close_async()

app = FastAPI(lifespan=lifespan)

async def get_session():
    global engine
    async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

This will initialize an engine and connector for the lifespan of the FastAPI app.

NickNaskida commented 9 months ago

@jackwotherspoon That was it! Thank you very much!

PS. You should definitely add this to the README & docs, because I searched the web a lot over the weekend and didn't find anything like this.

jackwotherspoon commented 7 months ago

Improved the connector.close method in https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/pull/985 so that it is called upon garbage collection. This should help resolve these errors when users don't explicitly call close.

nioncode commented 1 day ago

Seems like the garbage collection improvement by @jackwotherspoon does not fix the issue (at least not for us). We had to close the connector manually, and we do it via atexit, since we did not find a good place to use it as a context manager within our Flask + SQLAlchemy setup:

    connector = Connector()
    atexit.register(lambda: connector.close())
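The same pattern in a self-contained, runnable form; `DemoConnector` is a hypothetical stand-in for `google.cloud.sql.connector.Connector` so the snippet works without credentials, and the callback is named so it can be unregistered if needed:

```python
import atexit


class DemoConnector:
    """Hypothetical stand-in for google.cloud.sql.connector.Connector."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


connector = DemoConnector()


def cleanup() -> None:
    # close the connector when the interpreter exits
    connector.close()


atexit.register(cleanup)
```

One caveat: atexit callbacks run on normal interpreter exit but not when the process is killed outright; since Cloud Run and most orchestrators send SIGTERM first, installing a SIGTERM handler that calls sys.exit() makes this approach reliable there.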

jackwotherspoon commented 1 day ago

@nioncode We actually reverted most of the garbage collection PR in https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/pull/1010, as there seemed to be a race condition that could cause the cleanup to hang, and re-opened #1011 to take another look at properly cleaning up on garbage collection.