taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Calling requeue from a task with an async generator dependency is treated as a dependency error #345

Closed TheArcherST closed 3 months ago

TheArcherST commented 3 months ago

Python version: 3.12.2
Taskiq version: 0.11.6

Reproducer


import asyncio
from typing import Annotated

from taskiq import (
    Context,
    TaskiqDepends,
    InMemoryBroker,
)

broker = InMemoryBroker()

async def stub_dependency():
    yield None

@broker.task
async def some_task(
        context: Annotated[Context, TaskiqDepends()],
        _: Annotated[None, TaskiqDepends(stub_dependency)],
):
    await asyncio.sleep(1)
    await context.requeue()

async def main():
    task = await some_task.kiq()
    await task.wait_result(timeout=10)

if __name__ == '__main__':
    asyncio.run(main())

I expected the dependency not to change the behavior of the task, but it does: with the async generator dependency present, the requeue is treated as a dependency error.

Also, please add a NoReturn type hint to the requeue method. Thank you for the project!
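For illustration, the requested annotation could look roughly like the sketch below. `SketchContext` and `RequeueSignal` are hypothetical stand-ins, not taskiq's real `Context` or its internal exception; the only assumption carried over from the issue is that requeue never returns normally because it ends by raising.

```python
import asyncio
from typing import NoReturn, get_type_hints


class RequeueSignal(Exception):
    """Hypothetical control-flow exception; not a taskiq class."""


class SketchContext:
    """Hypothetical stand-in for a task context; not taskiq's Context."""

    async def requeue(self) -> NoReturn:
        # A real implementation would re-send the message first.
        # The method always exits by raising, hence NoReturn.
        raise RequeueSignal()


async def some_task(context: SketchContext) -> str:
    await context.requeue()
    # Type checkers can flag this line as unreachable thanks to NoReturn.
    return "never reached"


async def run() -> str:
    try:
        return await some_task(SketchContext())
    except RequeueSignal:
        return "requeued"


outcome = asyncio.run(run())
return_hint = get_type_hints(SketchContext.requeue)["return"]
```

With this annotation, a type checker can treat any code after `await context.requeue()` as unreachable, which makes the control flow of the task explicit.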

TheArcherST commented 3 months ago

I've figured out that this is more of a design question than a bug. There is a way to disable error propagation through dependencies, and the code of the Receiver.task_run method explicitly makes error propagation treat requeue the same way as any other exception. The reproducer works properly for me if I pass the kwarg propagate_exceptions=False to InMemoryBroker.
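The mechanism described above can be modeled with the standard library alone. This is a simplified sketch, not taskiq's actual code: `FakeRequeue`, `close_dependency`, and the `propagate` flag are hypothetical names that only mimic the idea of throwing the task's exception into a generator dependency when propagation is enabled.

```python
import asyncio


class FakeRequeue(Exception):
    """Hypothetical stand-in for the exception that requeue raises."""


async def stub_dependency():
    # Same shape as in the reproducer: nothing catches errors at the yield.
    yield None


async def close_dependency(agen, exc, propagate):
    """Finish a generator dependency, optionally throwing exc into it."""
    try:
        if exc is not None and propagate:
            # The exception is re-raised at the yield inside the generator.
            await agen.athrow(exc)
        else:
            # Resume normally so the generator can run to completion.
            await agen.__anext__()
    except StopAsyncIteration:
        return "closed cleanly"
    except FakeRequeue:
        return "requeue surfaced from dependency"
    return "generator yielded again"


async def demo(propagate):
    agen = stub_dependency()
    await agen.__anext__()  # dependency setup: run up to the yield
    return await close_dependency(agen, FakeRequeue(), propagate)


results = {flag: asyncio.run(demo(flag)) for flag in (True, False)}
print(results)
```

In this model, the requeue exception comes back out of the dependency only when propagation is on, which matches the observation that the reproducer behaves as expected with propagate_exceptions=False.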