faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

v0.10.4, event loop is closed in agent test context #443

Closed bradydean closed 1 year ago

bradydean commented 1 year ago

Checklist

Steps to reproduce

Upgrade to 0.10.4

Expected behavior

Event loop is not closed

Actual behavior

Bumping to 0.10.4 breaks test cases in our CI pipeline. This traceback is raised on entering an agent test context.

Full traceback

tests/test_async.py:74: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.tox/py311/lib/python3.11/site-packages/mode/services.py:112: in __aenter__
    await self.start()
.tox/py311/lib/python3.11/site-packages/mode/services.py:800: in start
    await self._default_start()
.tox/py311/lib/python3.11/site-packages/mode/services.py:807: in _default_start
    await self._actually_start()
.tox/py311/lib/python3.11/site-packages/mode/services.py:824: in _actually_start
    await self.on_start()
.tox/py311/lib/python3.11/site-packages/faust/agents/agent.py:283: in on_start
    await self._on_start_supervisor()
.tox/py311/lib/python3.11/site-packages/faust/agents/agent.py:314: in _on_start_supervisor
    res = await self._start_one(
.tox/py311/lib/python3.11/site-packages/faust/agents/agent.py:252: in _start_one
    return await self._start_task(
.tox/py311/lib/python3.11/site-packages/faust/agents/agent.py:650: in _start_task
    return await self._prepare_actor(
.tox/py311/lib/python3.11/site-packages/faust/agents/agent.py:664: in _prepare_actor
    task = asyncio.Task(self._execute_actor(coro, aref), loop=self.loop)
/opt/hostedtoolcache/Python/3.11.1/x64/lib/python3.11/asyncio/base_events.py:761: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed
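The last frames of the traceback can be reproduced with the standard library alone. The following is a minimal sketch, not Faust code: `CachedLoopService` is a hypothetical stand-in for any object that keeps a reference to an event loop after that loop has been closed, and it assumes the object schedules work through `loop.call_soon`, as the traceback shows.

```python
import asyncio


class CachedLoopService:
    """Hypothetical stand-in for an object that captures a loop once."""

    def __init__(self, loop):
        self.loop = loop

    def schedule(self, callback):
        # call_soon() verifies that the loop is still open before queuing.
        return self.loop.call_soon(callback)


def reproduce():
    """Return the error message raised when scheduling on a closed loop."""
    stale_loop = asyncio.new_event_loop()
    service = CachedLoopService(stale_loop)
    stale_loop.close()  # e.g. the loop of an earlier test was torn down
    try:
        service.schedule(lambda: None)
    except RuntimeError as exc:
        return str(exc)
    return None


print(reproduce())
```

The loop is never running here; closing it is enough to make any later scheduling attempt fail.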

Versions

wbarnha commented 1 year ago

I suspect #441 is responsible for this. I'll take a look. I may yank v0.10.4.

wbarnha commented 1 year ago

@bradydean is this isolated to Python 3.11, or does it affect all Python versions in your tests?

bradydean commented 1 year ago

It failed on 3.10 and 3.11, those are the only 2 versions I run on.

wbarnha commented 1 year ago

https://github.com/faust-streaming/faust/blob/032294b2244c4167bfcc2acf6c9ef8fded199722/faust/agents/agent.py#L229

This part is responsible for your woes. I'm surprised it didn't break our test cases but it broke yours. Could you provide any sample code?

bradydean commented 1 year ago

Here's a minimal example that fails for me:

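import faust
import pytest

# Assumed app definition; the original snippet did not show it.
app = faust.App("test-app")

topic = app.topic("topic", value_serializer="json")

@app.agent(topic)
async def testme(stream):
    async for event in stream:
        yield

@pytest.mark.asyncio
async def test_agent():
    # On 0.10.4, entering the test context raises "Event loop is closed".
    async with testme.test_context() as agent:
        await agent.put({"foo": "bar"})

wbarnha commented 1 year ago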

https://faust-streaming.github.io/faust/userguide/testing.html#testing-that-an-agent-sends-to-topic-calls-another-agent may be useful to you. I'll still try to look into this though.

Roman1us commented 1 year ago

Do you use the test_app fixture, as provided in the documentation? We use something like this:

import pytest

@pytest.fixture
def test_app(event_loop):
    from app import app

    app.loop = event_loop  # <-- Bind the app to this test's event loop
    app.finalize()
    app.conf.store = "memory://"
    app.flow_control.resume()

    try:
        yield app
    finally:
        assert app.tracer is None

bradydean commented 1 year ago

I added that and the tests passed, thanks!
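Why rebinding the loop helps can be sketched without Faust. This is an illustration under stated assumptions, not the library's implementation: `LoopBoundApp` is a hypothetical object that creates tasks on whatever loop is stored in its `loop` attribute (the traceback above shows `asyncio.Task(..., loop=self.loop)`), and each call to `run_with_fresh_loop` plays the role of one test that receives its own new loop, as a function-scoped `event_loop` fixture is assumed to provide.

```python
import asyncio


class LoopBoundApp:
    """Hypothetical long-lived object, shared across tests like a module-level app."""

    def __init__(self):
        self.loop = None

    def start_task(self, coro):
        # Mirrors the pattern in the traceback: the task is bound to self.loop.
        return asyncio.Task(coro, loop=self.loop)


async def work():
    return "processed"


def run_with_fresh_loop(app):
    """Simulate one test: create a loop, rebind the app, run, close the loop."""
    loop = asyncio.new_event_loop()
    try:
        app.loop = loop  # the equivalent of `app.loop = event_loop`
        task = app.start_task(work())
        return loop.run_until_complete(task)
    finally:
        loop.close()


shared_app = LoopBoundApp()
first = run_with_fresh_loop(shared_app)
second = run_with_fresh_loop(shared_app)
print(first, second)
```

Without the rebinding line, the second call would create its task on the first call's loop, which is already closed by then.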

thomas-chauvet commented 1 year ago

Thanks for the tip, @Roman1us!

If you define the fixture in a conftest.py file, do not forget to add scope="function":

import pytest

@pytest.fixture(scope="function")
def test_stateless_app(event_loop):
    from app import app

    app.loop = event_loop

    app.finalize()
    app.conf.table_cleanup_interval = 1.0
    app.flow_control.resume()

    return app

Should we update the docs to include the line app.loop = event_loop, and to specify that the app must be imported inside the fixture?

wbarnha commented 1 year ago

Closed in #467.