pytest-dev / pytest-asyncio

Asyncio support for pytest
https://pytest-asyncio.readthedocs.io
Apache License 2.0
1.43k stars 150 forks source link

RuntimeError: Task attached to a different loop #922

Closed mhassan5809 closed 2 months ago

mhassan5809 commented 2 months ago

main class

class Database:
    """Owns an async engine and the session factory bound to that engine."""

    def __init__(self, db_url: str) -> None:
        """Create the engine for ``db_url`` and a session factory bound to it.

        The engine is created with ``echo=True``; the factory is created with
        ``autocommit`` and ``autoflush`` both switched off.
        """
        self.db_url = db_url
        engine = create_async_engine(url=db_url, echo=True)
        self._engine: AsyncEngine | None = engine
        session_factory = async_sessionmaker(
            bind=engine, autoflush=False, autocommit=False
        )
        self._sessionmaker: async_sessionmaker[AsyncSession] | None = session_factory

    async def close(self) -> None:
        """Dispose of the engine, then clear the engine and the factory.

        Raises:
            DatabaseError: If the engine has already been cleared.
        """
        engine = self._engine
        if engine is None:
            raise DatabaseError(message="DatabaseSessionManager is not initialized")
        await engine.dispose()

        # Only reached when dispose() succeeded; both handles are dropped together.
        self._engine = self._sessionmaker = None

    @contextlib.asynccontextmanager
    async def session(self) -> AsyncIterator[AsyncSession]:
        """Yield a fresh session; roll back on error and always close it.

        Raises:
            DatabaseError: If the session factory has been cleared, or as a
                wrapper around any ``Exception`` raised while the session
                is in use (the original is chained as the cause).
        """
        factory = self._sessionmaker
        if factory is None:
            raise DatabaseError(message="DatabaseSessionManager is not initialized")

        db_session = factory()
        try:
            yield db_session
        except Exception as exc:
            await db_session.rollback()
            message = f"An error occurred during the session {exc}"
            raise DatabaseError(message=message) from exc
        finally:
            await db_session.close()

conftest.py

from collections.abc import AsyncGenerator, Generator

import pytest
import pytest_asyncio
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from src.core.config import settings
from src.core.db import Base, Database
from src.main import fastapi_app

class MockDatabase(Database):
    """``Database`` subclass for tests that can drop everything in ``Base.metadata``."""

    async def drop_database(self) -> None:
        """Run ``Base.metadata.drop_all`` on a connection from ``engine.begin()``.

        Does nothing when ``self._engine`` is falsy.
        """
        engine = self._engine
        if not engine:
            return
        async with engine.begin() as connection:
            await connection.run_sync(Base.metadata.drop_all)

@pytest_asyncio.fixture(scope="session")
async def mock_database() -> AsyncGenerator[MockDatabase, None]:
    """Provide a single ``MockDatabase`` built from ``settings.TEST_DATABASE_URL``.

    On teardown the tables are dropped and then the database is closed.
    """
    database = MockDatabase(db_url=settings.TEST_DATABASE_URL)
    try:
        yield database
    finally:
        # Order matters: drop_database() is a no-op once close() has cleared the engine.
        await database.drop_database()
        await database.close()

# A plain (synchronous) generator fixture: use pytest's own decorator rather
# than pytest_asyncio.fixture, which is intended for coroutine / async-generator
# fixtures.
@pytest.fixture(scope="session")
def my_app(
    mock_database: MockDatabase,  # pylint: disable=redefined-outer-name
) -> Generator[FastAPI, None, None]:
    """Create the application once and point its ``db`` provider at the mock.

    Args:
        mock_database: The database object installed via
            ``app.container.db.override``.

    Yields:
        The application returned by ``fastapi_app.create_app()``.
    """
    app = fastapi_app.create_app()
    app.container.db.override(mock_database)  # type: ignore

    yield app

    # Teardown: undo the override installed above.
    app.container.reset_override()  # type: ignore

@pytest_asyncio.fixture(scope="function")
async def test_client(
    my_app: FastAPI,  # pylint: disable=redefined-outer-name
) -> AsyncGenerator[AsyncClient, None]:
    """Yield an ``AsyncClient`` wired to ``my_app`` through ``ASGITransport``.

    The client is entered as an async context manager, so it is closed when
    the fixture is torn down.
    """
    asgi_transport = ASGITransport(app=my_app)
    http_client = AsyncClient(
        transport=asgi_transport,
        base_url="http://test",
        timeout=0,
    )
    async with http_client as client:
        yield client

test_api.py

from unittest.mock import patch

import pytest
from httpx import AsyncClient

from src.core.cache.cache_manager import Cache
from src.modules.auth.schemas import DeviceInfo, UserLogin

