python-gino / gino

GINO Is Not ORM - a Python asyncio ORM on SQLAlchemy core.
https://python-gino.org/
Other
2.68k stars 150 forks source link

asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress #715

Open mpdevilleres opened 4 years ago

mpdevilleres commented 4 years ago

Description

I am trying to write tests for a FastAPI websocket endpoint, as shown in the snippet below.

import asyncio

import pytest
from sqlalchemy import (
    Column,
    Text
)
from gino.ext.starlette import Gino
from starlette.websockets import WebSocketDisconnect
from fastapi import Depends, FastAPI, WebSocket
from fastapi.testclient import TestClient
from fastapi import status

# Gino database handle from gino.ext.starlette; the model below subclasses
# `db.Model`, and the handle is attached to the FastAPI app further down.
db = Gino(
    # Local PostgreSQL instance, user `postgres` with an empty password,
    # database `internal`.
    dsn='postgresql://postgres:@0.0.0.0:5432/internal',
    pool_min_size=1,
    pool_max_size=16,
    echo=False,
    ssl=None,
    # NOTE(review): presumably makes the extension hold one connection per
    # request — confirm against the gino-starlette documentation.
    use_connection_for_request=True,
    # NOTE(review): presumably control connection retries — confirm.
    retry_limit=1,
    retry_interval=1,
)

# The app must exist before the database extension is registered on it.
app = FastAPI()
db.init_app(app)

class User(db.Model):
    """Gino model for the ``client_user`` table in the ``internal`` schema.

    Only the ``username`` and ``password`` columns are declared here.
    """
    __tablename__ = "client_user"
    __table_args__ = {"schema": 'internal'}
    # NOTE(review): Gino models appear to expose an `alias` classmethod; this
    # string attribute would shadow it — confirm this is intentional.
    alias = "client_user"

    # Required column; get_current_user() filters on it.
    username = Column(Text, nullable=False)
    # No nullability constraint is given for the password column.
    password = Column(Text)

async def get_current_user(websocket: WebSocket):
    """Resolve the user that is opening the websocket.

    To keep the example simple, the username is carried in the
    ``sec-websocket-protocol`` header instead of inside a JWT.

    Returns the first ``User`` row whose ``username`` equals the header
    value, i.e. whatever ``.gino.first()`` yields for that query (the
    endpoint treats ``None`` as "unknown user").
    """
    requested_name = websocket.headers.get('sec-websocket-protocol')

    # The test database is expected to contain one account: admin/pass.
    lookup = User.query.where(User.username == requested_name)
    return await lookup.gino.first()

@app.websocket("/websocket")
async def websocket_endpoint(
        websocket: WebSocket,
        user: User = Depends(get_current_user),
):
    """Accept the websocket for a known user, otherwise refuse it.

    ``user`` is injected by the ``get_current_user`` dependency. When it is
    ``None`` the socket is closed with the policy-violation code without
    being accepted; otherwise the handshake is accepted and the connection
    is logged to stdout.
    """
    if user is not None:
        await websocket.accept()
        print(f'User: {user.username} connected')
    else:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)

@pytest.fixture(scope="session")
def event_loop():
    """
    Provide a single asyncio event loop shared by the whole test session;
    otherwise a loop is recreated for each test, which prevents using the
    test_db fixture.
    https://github.com/FactoryBoy/factory_boy/issues/679
    https://stackoverflow.com/a/56238383

    A dedicated loop is created rather than borrowing the thread's current
    loop, so closing it at teardown cannot leave other code holding a closed
    default loop.
    """
    loop = asyncio.new_event_loop()
    try:
        yield loop
    finally:
        # Always release the loop's resources, even if the generator is
        # torn down with an exception.
        loop.close()

@pytest.fixture
def client():
    """Yield a TestClient for the app, kept open while the test runs."""
    # Distinct local name so the fixture function itself is not shadowed.
    with TestClient(app) as test_client:
        yield test_client

def test_invalid_user(client):
    """Connecting as an unknown user must raise WebSocketDisconnect."""
    subprotocols = ['fake_user']
    # websocket_connect() is evaluated after pytest.raises is entered, so a
    # disconnect raised while connecting is still captured.
    with pytest.raises(WebSocketDisconnect), \
            client.websocket_connect("/websocket", subprotocols):
        pass

