squeaky-pl / japronto

Screaming-fast Python 3.5+ HTTP toolkit integrated with pipelining HTTP server based on uvloop and picohttpparser.
MIT License
8.61k stars 581 forks source link

Async issue when using with asyncpg pools #81

Open Izacht13 opened 7 years ago

Izacht13 commented 7 years ago

Example code from https://magicstack.github.io/asyncpg/current/usage.html#connection-pools Only substituting aiohttp web for japronto.

Trace:

Traceback (most recent call last):
  File "main.py", line 26, in handle
    text="2 ^ {} is {}".format(power, result))
  File "/usr/lib/python3.6/site-packages/asyncpg/pool.py", line 252, in __aexit__
    await self.pool.release(con)
  File "/usr/lib/python3.6/site-packages/asyncpg/pool.py", line 184, in release
    await connection.reset()
  File "/usr/lib/python3.6/site-packages/asyncpg/connection.py", line 411, in reset
    ''')
  File "/usr/lib/python3.6/site-packages/asyncpg/connection.py", line 170, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 249, in query (asyncpg/protocol/protocol.c:57280)
  File "asyncpg/protocol/protocol.pyx", line 354, in asyncpg.protocol.protocol.BaseProtocol._ensure_clear_state (asyncpg/protocol/protocol.c:59586)
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

I actually ran into this issue when I tried to add asyncpg to my existing project, boiled it down to this text case and found it was a root issue.

Here is the offending code:

import asyncio
import asyncpg
import japronto

app = japronto.Application()

loop = asyncio.get_event_loop()
pool = loop.run_until_complete(
    asyncpg.create_pool(
        database='postgres',
    user='postgres'
    )
)

async def handle(request):
    power = int(request.match_dict.get('power', 10))
    async with pool.acquire() as connection:
        async with connection.transaction():
            result = await connection.fetchval('select 2 ^ $1', power)
            return request.Response(
                text="2 ^ {} is {}".format(power, result))

app.router.add_route('/{power}', handle)
app.router.add_route('/', handle)
app.run(debug=True)

You'll see I had to rely on the closure because japranto's app object doesn't support item assignment.

jochumdev commented 6 years ago

I found a way to get this working:

import asyncio
import asyncpg
import japronto

async def connect_db_pool(app):
    asyncio.set_event_loop(app.loop)
    db_pool = await asyncpg.create_pool(
        host='HOST',
        database='DB',
        user='USER',
        password='PASSWORD',
    )

    app.extend_request(lambda x: db_pool, name='db_pool', property=True)

async def handle(request):
    power = int(request.match_dict.get('power', 10))
    async with request.db_pool.acquire() as connection:
        async with connection.transaction():
            result = await connection.fetchval('select 2 ^ $1', power)
            return request.Response(
                text="2 ^ {} is {}".format(power, result))

app = japronto.Application()
app.loop.run_until_complete(connect_db_pool(app))

app.router.add_route('/{power}', handle)
app.router.add_route('/', handle)
app.run(debug=True)