# NOTE(review): the marker carries no loop_scope argument. If the async
# fixtures run in the session loop, the test and the fixtures may end up on
# different event loops -- confirm the intended loop_scope.
@pytest.mark.asyncio
class TestExample:
    """Login endpoint check driven through the ``test_client`` fixture."""

    async def test_example(self, test_client: AsyncClient) -> None:
        """POST a login payload and expect a 200 with both tokens in ``data``."""
        device = DeviceInfo(
            fcm_token="test",
            device_id="test",
            latitude="test",
            longitude="test",
            ip_address="1.1.1.1",
        )
        credentials = UserLogin(
            phone="01630811624",
            password="123456",
            last_logged_metadata=device,
        )
        # Cache.set is replaced with a mock for the duration of the request.
        with patch.object(Cache, "set"):
            payload = credentials.model_dump()
            response = await test_client.post("/api/v2/auth/login/", json=payload)
        body = response.json()
        assert response.status_code == 200
        data = body["data"]
        assert "access_token" in data
        assert "refresh_token" in data

Errors

 ============================================================================= ERRORS =============================================================================
_________________________________________________________ ERROR at teardown of TestExample.test_example __________________________________________________________

    def finalizer() -> None:
        """Yield again, to finalize."""

        async def async_finalizer() -> None:
            try:
                await gen_obj.__anext__()  # type: ignore[union-attr]
            except StopAsyncIteration:
                pass
            else:
                msg = "Async generator fixture didn't stop."
                msg += "Yield only once."
                raise ValueError(msg)

>       event_loop.run_until_complete(async_finalizer())

.venv/lib/python3.10/site-packages/pytest_asyncio/plugin.py:341: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
.venv/lib/python3.10/site-packages/pytest_asyncio/plugin.py:333: in async_finalizer
    await gen_obj.__anext__()  # type: ignore[union-attr]
src/tests/conftest.py:42: in mock_database
    await db.drop_database()
src/tests/conftest.py:18: in drop_database
    await conn.run_sync(Base.metadata.drop_all)
