python-trio / triopg

PostgreSQL client for Trio based on asyncpg
Other
45 stars 8 forks source link

Prepared statements are not working #8

Closed serjflint closed 4 years ago

serjflint commented 4 years ago

Code below works normally

async def select(dsn, key: str):
    sql = 'SELECT id, data FROM table WHERE key = $1;'

    async with triopg.connect(dsn) as conn:
        records = await conn.fetch(sql, key)

    return records

And the prepared statement throws an Exception

async def select(dsn, key: str):
    sql = 'SELECT id, data FROM table WHERE key = $1;'

    async with triopg.connect(dsn) as conn:
        stmt = await conn.prepare(sql)
        records = await stmt.fetch(key)

    return records
Exception in default exception handler
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1733, in call_exception_handler
    self.default_exception_handler(context)
  File "/home/.cache/pypoetry/virtualenvs/zkd-I8xA_cm5-py3.8/lib/python3.8/site-packages/trio_asyncio/_async.py", line 44, in default_exception_handler
    raise exception
asyncpg.exceptions.QueryCanceledError: canceling statement due to user request
Traceback (most recent call last):
  File "main.py", line 11, in <module>
    trio_asyncio.run(z.run)
  File "/home/.cache/pypoetry/virtualenvs/zkd-I8xA_cm5-py3.8/lib/python3.8/site-packages/trio_asyncio/_loop.py", line 429, in run
    return trio.run(_run_task, proc, args)
  File "/home/.cache/pypoetry/virtualenvs/zkd-I8xA_cm5-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "/home/.cache/pypoetry/virtualenvs/zkd-I8xA_cm5-py3.8/lib/python3.8/site-packages/trio_asyncio/_loop.py", line 427, in _run_task
    return await proc(*args)
  File "main.py", line 741, in run
    await self.compare(
  File "main.py", line 601, in compare
    nursery.start_soon(self.compare_worker, args)
  File "/home/.cache/pypoetry/virtualenvs/zkd-I8xA_cm5-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 741, in __aexit__
    raise combined_error_from_nursery
  File "main.py", line 607, in compare_worker
    matches = await self.compare_one(*args)
  File "main.py", line 640, in compare_one
    quick_list = await select(connection, key)
  File "main.py", line 162, in select_quick
    hashes = await stmt.fetch(key)
  File "/home/.cache/pypoetry/virtualenvs/zkd-I8xA_cm5-py3.8/lib/python3.8/site-packages/asyncpg/prepared_stmt.py", line 162, in fetch
    data = await self.__bind_execute(args, 0, timeout)
  File "/home/.cache/pypoetry/virtualenvs/zkd-I8xA_cm5-py3.8/lib/python3.8/site-packages/asyncpg/prepared_stmt.py", line 202, in __bind_execute
    data, status, _ = await protocol.bind_execute(
  File "asyncpg/protocol/protocol.pyx", line 196, in bind_execute
TypeError: trio.run received unrecognized yield message <Future pending cb=[Protocol._on_waiter_completed()]>. Are you trying to use a library written for some other framework like asyncio? That won't work without some kind of compatibility shim.
touilleMan commented 4 years ago

Hi @serjflint !

I guess the trouble come from the fact prepare() returns a statement object (not sure precisely what kind of object is returned) this is not wrapped by trio. See for instance what is done for the transaction() method compared to the generic wrapper (that only wrap the execution of the method and return the value verbatim):

https://github.com/python-trio/triopg/blob/19343c5012d43219999e983d03f31e9bdd9415f1/triopg/_triopg.py#L45-L64

So I guess we should introduce a TrioStatementProxy and mimic what is done for the transaction() method

serjflint commented 4 years ago

@touilleMan I am actually trying to do just that but I have a stack overflow on return from prepare()

class TrioStatementProxy:
    def __init__(self, asyncpg_statement):
        self._asyncpg_statement = asyncpg_statement

    def __getattr__(self, attr):
        target = getattr(self._asyncpg_statement, attr)

        if callable(target):

            @wraps(target)
            @trio_asyncio.aio_as_trio
            async def wrapper(*args, **kwargs):
                return await target(*args, **kwargs)

            # Only generate the function wrapper once per instance
            setattr(self, attr, wrapper)

            return wrapper

        return target

    @trio_asyncio.aio_as_trio
    async def __aenter__(self, *args):
        return await self._asyncpg_statement(*args)

    @_shielded
    @trio_asyncio.aio_as_trio
    async def __aexit__(self, *args):
        return await self._asyncpg_statement.__aexit__(*args)

class TrioConnectionProxy:
    def __init__(self, *args, **kwargs):
        self._asyncpg_create_connection = partial(
            asyncpg.connect, *args, **kwargs
        )
        self._asyncpg_conn = None

    async def prepare(self, *args, **kwargs):
        asyncpg_statement = await trio_asyncio.aio_as_trio(self._asyncpg_conn.prepare(*args, **kwargs))
        return TrioStatementProxy(asyncpg_statement)
serjflint commented 4 years ago

I removed aenter and aexit and it seems to be working.

touilleMan commented 4 years ago

fixed by #9

touilleMan commented 4 years ago

@serjflint aaaaand version 0.50.0 is released ! 😄