MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0

Asyncpg.pool creates more connections than its max_size #1107

Open no-to-mediocrity opened 11 months ago

no-to-mediocrity commented 11 months ago

I have a singleton for a Database entity, which is then used to write some primitive data. When I run the write function 1000 times concurrently using asyncio.gather(), the database reports more connections than the max_size of the asyncpg pool. For example, during one test there were 857 active db connections but only 62 active pool connections; no other clients or operations were running during the test. When I do the same thing under uvloop, it just crashes with ConnectionResetError: [Errno 54] Connection reset by peer if I try to run more tasks than the size of the pool.

Is this a normal pool behavior?

I use the code below (the write function is simplified, though):

The database code:

import os

import asyncpg


class Database:
    _instance = None
    _pool = None
    db_params = {
        'host': os.getenv('DATABASE_HOST'),
        'port': os.getenv('DATABASE_PORT'),
        'database': os.getenv('DATABASE_NAME'),
        'user': os.getenv('DATABASE_USER'),
        'password': os.getenv('DATABASE_PASSWORD')
    }

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    async def get_pool(cls):
        if cls._pool is None:
            cls._pool = await asyncpg.create_pool(**cls.db_params, min_size=1, max_size=150)
        return cls._pool

    @classmethod
    async def write(cls, result):
        pool = await cls.get_pool()
        async with pool.acquire() as connection:
            await connection.execute('''
                INSERT INTO tables.results(
                    result
                ) VALUES($1)
            ''', result)
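In case it helps with triage, here is a minimal, asyncpg-free reproduction of what I suspect (but have not confirmed) is happening: every task that reaches the `cls._pool is None` check before the first awaited creation resolves goes on to create its own pool. In this sketch `asyncio.sleep(0)` stands in for the awaited `create_pool()` call and `object()` stands in for the pool:

```python
import asyncio

pools_created = 0

class LazyPool:
    _pool = None

    @classmethod
    async def get_pool(cls):
        global pools_created
        if cls._pool is None:
            # Stand-in for `await asyncpg.create_pool(...)`: any await here
            # suspends this task while _pool is still None.
            await asyncio.sleep(0)
            pools_created += 1
            cls._pool = object()  # stand-in for the pool object
        return cls._pool

async def main():
    # All 1000 tasks pass the `is None` check before any of them sets _pool.
    await asyncio.gather(*(LazyPool.get_pool() for _ in range(1000)))

asyncio.run(main())
print(pools_created)  # prints 1000
```

If the real get_pool() races the same way, each winner would hold its own pool of up to max_size connections, which could explain the connection count exceeding 150.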

The demo write code:

import asyncio

async def fake_result(i):
    print(f'generating fake result {i}')
    await Database.write(i)

async def run_functions_concurrently():
    tasks = [fake_result(i) for i in range(1000)]
    await asyncio.gather(*tasks)

def main():
    asyncio.run(run_functions_concurrently())

if __name__ == "__main__":
    main()