MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0

GCP CloudSQL Connector with asyncpg Pool #1148

Open: d1manson opened this issue 4 months ago

d1manson commented 4 months ago

I'm not sure whether this is better asked on the Cloud SQL Python Connector repo or here, but I'm trying to work out how to call asyncpg.create_pool(...) so that it works with the Cloud SQL connector.

I can create a single asyncpg-style connection using this:

import asyncio
import json

from google.cloud.sql.connector import Connector

async def init():
    # inside a coroutine, get_running_loop() is the idiomatic way to get the current loop
    connector = Connector(enable_iam_auth=True, loop=asyncio.get_running_loop())
    conn = await connector.connect_async(
                instance_connection_string="my-project:region:db-name",
                driver="asyncpg",
                db="postgres",
                user="something@my-project.iam.gserviceaccount.com")
    # do init stuff...
    await conn.set_type_codec("json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog")
    return conn

but I'm not sure how to build a complete drop-in replacement for asyncpg.create_pool, which is what the existing codebase uses. I'd like to keep the pool object as it is, so that the rest of the codebase doesn't need to change when we switch to Cloud SQL.
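A minimal sketch of one piece that carries over directly: pull the "# do init stuff" part out into its own coroutine. asyncpg.create_pool accepts an init argument that is awaited on every new connection the pool opens (the Pool subclass later in this thread shows the same self._init(con) call), so per-connection setup like the JSON codec belongs there rather than in the connect step. RecordingConnection is a hypothetical stand-in used only to show what init receives; it is not part of asyncpg.

```python
import asyncio
import json


async def init_connection(conn):
    # Per-connection setup, separated from connecting so a pool can run it
    # on each connection it opens.
    await conn.set_type_codec(
        "json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
    )


class RecordingConnection:
    """Hypothetical stand-in for an asyncpg connection; records codec calls."""

    def __init__(self):
        self.calls = []

    async def set_type_codec(self, typename, *, encoder, decoder, schema):
        self.calls.append((typename, schema))


if __name__ == "__main__":
    fake = RecordingConnection()
    asyncio.run(init_connection(fake))
    print(fake.calls)  # [('json', 'pg_catalog')]
```

With a real pool, the same coroutine would be passed as asyncpg.create_pool(..., init=init_connection).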

Thanks!

d1manson commented 1 month ago

In case anyone finds it useful, I did come back to this and got it working as follows:

import asyncio
import inspect

from asyncpg import Pool as Pool_original, create_pool as create_pool_original
from google.cloud.sql.connector import Connector

async def connect(dsn, **kwargs):
    # The dsn should be a Cloud SQL instance_connection_string, not a full DSN; we use the name 'dsn' for consistency
    # with the original asyncpg.connect function arguments.

    # The pool passes loop=...; hand it to the Connector, falling back to the running loop.
    loop = kwargs.pop("loop", None) or asyncio.get_running_loop()

    # you may want to make this Connector a singleton rather than create a new one scoped to this function call
    connector = Connector(enable_iam_auth=True, loop=loop)

    return await connector.connect_async(
        dsn,
        driver="asyncpg",
        user="SOME_USER_HERE",
        **kwargs
    )

class Pool(Pool_original):

    async def _get_new_connection(self):
        # this function body is copy-pasted from the base class, with just the first line modified
        con = await connect(*self._connect_args, loop=self._loop,
                            connection_class=self._connection_class,
                            record_class=self._record_class,
                            **self._connect_kwargs)

        if self._init is not None:
            try:
                await self._init(con)
            except (Exception, asyncio.CancelledError) as ex:
                # If a user-defined `init` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                try:
                    # Use `close()` to close the connection gracefully.
                    # An exception in `init` isn't necessarily caused
                    # by an IO or a protocol error.  close() will
                    # do the necessary cleanup via _release_on_close().
                    await con.close()
                finally:
                    raise ex

        return con

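def _borrow_default_kwargs(func, kwargs):
    # Pool() itself doesn't default options such as min_size/max_size, so copy the
    # keyword-only defaults from create_pool and let the caller's kwargs override them.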
    signature = inspect.signature(func)
    return {
        **{
            k: v.default
            for k, v in signature.parameters.items()
            if v.default is not inspect.Parameter.empty and v.kind == v.KEYWORD_ONLY
        },
        **kwargs,
    }

def create_pool(dsn=None, **kwargs):
    return Pool(dsn, **_borrow_default_kwargs(create_pool_original, kwargs))

Note that this would be somewhat simpler if it were possible to pass a custom connect function for _get_new_connection (in asyncpg's pool.py) to use, rather than having it hard-coded to the default connection.connect.
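The hook described above can be sketched with a toy pool whose connection factory is a constructor argument instead of a hard-coded call, so no subclass or copy-pasted method body is needed. MiniPool, default_connect and cloudsql_style_connect are hypothetical names for illustration; this is not asyncpg's actual Pool. (If I recall correctly, asyncpg 0.30.0 later added a connect argument to create_pool along these lines; check the create_pool docs for your installed version before relying on it.)

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def default_connect(*args: Any, **kwargs: Any) -> str:
    # Hypothetical stand-in for a driver's built-in connect().
    return f"default:{args[0]}"


class MiniPool:
    """Toy pool (hypothetical) with an injectable connection factory."""

    def __init__(self, *connect_args: Any,
                 connect: Optional[Callable[..., Awaitable[Any]]] = None,
                 **connect_kwargs: Any) -> None:
        self._connect_args = connect_args
        self._connect_kwargs = connect_kwargs
        # Use the default factory only when no custom one is supplied.
        self._connect = connect or default_connect

    async def _get_new_connection(self) -> Any:
        # Same call shape whichever factory is in use.
        return await self._connect(*self._connect_args, **self._connect_kwargs)


async def cloudsql_style_connect(instance: str, **kwargs: Any) -> str:
    # Hypothetical: a real version would await Connector.connect_async(instance, "asyncpg", ...).
    return f"cloudsql:{instance}"


async def main() -> tuple:
    a = await MiniPool("db-host")._get_new_connection()
    b = await MiniPool("my-project:region:db-name",
                       connect=cloudsql_style_connect)._get_new_connection()
    return a, b


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The design choice is plain dependency injection: the pool still owns sizing, init and recycling, while the caller decides only how a raw connection gets made.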