def test_valid_user(client):
    """Connecting as the seeded ``admin`` user must succeed."""
    session = client.websocket_connect("/websocket", ['admin'])
    # Entering and leaving the session without error is the whole check.
    with session:
        pass

What I Did

I ran the tests with the command below:

pytest bug_report.py

Here's the traceback:

client = <starlette.testclient.TestClient object at 0x10a33d7c0>

    def test_valid_user(client):
>       with client.websocket_connect("/websocket", ['admin']):

bug_report.py:89: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
venv/lib/python3.8/site-packages/starlette/testclient.py:444: in websocket_connect
    super().request("GET", url, **kwargs)
venv/lib/python3.8/site-packages/requests/sessions.py:530: in request
    resp = self.send(prep, **send_kwargs)
venv/lib/python3.8/site-packages/requests/sessions.py:643: in send
    r = adapter.send(request, **kwargs)
venv/lib/python3.8/site-packages/starlette/testclient.py:145: in send
    session = WebSocketTestSession(self.app, scope)
venv/lib/python3.8/site-packages/starlette/testclient.py:277: in __init__
    message = self.receive()
venv/lib/python3.8/site-packages/starlette/testclient.py:339: in receive
    raise message
venv/lib/python3.8/site-packages/starlette/testclient.py:300: in _run
    self._loop.run_until_complete(self.app(scope, receive, send))
../.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py:616: in run_until_complete
    return future.result()
venv/lib/python3.8/site-packages/fastapi/applications.py:181: in __call__
    await super().__call__(scope, receive, send)
venv/lib/python3.8/site-packages/starlette/applications.py:102: in __call__
    await self.middleware_stack(scope, receive, send)
venv/lib/python3.8/site-packages/starlette/middleware/errors.py:146: in __call__
    await self.app(scope, receive, send)
venv/lib/python3.8/site-packages/gino_starlette.py:86: in __call__
    await self.app(scope, receive, send)
venv/lib/python3.8/site-packages/starlette/exceptions.py:58: in __call__
    await self.app(scope, receive, send)
venv/lib/python3.8/site-packages/starlette/routing.py:550: in __call__
    await route.handle(scope, receive, send)
venv/lib/python3.8/site-packages/starlette/routing.py:283: in handle
    await self.app(scope, receive, send)
venv/lib/python3.8/site-packages/starlette/routing.py:57: in app
    await func(session)
venv/lib/python3.8/site-packages/fastapi/routing.py:232: in app
    solved_result = await solve_dependencies(
venv/lib/python3.8/site-packages/fastapi/dependencies/utils.py:539: in solve_dependencies
    solved = await call(**sub_values)
bug_report.py:44: in get_current_user
    user = await User.query.where(User.username == username).gino.first()
venv/lib/python3.8/site-packages/gino/api.py:137: in first
    return await self._query.bind.first(self._query, *multiparams, **params)
venv/lib/python3.8/site-packages/gino/engine.py:748: in first
    return await conn.first(clause, *multiparams, **params)
venv/lib/python3.8/site-packages/gino/engine.py:147: in __aexit__
    await conn.release()
venv/lib/python3.8/site-packages/gino/engine.py:279: in release
    await dbapi_conn.release(True)
venv/lib/python3.8/site-packages/gino/engine.py:47: in release
    return await self._release()
venv/lib/python3.8/site-packages/gino/engine.py:83: in _release
    await self._pool.release(conn)
venv/lib/python3.8/site-packages/gino/dialects/asyncpg.py:232: in release
    await self._pool.release(conn)
venv/lib/python3.8/site-packages/asyncpg/pool.py:654: in release
    return await asyncio.shield(ch.release(timeout))
venv/lib/python3.8/site-packages/asyncpg/pool.py:216: in release
    raise ex
venv/lib/python3.8/site-packages/asyncpg/pool.py:206: in release
    await self._con.reset(timeout=budget)
venv/lib/python3.8/site-packages/asyncpg/connection.py:1137: in reset
    await self.execute(reset_query, timeout=timeout)
venv/lib/python3.8/site-packages/asyncpg/connection.py:295: in execute
    return await self._protocol.query(query, timeout)
asyncpg/protocol/protocol.pyx:301: in query
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

asyncpg/protocol/protocol.pyx:664: InterfaceError
Samery00 commented 2 years ago

I'm facing the same issue. Has it been solved, or is there any workaround?

MarkParker5 commented 2 years ago

I fixed it by setting poolclass=NullPool in create_async_engine