.venv/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py:886: in run_sync
    return await greenlet_spawn(
.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:201: in greenlet_spawn
    result = context.throw(*sys.exc_info())
.venv/lib/python3.10/site-packages/sqlalchemy/sql/schema.py:5894: in drop_all
    bind._run_ddl_visitor(
.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2459: in _run_ddl_visitor
    visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
.venv/lib/python3.10/site-packages/sqlalchemy/sql/visitors.py:664: in traverse_single
    return meth(obj, **kw)
.venv/lib/python3.10/site-packages/sqlalchemy/sql/ddl.py:1018: in visit_metadata
    unsorted_tables = [t for t in tables if self._can_drop_table(t)]
.venv/lib/python3.10/site-packages/sqlalchemy/sql/ddl.py:1018: in <listcomp>
    unsorted_tables = [t for t in tables if self._can_drop_table(t)]
.venv/lib/python3.10/site-packages/sqlalchemy/sql/ddl.py:1094: in _can_drop_table
    return not self.checkfirst or self.dialect.has_table(
.venv/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py:88: in cache
    return fn(self, con, *args, **kw)
.venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/base.py:3347: in has_table
    return bool(connection.scalar(query, {"table_name": table_name}))
.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1307: in scalar
    return meth(
.venv/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:533: in _execute_on_scalar
    return self._execute_on_connection(
.venv/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:515: in _execute_on_connection
    return connection._execute_clauseelement(
.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1640: in _execute_clauseelement
    ret = self._execute_context(
.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1846: in _execute_context
    return self._exec_single_context(
.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1986: in _exec_single_context
    self._handle_dbapi_exception(
.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2358: in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
.venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967: in _exec_single_context
    self.dialect.do_execute(
.venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py:941: in do_execute
    cursor.execute(statement, parameters)
.venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:572: in execute
    self._adapt_connection.await_(
.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:132: in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
.venv/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py:196: in greenlet_spawn
    value = await result
.venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:508: in _prepare_and_execute
    await adapt_connection._start_transaction()
.venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:837: in _start_transaction
    self._handle_exception(error)
.venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:786: in _handle_exception
    raise error
.venv/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:835: in _start_transaction
    await self._transaction.start()
.venv/lib/python3.10/site-packages/asyncpg/transaction.py:146: in start
    await self._connection.execute(query)
.venv/lib/python3.10/site-packages/asyncpg/connection.py:350: in execute
    result = await self._protocol.query(query, timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   RuntimeError: Task <Task pending name='Task-18' coro=<_wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.finalizer.<locals>.async_finalizer() running at /home/mahmudul/Desktop/projects/GK/neo-app-backend/.venv/lib/python3.10/site-packages/pytest_asyncio/plugin.py:333> cb=[_run_until_complete_cb() at /usr/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop

asyncpg/protocol/protocol.pyx:374: RuntimeError
----------------------------------------------------------------------- Captured log call ------------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:base.py:1899 select pg_catalog.version()
INFO     sqlalchemy.engine.Engine:base.py:1904 [raw sql] ()
INFO     sqlalchemy.engine.Engine:base.py:1899 select current_schema()
INFO     sqlalchemy.engine.Engine:base.py:1904 [raw sql] ()
INFO     sqlalchemy.engine.Engine:base.py:1899 show standard_conforming_strings
INFO     sqlalchemy.engine.Engine:base.py:1904 [raw sql] ()
INFO     sqlalchemy.engine.Engine:base.py:1099 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:base.py:1899 SELECT customers.name, customers.phone, customers.gender, customers.password, customers.avatar_url, customers.nid_no, customers.nid_image_urls, customers.qr_code_url, customers.dob, customers.status, customers.refer_code, customers.referer_code, customers.referer_type, customers.email, customers.profession, customers.marital_status, customers.last_logged_in_at, customers.last_logged_metadata, customers.password_last_changed_at, customers.id, customers.created_at, customers.updated_at, customer_gold_balances_1.customer_id, customer_gold_balances_1.current_balance, customer_gold_balances_1.last_ledger_entry_id, customer_gold_balances_1.summary, customer_gold_balances_1.id AS id_1, customer_gold_balances_1.created_at AS created_at_1, customer_gold_balances_1.updated_at AS updated_at_1 
FROM customers LEFT OUTER JOIN customer_gold_balances AS customer_gold_balances_1 ON customers.id = customer_gold_balances_1.customer_id 
WHERE customers.phone = $1::VARCHAR
INFO     sqlalchemy.engine.Engine:base.py:1904 [generated in 0.00024s] ('01630811624',)
INFO     sqlalchemy.engine.Engine:base.py:1125 ROLLBACK
--------------------------------------------------------------------- Captured log teardown ----------------------------------------------------------------------
INFO     sqlalchemy.engine.Engine:base.py:1099 BEGIN (implicit)
INFO     sqlalchemy.engine.Engine:base.py:1899 SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = $1::VARCHAR AND pg_catalog.pg_class.relkind = ANY (ARRAY[$2::VARCHAR, $3::VARCHAR, $4::VARCHAR, $5::VARCHAR, $6::VARCHAR]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != $7::VARCHAR
INFO     sqlalchemy.engine.Engine:base.py:1904 [generated in 0.00046s] ('customers', 'r', 'p', 'f', 'v', 'm', 'pg_catalog')
INFO     sqlalchemy.engine.Engine:base.py:1125 ROLLBACK
==================================================================== short test summary info =====================================================================
ERROR src/tests/integration/auth/test_api.py::TestExample::test_example - RuntimeError: Task <Task pending name='Task-18' coro=<_wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.finalizer.<locals>.async_finalizer()...

If I remove await db.drop_database(), then it works fine.

# NOTE(review): this is an async generator under the plain pytest.fixture
# decorator; presumably it relies on pytest-asyncio picking it up -- confirm.
@pytest.fixture(scope="session")
async def mock_database() -> AsyncGenerator[MockDatabase, None]:
    """Provide a single ``MockDatabase`` built from ``settings.TEST_DATABASE_URL``.

    On teardown the tables are dropped and then the database is closed.
    """
    database = MockDatabase(db_url=settings.TEST_DATABASE_URL)
    try:
        yield database
    finally:
        # Teardown: drop the tables first, then close the database.
        await database.drop_database()
        await database.close()

I could not solve the problem. Any idea why I got this error, or how to solve it? Help, please. Thanks. Version: pytest-asyncio = "^0.24.0"

seifertm commented 2 months ago

@mhassan5809 The example you provided is neither self-contained, nor minimal. That means I cannot run and debug it.

Looking at the code, it could be related to the scopes of the async fixtures. I see you're using pytest-asyncio v0.24. Are you aware of the newly introduced async_default_fixture_loop_scope config option? If so, what value did you set it to?

MahmudulHassan5809 commented 2 months ago

Thanks for the reply @seifertm. async_default_fixture_loop_scope value is session.

I will create a minimal app and paste it here tomorrow so that you can run it. Thanks.

seifertm commented 2 months ago

@MahmudulHassan5809 Thanks for the quick turnaround.

If async_default_fixture_loop_scope is set to session, that means the fixtures run in a different asyncio event loop than TestExample.test_example. Changing the @pytest.mark.asyncio decorator to @pytest.mark.asyncio(loop_scope="session") should resolve the issue. Alternatively, you can run all tests in the session loop.

mhassan5809 commented 2 months ago

@seifertm Thanks a lot. It seems the problem is solved. I appreciate your guidance. Thank you very much.