MagicStack / asyncpg

A fast PostgreSQL Database Client Library for Python/asyncio.
Apache License 2.0
7k stars 404 forks source link

asyncpg + aiogram is not working #927

Open rive-n opened 2 years ago

rive-n commented 2 years ago
There is some code abstraction coded with aiogram: ```python async def check_rights(message: types.Message): return await bot['db_pool'].fetch_query('SELECT is_admin FROM users WHERE id = $1::varchar(30) and username = $2::varchar(30)', str(message.chat.id), message.from_user.username) @bot_dispatcher.message_handler(check_rights, commands=['add_admin']) async def add_admin(message: types.Message): print('is_admin!') ``` That's how `bot['db_pool']` is created: ```python async def on_startup(): db = Database() bot = Bot(token=token) bot_dispatcher = Dispatcher(bot) try: await db.setup_pool() except asyncpg.exception.InvalidPasswordError: logging.ERROR("Username or password if incorrect") exit(-1) else: bot['db_pool'] = db bot['getMeInfo'] = await bot.get_me() print(await bot['db_pool'].test()) return bot, bot_dispatcher ``` And database object as well: ```python class Database(object): def __init__(self): if environ.get('DEBUG'): self.host, self.port, self.database = '127.0.0.1', 5432, 'test' else: self.host, self.port, self.database = environ.get('PG_HOST', 'postgres'), environ.get('PG_PORT', 5432), environ.get("PG_DATABASE") @property def get_pool(self): return self.pool async def setup_pool(self): self.pool = await create_pool( user = environ.get('PG_USERNAME'), password = environ.get('PG_PASSWORD'), host = self.host, port = self.port, database = self.database ) async def fetch_query(self, query: str, *args): async with self.pool.acquire() as connection: async with connection.transaction(): return await connection.fetchval(query, *args) async def test(self): return await self.fetch_query("select 2 ^ $1", 1) ``` Well if I am creating event loop like this: ```python loop = new_event_loop() bot, bot_dispatcher = loop.run_until_complete(on_startup()) ``` And starting bot like this: ```python if __name__ == "__main__": print(f"[+] Bot started with username: {bot['getMeInfo'].username}") executor.start_polling(bot_dispatcher, skip_updates=True) ``` I can't execute POOL methods, for example: ```python async def check_rights(message: types.Message): return await bot['db_pool'].fetch_query('SELECT is_admin FROM users WHERE id = $1::varchar(30) and username = $2::varchar(30)', str(message.chat.id), message.from_user.username) ``` Error trace: ``` Task exception was never retrieved future: exception=InterfaceError('cannot perform operation: another operation is in progress')> Traceback (most recent call last): File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/dispatcher.py", line 415, in _process_polling_updates for responses in itertools.chain.from_iterable(await self.process_updates(updates, fast)): File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/dispatcher.py", line 235, in process_updates return await asyncio.gather(*tasks) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/handler.py", line 116, in notify response = await handler_obj.handler(*args, **partial_data) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/dispatcher.py", line 256, in process_update return await self.message_handlers.notify(update.message) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/handler.py", line 107, in notify data.update(await check_filters(handler_obj.filters, args)) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/filters/filters.py", line 72, in check_filters f = await execute_filter(filter_, args) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/aiogram/dispatcher/filters/filters.py", line 56, in execute_filter return await filter_.filter(*args, **filter_.kwargs) File "__main__.py", line 65, in check_rights return await bot['db_pool'].fetch_query('SELECT is_admin FROM users WHERE id = $1::varchar(30) and username = $2::varchar(30)', File "/Users/riven/Desktop/Projects/botname/src/database.py", line 33, in fetch_query return await connection.fetchval(query, *args) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 993, in __aexit__ await self.pool.release(con) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 867, in release return await asyncio.shield(ch.release(timeout)) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 224, in release raise ex File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/pool.py", line 214, in release await self._con.reset(timeout=budget) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/connection.py", line 1367, in reset await self.execute(reset_query, timeout=timeout) File "/Users/riven/Desktop/Projects/botname/src/venv/lib/python3.8/site-packages/asyncpg/connection.py", line 318, in execute return await self._protocol.query(query, timeout) File "asyncpg/protocol/protocol.pyx", line 323, in query File "asyncpg/protocol/protocol.pyx", line 707, in asyncpg.protocol.protocol.BaseProtocol._check_state asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress ``` `cannot perform operation: another operation is in progress` - Which one? I am doing only 1 request to database 0-o
rive-n commented 2 years ago

I found a solution, but that's really strange one...

Before of starting pool: executor.start_polling(bot_dispatcher, skip_updates=True) we need to configure Database object like this:

loop = get_event_loop()
db = Database(loop)
bot['db_pool'] = db

And Database object:

class Database(object):
    def __init__(self, loop):
        if environ.get('DEBUG'):
            self.host, self.port, self.database = '127.0.0.1', 5432, 'test'
        else:
            self.host, self.port, self.database = environ.get('PG_HOST', 'postgres'), environ.get('PG_PORT', 5432), environ.get("PG_DATABASE")

        self.pool = loop.run_until_complete(create_pool(
                user = environ.get('PG_USERNAME'),
                password = environ.get('PG_PASSWORD'),
                host = self.host, port = self.port,
                database = self.database
            ))

This part is pretty strange for me:

loop.run_until_complete(create_pool(
                user = environ.get('PG_USERNAME'),
                password = environ.get('PG_PASSWORD'),
                host = self.host, port = self.port,
                database = self.database
            ))

Why we should create pool via loop object?