pytest-dev / pytest-asyncio

Asyncio support for pytest
https://pytest-asyncio.readthedocs.io
Apache License 2.0

Ability to wait for background tasks to complete before moving to next test? #952

Closed by mzealey 1 month ago

mzealey commented 1 month ago

Until now, we have been using the following fixture to let backgrounded asyncio tasks complete before the event loop is shut down:

import asyncio
from typing import Generator

import pytest


@pytest.fixture(scope="session")
def event_loop(request: pytest.FixtureRequest) -> Generator[asyncio.AbstractEventLoop, None, None]:
    """Create a single event loop shared by the whole test session."""

    def custom_exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
        # Report the error through the default handler, then stop the loop.
        loop.default_exception_handler(context)

        exception = context.get("exception")
        loop.stop()
        if exception is not None:  # "exception" is an optional key in the context dict
            raise exception

    loop = asyncio.get_event_loop_policy().new_event_loop()
    loop.set_exception_handler(custom_exception_handler)
    yield loop
    # Wait until all background tasks have completed
    pending = asyncio.all_tasks(loop)
    if pending:  # gather() with no arguments is not bound to this loop
        loop.run_until_complete(asyncio.gather(*pending))
    loop.close()

It would be nice to migrate this to the asyncio_default_fixture_loop_scope configuration, but the default event loop runner does not seem to wait for background tasks to complete before the loop is closed.
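For illustration, here is a minimal sketch of how the same "drain" step could be done from inside the running loop instead of overriding event_loop. The helper name wait_for_background_tasks and the demo coroutine are hypothetical, and the fixture wiring in the trailing comment is an assumption about how this might be hooked into pytest-asyncio, not something confirmed in this thread.

```python
import asyncio


async def wait_for_background_tasks() -> None:
    """Await every task on the running loop except the caller's own (hypothetical helper)."""
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current]
    if pending:
        # return_exceptions=True keeps one failing task from hiding the others;
        # drop it if a failure should propagate to the caller instead.
        await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> list:
    """Spawn a background task, then drain it before returning."""
    results = []

    async def background() -> None:
        await asyncio.sleep(0.01)
        results.append("done")

    task = asyncio.create_task(background())  # keep a reference so the task is not collected
    await wait_for_background_tasks()
    return results


# Assumed wiring (not verified here): await the helper after `yield` in an
# async fixture that runs on the same loop as the tests, for example:
#
#   @pytest_asyncio.fixture(autouse=True, loop_scope="session", scope="session")
#   async def drain_tasks():
#       yield
#       await wait_for_background_tasks()
```

Because the helper excludes the current task, it can be awaited from a fixture teardown without waiting on itself. Tasks that never finish would still block teardown, so a timeout via asyncio.wait_for may be worth adding.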