pytest-dev / pytest-asyncio

Asyncio support for pytest
https://pytest-asyncio.readthedocs.io
Apache License 2.0
1.43k stars 152 forks source link

Async Tests Failing with sqlalchemy.exc.InterfaceError: users Table Not Recognized Despite Schema Creation #903

Closed timesbyusman closed 3 months ago

timesbyusman commented 3 months ago

I am new to pytest and developed async apis along with async database operations. I wrote tests for my crud operations and Yesterday I got error related to 706

fortunately I got the pre-release version (pytest-asyncio v0.24.0a0) and it worked for me. Now I am working on writing tests for async apis. The Problem I am facing is when I pass the db session to async api to be used it gives error

sqlalchemy.exc.ProgrammingError: (psycopg.errors.UndefinedTable) relation "users" does not exist LINE 2: FROM users

I debugged the test before sending request using async client to verify if the tables exist in the database and they did.

I have checked all over the internet but didn't find any solution

my conftest.py looks like this.

TEST_DATABASE_URL = f"postgresql+asyncpg://{USER}:{PWD}@{HOST}:{PORT}/test_db"`
engine = create_async_engine(TEST_DATABASE_URL, future=True)

TestingSessionLocal = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    class_=AsyncSession
)

@pytest.fixture(scope="session", autouse=True)
async def setup_db():
    """
    Setup and teardown the database once per session.
    """
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)
    async with TestingSessionLocal() as session:
        await init_db(session)
    yield
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.drop_all)

@pytest.fixture(scope="function")
async def get_db(setup_db):
    """
    Create a new async database session for each test function.
    """
    async with TestingSessionLocal() as session:
        yield session

@pytest.fixture(scope="function")
async def override_get_db(get_db):
    """
    Override the get_db fixture to return the async database session.
    """
    app.dependency_overrides[get_db] = lambda: get_db
    yield get_db
    app.dependency_overrides[get_db] = {}

NOTE: await init_db(session) is being used to initialize data in the empty test database.

test_api.py

EMAIL = settings.FIRST_SUPERUSER
PWD = settings.FIRST_SUPERUSER_PASSWORD

@pytest.mark.asyncio
async def test_login_access_token(override_get_db):
    async with AsyncClient(app=app, base_url=f"http://testserver{settings.API_V1_STR}") as ac:
        response = await ac.post("/auth/login", data={"username": EMAIL, "password": PWD})
        assert response.status_code == 200
        tokens = response.json()
        assert "access_token" in tokens
        assert "refresh_token" in tokens

Complete Error log

==================================================================== test session starts =====================================================================
platform darwin -- Python 3.10.14, pytest-8.3.2, pluggy-1.5.0 -- /Users/usman/opt/anaconda3/envs/pms/bin/python
cachedir: .pytest_cache
rootdir: /Users/usman/pms
configfile: pyproject.toml
plugins: time-machine-2.14.2, cov-5.0.0, asyncio-0.24.0a0, anyio-4.4.0, mock-3.14.0
asyncio: mode=auto, default_loop_scope=session
collected 1 item                                                                                                                                             

auth/tests/test_api.py::test_login_access_token FAILED                                                                                                 [100%]

========================================================================== FAILURES ==========================================================================
__________________________________________________________________ test_login_access_token ___________________________________________________________________

self = <sqlalchemy.engine.base.Connection object at 0x7fdf18f3d330>
dialect = <sqlalchemy.dialects.postgresql.psycopg.PGDialectAsync_psycopg object at 0x7fdf17783a60>
context = <sqlalchemy.dialects.postgresql.psycopg.PGExecutionContext_psycopg object at 0x7fdf18fb4220>
statement = <sqlalchemy.dialects.postgresql.psycopg.PGCompiler_psycopg object at 0x7fdf18fb4610>, parameters = [{'email_1': 'test@email.com'}]

    def _exec_single_context(
        self,
        dialect: Dialect,
        context: ExecutionContext,
        statement: Union[str, Compiled],
        parameters: Optional[_AnyMultiExecuteParams],
    ) -> CursorResult[Any]:
        """continue the _execute_context() method for a single DBAPI
        cursor.execute() or cursor.executemany() call.

        """
        if dialect.bind_typing is BindTyping.SETINPUTSIZES:
            generic_setinputsizes = context._prepare_set_input_sizes()

            if generic_setinputsizes:
                try:
                    dialect.do_set_input_sizes(
                        context.cursor, generic_setinputsizes, context
                    )
                except BaseException as e:
                    self._handle_dbapi_exception(
                        e, str(statement), parameters, None, context
                    )

        cursor, str_statement, parameters = (
            context.cursor,
            context.statement,
            context.parameters,
        )

        effective_parameters: Optional[_AnyExecuteParams]

        if not context.executemany:
            effective_parameters = parameters[0]
        else:
            effective_parameters = parameters

        if self._has_events or self.engine._has_events:
            for fn in self.dispatch.before_cursor_execute:
                str_statement, effective_parameters = fn(
                    self,
                    cursor,
                    str_statement,
                    effective_parameters,
                    context,
                    context.executemany,
                )

        if self._echo:
            self._log_info(str_statement)

            stats = context._get_cache_stats()

            if not self.engine.hide_parameters:
                self._log_info(
                    "[%s] %r",
                    stats,
                    sql_util._repr_params(
                        effective_parameters,
                        batches=10,
                        ismulti=context.executemany,
                    ),
                )
            else:
                self._log_info(
                    "[%s] [SQL parameters hidden due to hide_parameters=True]",
                    stats,
                )

        evt_handled: bool = False
        try:
            if context.execute_style is ExecuteStyle.EXECUTEMANY:
                effective_parameters = cast(
                    "_CoreMultiExecuteParams", effective_parameters
                )
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_executemany:
                        if fn(
                            cursor,
                            str_statement,
                            effective_parameters,
                            context,
                        ):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_executemany(
                        cursor,
                        str_statement,
                        effective_parameters,
                        context,
                    )
            elif not effective_parameters and context.no_parameters:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute_no_params:
                        if fn(cursor, str_statement, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute_no_params(
                        cursor, str_statement, context
                    )
            else:
                effective_parameters = cast(
                    "_CoreSingleExecuteParams", effective_parameters
                )
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute:
                        if fn(
                            cursor,
                            str_statement,
                            effective_parameters,
                            context,
                        ):
                            evt_handled = True
                            break
                if not evt_handled:
>                   self.dialect.do_execute(
                        cursor, str_statement, effective_parameters, context
                    )

../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/default.py:924: in do_execute
    cursor.execute(statement, parameters)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/psycopg.py:587: in execute
    result = self.await_(self._cursor.execute(query, params, **kw))
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:132: in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:196: in greenlet_spawn
    value = await result
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.AsyncCursor [closed] [IDLE] (host=localhost user=postgres database=pms) at 0x7fdf18fa2b90>
query = 'SELECT users.email, users.is_active, users.is_superuser, users.id, users.created_at, users.updated_at, users.is_deleted, users.hashed_password, users.tenant_id \nFROM users \nWHERE users.email = %(email_1)s::VARCHAR'
params = {'email_1': 'test@email.com'}

    async def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            async with self._conn.lock:
                await self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           psycopg.errors.UndefinedTable: relation "users" does not exist
E           LINE 2: FROM users 
E                        ^

../opt/anaconda3/envs/pms/lib/python3.10/site-packages/psycopg/cursor_async.py:97: UndefinedTable

The above exception was the direct cause of the following exception:

override_get_db = <sqlalchemy.ext.asyncio.session.AsyncSession object at 0x7fdf18f1f3a0>

    @pytest.mark.asyncio(scope="function")
    async def test_login_access_token(override_get_db):
        async with AsyncClient(app=app, base_url=f"http://testserver{settings.API_V1_STR}") as ac:
>           response = await ac.post("/auth/login", data={"username": EMAIL, "password": PWD})

auth/tests/test_api.py:27: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_client.py:1892: in post
    return await self.request(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_client.py:1574: in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_client.py:1661: in send
    response = await self._send_handling_auth(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_client.py:1689: in _send_handling_auth
    response = await self._send_handling_redirects(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_client.py:1726: in _send_handling_redirects
    response = await self._send_single_request(request)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_client.py:1763: in _send_single_request
    response = await transport.handle_async_request(request)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_transports/asgi.py:164: in handle_async_request
    await self.app(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/fastapi/applications.py:1054: in __call__
    await super().__call__(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/applications.py:123: in __call__
    await self.middleware_stack(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/middleware/errors.py:186: in __call__
    raise exc
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/middleware/errors.py:164: in __call__
    await self.app(scope, receive, _send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/middleware/cors.py:85: in __call__
    await self.app(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/middleware/exceptions.py:65: in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/_exception_handler.py:64: in wrapped_app
    raise exc
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/_exception_handler.py:53: in wrapped_app
    await app(scope, receive, sender)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/routing.py:756: in __call__
    await self.middleware_stack(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/routing.py:776: in app
    await route.handle(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/routing.py:297: in handle
    await self.app(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/routing.py:77: in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/_exception_handler.py:64: in wrapped_app
    raise exc
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/_exception_handler.py:53: in wrapped_app
    await app(scope, receive, sender)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/starlette/routing.py:72: in app
    response = await func(request)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/fastapi/routing.py:278: in app
    raw_response = await run_endpoint_function(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/fastapi/routing.py:191: in run_endpoint_function
    return await dependant.call(**values)
auth/app/api/api.py:51: in login_access_token
    user = await authenticate(
auth/app/services/crud.py:45: in authenticate
    user = await get_user_by_email(session=session, email=email)
auth/app/services/crud.py:37: in get_user_by_email
    user = await session.execute(statement)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py:461: in execute
    result = await greenlet_spawn(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:201: in greenlet_spawn
    result = context.throw(*sys.exc_info())
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/orm/session.py:2351: in execute
    return self._execute_internal(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/orm/session.py:2236: in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/orm/context.py:293: in orm_execute_statement
    result = conn.execute(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1418: in execute
    return meth(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:515: in _execute_on_connection
    return connection._execute_clauseelement(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1640: in _execute_clauseelement
    ret = self._execute_context(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1846: in _execute_context
    return self._exec_single_context(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1986: in _exec_single_context
    self._handle_dbapi_exception(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2353: in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967: in _exec_single_context
    self.dialect.do_execute(
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/engine/default.py:924: in do_execute
    cursor.execute(statement, parameters)
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/psycopg.py:587: in execute
    result = self.await_(self._cursor.execute(query, params, **kw))
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:132: in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:196: in greenlet_spawn
    value = await result
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <psycopg.AsyncCursor [closed] [IDLE] (host=localhost user=postgres database=pms) at 0x7fdf18fa2b90>
query = 'SELECT users.email, users.is_active, users.is_superuser, users.id, users.created_at, users.updated_at, users.is_deleted, users.hashed_password, users.tenant_id \nFROM users \nWHERE users.email = %(email_1)s::VARCHAR'
params = {'email_1': 'test@email.com'}

    async def execute(
        self,
        query: Query,
        params: Params | None = None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None,
    ) -> Self:
        """
        Execute a query or command to the database.
        """
        try:
            async with self._conn.lock:
                await self._conn.wait(
                    self._execute_gen(query, params, prepare=prepare, binary=binary)
                )
        except e._NO_TRACEBACK as ex:
>           raise ex.with_traceback(None)
E           sqlalchemy.exc.ProgrammingError: (psycopg.errors.UndefinedTable) relation "users" does not exist
E           LINE 2: FROM users 
E                        ^
E           [SQL: SELECT users.email, users.is_active, users.is_superuser, users.id, users.created_at, users.updated_at, users.is_deleted, users.hashed_password, users.tenant_id 
E           FROM users 
E           WHERE users.email = %(email_1)s::VARCHAR]
E           [parameters: {'email_1': 'test@email.com'}]
E           (Background on this error at: https://sqlalche.me/e/20/f405)

../opt/anaconda3/envs/pms/lib/python3.10/site-packages/psycopg/cursor_async.py:97: ProgrammingError
--------------------------------------------------------------------- Captured log setup ---------------------------------------------------------------------
WARNING  passlib.handlers.bcrypt:bcrypt.py:622 (trapped) error reading bcrypt version
Traceback (most recent call last):
  File "/Users/usman/opt/anaconda3/envs/pms/lib/python3.10/site-packages/passlib/handlers/bcrypt.py", line 620, in _load_backend_mixin
    version = _bcrypt.__about__.__version__
AttributeError: module 'bcrypt' has no attribute '__about__'
====================================================================== warnings summary ======================================================================
../opt/anaconda3/envs/pms/lib/python3.10/site-packages/pytest_asyncio/plugin.py:1014
auth/tests/test_api.py::test_login_access_token
  /Users/usman/opt/anaconda3/envs/pms/lib/python3.10/site-packages/pytest_asyncio/plugin.py:1014: PytestDeprecationWarning: The "scope" keyword argument to the asyncio marker has been deprecated. Please use the "loop_scope" argument instead.

    warnings.warn(PytestDeprecationWarning(_MARKER_SCOPE_KWARG_DEPRECATION_WARNING))

auth/tests/test_api.py::test_login_access_token
  /Users/usman/opt/anaconda3/envs/pms/lib/python3.10/site-packages/httpx/_client.py:1426: DeprecationWarning: The 'app' shortcut is now deprecated. Use the explicit style 'transport=ASGITransport(app=...)' instead.
    warnings.warn(message, DeprecationWarning)

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
================================================================== short test summary info ===================================================================
FAILED auth/tests/test_api.py::test_login_access_token - sqlalchemy.exc.ProgrammingError: (psycopg.errors.UndefinedTable) relation "users" does not exist
=============================================================== 1 failed, 3 warnings in 2.06s ================================================================

pyproject.toml

[tool.pytest.ini_options]
addopts = "-v"
testpaths = ["tests"]
pythonpath = "."
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
seifertm commented 3 months ago

Since you're setting asyncio_default_fixture_scope = "session", your _setupdb fixture runs in a session-scoped event loop, whereas your test runs in a function-scoped loop (the default).

Can you try decorating test_login_access_token with @pytest.mark.asyncio(loop_scope="session")? This should make the test run in the session-wide loop. (see also How to run all tests in the session in the same event loop)

timesbyusman commented 3 months ago

@seifertm I already tried it but it didn't worked.

seifertm commented 3 months ago

I don't see anything obviously wrong with your code. You could try to add the --setup-show option to pytest and check if the event loop fixtures are set up in the correct order.

If that doesn't help, I'd appreciate if you could reduce the code even further and make it self-contained, so that I can debug it locally.

timesbyusman commented 3 months ago

@seifertm I tried --setup-show for both tests, api and crud. But I can't see any difference in the event loop they both are using

test from crud

Screenshot 2024-08-01 at 9 52 03 PM

test from api

Screenshot 2024-08-01 at 9 51 45 PM

here's a contained version of the api for which I am trying to write tests

from typing import Optional
from fastapi import FastAPI, APIRouter, Depends, Form, HTTPException
from fastapi.security import OAuth2PasswordRequestForm

app = FastAPI()
router = APIRouter(prefix="/auth", tags=["Auth"])

# Mock data and functions
users_db = {
    "user@example.com": {
        "email": "user@example.com",
        "password": "hashedpassword",
        "is_active": True,
        "roles": ["user"],
        "tenant_id": 1
    }
}

def authenticate(email: str, password: str):
    user = users_db.get(email)
    return user if user and user["password"] == password else None

def get_tokens(user_id: int, roles: list, tenant_id: int):
    return "access_token", "refresh_token", 3600

def save_refresh_token(token: str, user_id: int, expiry: int):
    pass  # Just a placeholder

def validate_redirect_url(url: str):
    pass  # Just a placeholder

class Tokens(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str
    redirect_url: Optional[str] = None

@router.post("/login")
async def login_access_token(
    form_data: Depends(OAuth2PasswordRequestForm),
    redirect_url: Optional[str] = Form(None)
) -> Tokens:
    user = authenticate(email=form_data.username, password=form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect email or password")
    if not user["is_active"]:
        raise HTTPException(status_code=400, detail="Inactive user")

    roles = user["roles"]
    access_token, refresh_token, expiry = get_tokens(user_id=1, roles=roles, tenant_id=user["tenant_id"])
    save_refresh_token(token=refresh_token, user_id=1, expiry=expiry)

    if redirect_url:
        validate_redirect_url(redirect_url)

    return Tokens(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        redirect_url=redirect_url
    )

app.include_router(router)
seifertm commented 3 months ago

I want to help get to the bottom of this. I really do. But I cannot get started looking into the underlying issue with the present information. For example, conftest.py references some database credentials like USER and PWD. _testapi.py uses some unknown settings module, and so forth.

Even if I managed to figure out all the details to get to a running pytest call, it's very likely that I end up with a different configuration and won't be able to reproduce your problem.

In order to help, I need a Minimal, Reproducible Example that I can simply copy and paste to analyze the error you're seeing.

seifertm commented 3 months ago

I cannot help without a reproducer. Therefore, I'm closing this issue for the time being.