taskiq-python / taskiq

Distributed task queue with full async support
MIT License
689 stars 44 forks

How to kiq with dependencies (and mypy)? #288

Closed joshvote closed 4 months ago

joshvote commented 4 months ago

We are using the async_shared_broker with a few custom TaskiqDepends that inject database and other dependencies. How are we supposed to kiq these tasks? The kiq function still expects values for the dependency parameters, so mypy raises errors (no overload variant of kiq has arguments x/y/z).

Take the toy example below, a task with a "real" parameter and a "dependency" parameter:

from typing import Annotated

from taskiq import TaskiqDepends, async_shared_broker

def custom_dep() -> str:
    return "My custom dependency value"

@async_shared_broker.task()
async def my_task(real_val: int, dep_val: Annotated[str, TaskiqDepends(custom_dep)]) -> None:
    print(f"my_val: {dep_val}")

How are we supposed to kiq this task?

await my_task.kicker().with_broker(broker).kiq(real_val=123)

This causes mypy to raise: No overload variant of "kiq" of "AsyncKicker" matches argument types "int" [call-overload] (https://mypy.readthedocs.io/en/stable/_refs.html#code-call-overload)

The docs are a little unclear on how to proceed here (there isn't an example of this).

s3rius commented 4 months ago

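The most obvious answer would be to not use the Annotated style. The error happens because Annotated only attaches metadata to a type; it does not make a parameter optional, so the annotated arguments are still required. Pass the dependency as a default value instead: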


def custom_dep() -> str:
    return "My custom dependency value"

@async_shared_broker.task()
async def my_task(real_val: int, dep_val: str = TaskiqDepends(custom_dep)) -> None:
    print(f"my_val: {dep_val}")

Annotated-style dependencies were added mostly for compatibility with FastAPI and other DI frameworks.
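To illustrate why the Annotated form is still treated as required, here is a minimal standard-library sketch. The `marker` factory is a hypothetical stand-in for a dependency marker and is not part of taskiq; the point is only that a parameter counts as optional when it has a default, whatever metadata its annotation carries.

```python
import inspect
from typing import Annotated, Any, Callable, List


class _Marker:
    """Hypothetical dependency marker (an assumption, not a taskiq class)."""


def marker() -> Any:
    """Hypothetical factory; returns Any so it can be used as a default."""
    return _Marker()


def annotated_only(real_val: int, dep_val: Annotated[str, marker()]) -> None:
    """dep_val carries metadata but has no default, so callers must pass it."""


def with_default(real_val: int, dep_val: str = marker()) -> None:
    """dep_val has a default, so callers may omit it."""


def required_params(func: Callable[..., Any]) -> List[str]:
    """Return the names of the parameters that have no default value."""
    params = inspect.signature(func).parameters.values()
    return [p.name for p in params if p.default is inspect.Parameter.empty]


print(required_params(annotated_only))  # ['real_val', 'dep_val']
print(required_params(with_default))    # ['real_val']
```

A type checker sees the same signatures, which is why it rejects a call that omits dep_val in the first form and accepts it in the second.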

If you still want to use Annotated for some reason, you can try adding = TaskiqDepends() as the default value, like this:

@async_shared_broker.task()
async def my_task(real_val: int, dep_val: Annotated[str, TaskiqDepends(custom_dep)] = TaskiqDepends()) -> None:
    print(f"my_val: {dep_val}")

It should do the trick.
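As a rough illustration of why the two markers can coexist, the sketch below uses only the standard library. It is not how taskiq resolves dependencies internally; `toy_depends`, `_ToyDepends` and `call_with_deps` are hypothetical names made up for this example. The empty default makes the parameter optional for callers, while the marker inside Annotated tells the resolver which factory to call.

```python
import inspect
from typing import Annotated, Any, Callable, Optional, get_args, get_origin, get_type_hints


class _ToyDepends:
    """Hypothetical marker holding an optional dependency factory."""

    def __init__(self, factory: Optional[Callable[[], Any]] = None) -> None:
        self.factory = factory


def toy_depends(factory: Optional[Callable[[], Any]] = None) -> Any:
    """Hypothetical factory; returns Any so it is accepted as a default."""
    return _ToyDepends(factory)


def custom_dep() -> str:
    return "My custom dependency value"


def my_task(
    real_val: int,
    dep_val: Annotated[str, toy_depends(custom_dep)] = toy_depends(),
) -> str:
    return f"{real_val}: {dep_val}"


def call_with_deps(func: Callable[..., Any], **kwargs: Any) -> Any:
    """Call func, filling omitted parameters from markers in Annotated metadata."""
    hints = get_type_hints(func, include_extras=True)
    for name in inspect.signature(func).parameters:
        if name in kwargs:
            continue  # the caller supplied this argument explicitly
        hint = hints.get(name)
        if get_origin(hint) is Annotated:
            # get_args returns (base_type, *metadata); look for a marker with a factory.
            for meta in get_args(hint)[1:]:
                if isinstance(meta, _ToyDepends) and meta.factory is not None:
                    kwargs[name] = meta.factory()
    return func(**kwargs)


print(call_with_deps(my_task, real_val=123))  # 123: My custom dependency value
```

The call site passes only real_val, which is the shape that the kiq(real_val=123) call in the question needs in order to type-check.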

joshvote commented 4 months ago

This works fine, and the jobs appear to execute OK with the InMemoryBroker.

Thanks so much for the assist!