faust-streaming / mode

Python AsyncIO Services
https://faust-streaming.github.io/mode/
Other
43 stars 16 forks source link

Stop threads properly using _wakeup_timer_in_thread #39

Closed wbarnha closed 1 year ago

wbarnha commented 1 year ago

Hopefully this will eventually fix #38.

wbarnha commented 1 year ago

9 times out of 10, this example prints hello properly:

import asyncio

from mode import Service
from mode.threads import ServiceThread

class T(ServiceThread):
    @Service.task
    async def poll_thread(self):
        while not self.should_stop:
            await asyncio.sleep(0)
            print("hello")

async def run():
    t = T()
    await t.start()
    await t.stop()

if __name__ == '__main__':
    for _ in range(3):
        asyncio.run(run())

But what about the 1 out of 10 times it doesn't?

wbarnha commented 1 year ago
hello
hello
hello
hello
hello
hello
hello
hello
hello

Now we're guaranteed to see hello!

wbarnha commented 1 year ago

Let's test this with Faust before merging.

lqhuang commented 1 year ago

Could you revert other changes and try my commented solution?

That is enough to solve pending task warning.

I also test it for faust, it works normally.

2046 passed, 62 skipped, 31 warnings in 108.22s (0:01:48)
lqhuang commented 1 year ago

I added an test case for this function, you could try it:

    @pytest.mark.asyncio
    async def test__wakeup_timer_in_thread(self, *, thread, event_loop):
        thread.add_future = Mock(name="thread.add_future")
        thread._wakeup_timer_in_thread = AsyncMock()
        thread._stopped.is_set = Mock(return_value=False)
        thread._crashed.is_set = Mock(return_value=False)
        thread.sleep = AsyncMock()

        def cb():
            thread._stopped.is_set.return_value = True
            assert thread.should_stop

        event_loop.call_soon(cb)
        await thread._keepalive2()

        thread._wakeup_timer_in_thread.assert_awaited()

Add it to test_threads.py

And fix the _keepalive2() function to

    async def _keepalive2(self) -> None:
        while not self.should_stop:
            await self.sleep(2.0)
            if self.last_wakeup_at:
                if monotonic() - self.last_wakeup_at > 3.0:
                    self.log.error("Thread keepalive is not responding...")
            await asyncio.sleep(0.0)  # for unittest to invoke `call_soon`
            await self._wakeup_timer_in_thread()
wbarnha commented 1 year ago

Very interesting, thank you for providing the code above! I'll merge in your changes, but I'd like to retain the changes I included elsewhere since they're general QoL fixes since the internals of Event in mode.utils.locks are slightly difference from Event in asyncio.

wbarnha commented 1 year ago

I am open to the idea of eliminating our usage of mode.utils.locks at some point but I think that's for a separate discussion. I know that we created a little chaos during our task of supporting 3.11, but I'd like to continue to having Mode being compatible with Faust as much as possible.

lqhuang commented 1 year ago

We also can add some integration tests for faust to GitHub Actions while releasing new version of mode

wbarnha commented 1 year ago

Nobody seems to have raised any issues with it yet, but in 0.3.4, I've been seeing Timer Monitor.sample woke up late appear more frequently than usual, with really high drift times.