MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0
6.97k stars 403 forks source link

Long running pool connections go stale and can not be used #704

Open ssolari opened 3 years ago

ssolari commented 3 years ago

I suspect this is an issue related to not fully understanding how to properly clean up pool connections, but it seems there are other issues and users that could benefit from helping figure this out. Other similar issues are #519 and #696.

My question is really what is a correct method for refreshing/cleaning up connections or the whole pool periodically? And also to ensure that on some sort of connection error we can re-setup the pool entirely so we have a robust continuous db connection.

We initially tried using pool.close() wrapped in wait_for, but it did not work. Since the docs say It is advisable to use asyncio.wait_for() to set a timeout. and close() now waits until all pool connections are released before closing them and the pool, however in wait_for The function will wait until the future is actually cancelled, so the total wait time may exceed the timeout. So the thought was maybe there was no cancellation happenening and close was getting infinitely stuck.

As a backup, we tried to simply hard terminate all connections periodically and then re-setup the db pool. I thought this should surely work, but we still eventually hang all connections. Below is code that was attempting to robustly terminate the pool and then re-set it up. The code works fine initially to connect to DB and run queries, but over time, the processes using this object simply can not access the database, there are no errors, there are simply no more operations performed on the DB. All other async functions that are doing other operations are running fine (e.g. connecting to external apis), but no operations on the DB are being performed.

The thought is that something must be hanging and the connections are stale somehow, but awaits are not returning nor failing, as a result all DB operations are just queued?

Really just trying to figure out how to: 1) create a connection pool to be used in an effectively infinitely long running process 2) be able to fully close down the pool and refresh it if errors or connection failures are detected (or just do this periodically)


import asyncio
import asyncpg
import logging
import random
import time
import traceback

class AsyncRedshiftDbConnect(object):

    def __init__(self,
                 loop = None,
                 endpoint: str = '',
                 database_name: str = '',
                 port: str = '',
                 username: str = '',
                 password: str = '',
                 pool_size: int=1,
                 reset_time: int=2*3600,
                 ):

        if loop:
            self.loop = loop
        else:
            self.loop = asyncio.get_event_loop()

        self._endpoint = endpoint
        self._port = port
        self._dbname = database_name
        self._user = username
        self._pass = password

        self._db_pool = None
        self._pool_size = pool_size
        assert isinstance(self._pool_size, int)
        assert self._pool_size >= 1 and self._pool_size <= 50
        self._reset_run = 0
        self._reset_time = reset_time
        self._pool_is_resetting = False
        self._closing_pool = False

        # kick off checks for the connection and setup initial connection
        self.loop.call_soon(lambda: asyncio.ensure_future(self.maintain_connection()))
        self.reset_db_pool()

    def is_connection_available(self) -> bool:

        if self._closing_pool or self._pool_is_resetting or not self._db_pool:
            return False
        else:
            return True

    def reset_db_pool(self, delay: int=0):
        # non-async function to schedule the reset db pool
        if not delay:
            self.loop.call_soon(lambda: asyncio.ensure_future(self.async_reset_db_pool()))
        else:
            self.loop.call_later(delay, lambda: asyncio.ensure_future(self.async_reset_db_pool()))

    async def async_reset_db_pool(self):
        """
        Async function to try to reset the database pool of connections with flags for checking status.
        :return:
        """

        if self._pool_is_resetting:
            return
        self._pool_is_resetting = True

        try:
            self._reset_run += 1

            min_size = self._pool_size
            max_size = self._pool_size

            await self._close_pool()
            sttime = time.time()
            self._db_pool = await asyncpg.create_pool(user=self._user, password=self._pass,
                                                      database=self._dbname, host=self._endpoint, port=self._port,
                                                      min_size=min_size, max_size=max_size, loop=self.loop)
            logging.info(f'Connection pool size={min_size} created in {time.time()-sttime:0.4f} seconds')

            # reset the pool every hour
            self.reset_db_pool(self._reset_time + random.randint(1, 100))
        except asyncpg.exceptions.TooManyConnectionsError:
            logging.error(f'RedshiftDB maximum connection (500) limit error being reached')
        except:
            self.reset_db_pool(10)
        self._closing_pool = False
        self._pool_is_resetting = False

    async def _close_pool(self):

        if isinstance(self._db_pool, asyncpg.pool.Pool):
            self._closing_pool = True
            # await asyncio.wait_for(self._db_pool.close(), timeout=10)
            try:
                self._db_pool.terminate()
            except:
                logging.error(traceback.format_exc())

        self._closing_pool = False
        self._db_pool = None

    async def _pool_fetch(self, query: str, *args, raise_exception: bool=False):
        """
        Wrapper to perform a postgres fetch operation using the connection pool.

        :param query:
        :param args:
        :param raise_exception:
        :return:
        """

        try:
            if not self.is_connection_available():
                for _ in range(30):
                    await asyncio.sleep(0.1)
                    if self.is_connection_available():
                        break
            async with self._db_pool.acquire() as con:
                return await con.fetch(query, *args)
        except asyncpg.UndefinedTableError:
            return []
        except:
            if raise_exception:
                raise
            else:
                self.reset_db_pool()

    async def _pool_execute(self, query: str, *args, raise_exception: bool=False):
        """
        Wrapper to perform a postgres execute operation using pool connection.

        :param query:
        :param args:
        :param raise_exception:
        :return:
        """

        try:
            if not self.is_connection_available():
                for _ in range(30):
                    await asyncio.sleep(0.1)
                    if self.is_connection_available():
                        break
            async with self._db_pool.acquire() as con:
                return await con.execute(query, *args)
        except:
            if raise_exception:
                raise
            else:
                self.reset_db_pool()

    async def maintain_connection(self):
        """
        A continuous check of the connection to the database.   In theory this should suffice to keep the connection
        open by resetting it if the tables can not be queried.

        :return:
        """

        query = 'SELECT 1 from check_table'
        result = await self._pool_fetch(query)
        if not result:
            self.reset_db_pool()
        check_delay = 140 + random.random()
        self.loop.call_later(check_delay, lambda: asyncio.ensure_future(self.maintain_connection()))
lazitski-aliaksei commented 3 years ago

Have you considered to use Pool.expire_connections?