taskiq-python / taskiq

Distributed task queue with full async support
MIT License

got Future <Future pending> attached to a different loop #282

Open noctuid opened 10 months ago

noctuid commented 10 months ago

I'm still seeing the issue mentioned in #229 in 0.10.4. Tested with taskiq 0.10.4, taskiq-redis 0.5.5, and Python 3.8.15. Also tested with taskiq 0.10.2.

The strange thing is that this does not happen if I create a fresh project. It also doesn't happen if I remove fastapi as a dependency from our project.

I tried with this basic example and no tasks:

from taskiq_redis import ListQueueClusterBroker, RedisAsyncResultBackend

redis_result_backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

broker = ListQueueClusterBroker(
    url="redis://localhost:6379",
).with_result_backend(redis_result_backend)

poetry run taskiq worker backend.task_manager.broker:broker

I don't know exactly how to reproduce (couldn't recreate in a fresh project), but maybe you have some idea or there is some other information I can provide.

Process worker-1:
Traceback (most recent call last):
  File ".../.pyenv/versions/3.8.15/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File ".../.pyenv/versions/3.8.15/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File ".../.venv/lib/python3.8/site-packages/taskiq/cli/worker/run.py", line 147, in start_listen
    loop.run_until_complete(receiver.listen())
  File "uvloop/loop.pyx", line 1517, in uvloop.loop.Loop.run_until_complete
  File ".../.venv/lib/python3.8/site-packages/taskiq/receiver/receiver.py", line 330, in listen
    gr.start_soon(self.runner, queue)
  File ".../.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File ".../.venv/lib/python3.8/site-packages/taskiq/receiver/receiver.py", line 348, in prefetcher
    await self.sem_prefetch.acquire()
  File ".../.pyenv/versions/3.8.15/lib/python3.8/asyncio/locks.py", line 496, in acquire
    await fut
RuntimeError: Task <Task pending name='taskiq.receiver.receiver.Receiver.prefetcher' coro=<Receiver.prefetcher() running at .../.venv/lib/python3.8/site-packages/taskiq/receiver/receiver.py:348> cb=[TaskGroup._spawn.<locals>.task_done() at .../.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:661, _wait.<locals>._on_completion() at .../.pyenv/versions/3.8.15/lib/python3.8/asyncio/tasks.py:518]> got Future <Future pending> attached to a different loop

s3rius commented 10 months ago

I will publish a new version today with the fix. As a temporary solution you can uninstall uvloop.

noctuid commented 7 months ago

Any update on this?

s3rius commented 7 months ago

Initially I thought it was a uvloop problem, but as I found out, it isn't related. I could not reproduce it locally either, so I cannot help with this issue. Really sorry about that.

Without an MRE I cannot really debug it. The repo is open to PRs if you find out why it is happening.

NicolasFerec commented 2 days ago

Hi, did you ever figure it out? It seems I'm having the same issue, also using fastapi. For me it happens only during test runs, with the InMemoryBroker.
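
For anyone landing here, a minimal sketch of what the final RuntimeError in the traceback means, independent of taskiq: asyncio raises it when a task running on one event loop awaits a future that belongs to another loop. On Python 3.8, asyncio.Semaphore picks up its loop when it is constructed, so a semaphore built before the worker's loop exists could produce the same message. Whether that is what happens in the affected projects is an assumption; nothing in this thread confirms it. The names reproduce, works, loop_a, loop_b, and waiter are made up for illustration.

```python
import asyncio


def reproduce() -> str:
    """Await a future owned by loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    fut = loop_a.create_future()  # this future is bound to loop_a

    async def waiter() -> None:
        await fut  # the task executing this runs on loop_b

    try:
        loop_b.run_until_complete(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return ""


def works() -> str:
    """Same shape, but the future is created on the loop that awaits it."""
    loop = asyncio.new_event_loop()

    async def waiter() -> str:
        fut = asyncio.get_running_loop().create_future()
        fut.set_result("ok")
        return await fut

    try:
        return loop.run_until_complete(waiter())
    finally:
        loop.close()


message = reproduce()
result = works()
print(message)
print(result)
```

If something similar is going on in a test suite, one thing to check is whether any asyncio primitive or client is created at import time or in a fixture that runs on a different loop than the test itself.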