faust-streaming / mode

Python AsyncIO Services
https://faust-streaming.github.io/mode/
Other
43 stars 16 forks source link

ServiceThread complains Task was destroyed but it is pending! #38

Closed kevin-ip closed 1 year ago

kevin-ip commented 1 year ago

Checklist

Steps to reproduce

I have a scheduled job that start and stop a ServiceThread but it complains "Task was destroyed but it is pending!" The following script simulates the scheduled job.

import asyncio

from mode import Service
from mode.threads import ServiceThread

class T(ServiceThread):
    @Service.task
    async def poll_thread(self):
        while not self.should_stop:
            await asyncio.sleep(0)
            print("hello")

async def run():
    t = T()
    await t.start()
    await t.stop()

if __name__ == '__main__':
    for _ in range(3):
        asyncio.run(run())

Expected behavior

The script should only print "hello"

Actual behavior

It complains "Task was destroyed but it is pending!"

Full traceback

hello
hello
Task was destroyed but it is pending!
task: <Task pending name='Task-11' coro=<ServiceThread._wakeup_timer_in_thread() done, defined at /Users/kevinip/code/github/faust-streaming/mode/mode/threads.py:190> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/futures.py:394]>
hello
hello
hello
hello
Task was destroyed but it is pending!
task: <Task pending name='Task-27' coro=<ServiceThread._wakeup_timer_in_thread() done, defined at /Users/kevinip/code/github/faust-streaming/mode/mode/threads.py:190> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/futures.py:394]>
Task was destroyed but it is pending!
task: <Task pending name='Task-41' coro=<ServiceThread._wakeup_timer_in_thread() done, defined at /Users/kevinip/code/github/faust-streaming/mode/mode/threads.py:190> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[_chain_future.<locals>._call_set_state() at /Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/futures.py:394]>

Versions

jalaziz commented 1 year ago

I believe the issue here is that the thread loop is not properly shutdown. If you look at asyncio.run there are several steps to properly closing a loop. Unfortunately, the shutdown steps are not available in a convenient standalone method. It's a bit more re-usable in 3.11 though.

Another option would be to ensure _wakeup_timer_in_thread is awaited on shutdown to give it a chance to get scheduled.

wbarnha commented 1 year ago

Give #39 a look, should fix the issue you see.